use crate::sql_test::util::{generate_random_bytes, setup_db, test_create_user}; use collab_entity::CollabType; use database::collab::{ insert_into_af_collab, insert_into_af_collab_bulk_for_user, select_blob_from_af_collab, select_collab_meta_from_af_collab, }; use database_entity::dto::CollabParams; use sqlx::PgPool; #[sqlx::test(migrations = false)] async fn insert_collab_sql_test(pool: PgPool) { setup_db(&pool).await.unwrap(); let user_uuid = uuid::Uuid::new_v4(); let name = user_uuid.to_string(); let email = format!("{}@appflowy.io", name); let user = test_create_user(&pool, user_uuid, &email, &name) .await .unwrap(); let mut object_ids = vec![]; let data_sizes = vec![ 5120, // 5 KB 10240, // 10 KB 102400, // 100 KB 512000, // 500 KB 5120000, // 5 MB ]; let start_time = std::time::Instant::now(); for &data_size in &data_sizes { let encoded_collab_v1 = generate_random_bytes(data_size); let object_id = uuid::Uuid::new_v4().to_string(); object_ids.push(object_id.clone()); let mut txn = pool.begin().await.unwrap(); let params = CollabParams { object_id, collab_type: CollabType::Unknown, encoded_collab_v1: encoded_collab_v1.into(), embeddings: None, }; insert_into_af_collab(&mut txn, &user.uid, &user.workspace_id, ¶ms) .await .unwrap(); txn.commit().await.unwrap(); } let duration = start_time.elapsed(); println!("Insert time: {:?}", duration); for object_id in object_ids { let meta = select_collab_meta_from_af_collab(&pool, &object_id, &CollabType::Unknown) .await .unwrap() .unwrap(); assert_eq!(meta.oid, object_id); assert_eq!(meta.workspace_id.to_string(), user.workspace_id); assert!(meta.created_at.is_some()); assert!(meta.deleted_at.is_none()); } } #[sqlx::test(migrations = false)] async fn insert_bulk_collab_sql_test(pool: PgPool) { setup_db(&pool).await.unwrap(); let user_uuid = uuid::Uuid::new_v4(); let name = user_uuid.to_string(); let email = format!("{}@appflowy.io", name); let user = test_create_user(&pool, user_uuid, &email, &name) .await .unwrap(); let mut object_ids = vec![]; let data_sizes = vec![ 5120, // 5 KB 10240, // 10 KB 102400, // 100 KB 512000, // 500 KB 5120000, // 5 MB ]; let mut collab_params_list = vec![]; let mut original_data_list = vec![]; // Store original data for validation // Prepare bulk insert data for &data_size in &data_sizes { let encoded_collab_v1 = generate_random_bytes(data_size); let object_id = uuid::Uuid::new_v4().to_string(); object_ids.push(object_id.clone()); let params = CollabParams { object_id, collab_type: CollabType::Unknown, encoded_collab_v1: encoded_collab_v1.clone().into(), // Store the original data for validation embeddings: None, }; collab_params_list.push(params); original_data_list.push(encoded_collab_v1); // Keep track of original data } // Perform bulk insert let start_time = std::time::Instant::now(); // Start timing let mut txn = pool.begin().await.unwrap(); insert_into_af_collab_bulk_for_user(&mut txn, &user.uid, &user.workspace_id, &collab_params_list) .await .unwrap(); txn.commit().await.unwrap(); let duration = start_time.elapsed(); println!("Bulk insert time: {:?}", duration); // Validate inserted data for (i, object_id) in object_ids.iter().enumerate() { let inserted_data = select_blob_from_af_collab(&pool, &CollabType::Unknown, object_id) .await .unwrap(); // Ensure the inserted data matches the original data let original_data = &original_data_list[i]; assert_eq!( inserted_data, *original_data, "Data mismatch for object_id: {}", object_id ); println!( "Validated data size: {} bytes for object_id: {}", original_data.len(), object_id ); } } #[sqlx::test(migrations = false)] async fn test_bulk_insert_empty_collab_list(pool: PgPool) { setup_db(&pool).await.unwrap(); let user_uuid = uuid::Uuid::new_v4(); let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user") .await .unwrap(); let collab_params_list: Vec = vec![]; // Empty list let mut txn = pool.begin().await.unwrap(); let result = insert_into_af_collab_bulk_for_user( &mut txn, &user.uid, &user.workspace_id, &collab_params_list, ) .await; assert!(result.is_ok()); txn.commit().await.unwrap(); } #[sqlx::test(migrations = false)] async fn test_bulk_insert_duplicate_oid_partition_key(pool: PgPool) { setup_db(&pool).await.unwrap(); let user_uuid = uuid::Uuid::new_v4(); let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user") .await .unwrap(); let object_id = uuid::Uuid::new_v4().to_string(); let encoded_collab_v1 = generate_random_bytes(1024); // 1KB of random data // Two items with the same oid and partition_key let collab_params_list = vec![ CollabParams { object_id: object_id.clone(), collab_type: CollabType::Unknown, encoded_collab_v1: encoded_collab_v1.clone().into(), embeddings: None, }, CollabParams { object_id: object_id.clone(), // Duplicate oid collab_type: CollabType::Unknown, encoded_collab_v1: generate_random_bytes(2048).into(), // Different data to test update embeddings: None, }, ]; let mut txn = pool.begin().await.unwrap(); insert_into_af_collab_bulk_for_user(&mut txn, &user.uid, &user.workspace_id, &collab_params_list) .await .unwrap(); txn.commit().await.unwrap(); // Validate the data was updated, not duplicated let data = select_blob_from_af_collab(&pool, &CollabType::Unknown, &object_id) .await .unwrap(); assert_eq!(data, encoded_collab_v1); // should equal the data that insert first time } #[sqlx::test(migrations = false)] async fn test_batch_insert_comparison(pool: PgPool) { setup_db(&pool).await.unwrap(); let user_uuid = uuid::Uuid::new_v4(); let user = test_create_user(&pool, user_uuid, "test@appflowy.io", "test_user") .await .unwrap(); // Define the different test cases let row_sizes = vec![1024, 5 * 1024]; // 1KB and 5KB row sizes let total_rows = vec![500, 1000, 2000, 3000, 6000]; // Number of rows let chunk_sizes = vec![2000]; // Chunk size for batch inserts // Iterate over the different row sizes for row_size in row_sizes { // Iterate over the different total row counts for &total_row_count in &total_rows { // Generate data for the total row count let collab_params_list: Vec = (0..total_row_count) .map(|_| CollabParams { object_id: uuid::Uuid::new_v4().to_string(), collab_type: CollabType::Unknown, encoded_collab_v1: generate_random_bytes(row_size).into(), // Generate random bytes for the given row size embeddings: None, }) .collect(); // Group the results for readability println!("\n=============================="); println!( "Row Size: {}KB, Total Rows: {}", row_size / 1024, total_row_count ); // === Test Case 1: Insert all rows in one batch === let start_time = std::time::Instant::now(); let mut txn = pool.begin().await.unwrap(); let result = insert_into_af_collab_bulk_for_user( &mut txn, &user.uid, &user.workspace_id, &collab_params_list, ) .await; assert!(result.is_ok()); // Ensure the insert doesn't fail txn.commit().await.unwrap(); let total_time_single_batch = start_time.elapsed(); println!( "Batch Insert - Time for inserting {} rows of size {}KB in one batch: {:?}", total_row_count, row_size / 1024, total_time_single_batch ); // === Test Case 2: Insert rows in chunks === for &chunk_size in &chunk_sizes { let mut total_time_multiple_batches = std::time::Duration::new(0, 0); for chunk in collab_params_list.chunks(chunk_size) { let start_time = std::time::Instant::now(); let mut txn = pool.begin().await.unwrap(); let result = insert_into_af_collab_bulk_for_user(&mut txn, &user.uid, &user.workspace_id, chunk) .await; assert!(result.is_ok()); // Ensure the insert doesn't fail txn.commit().await.unwrap(); total_time_multiple_batches += start_time.elapsed(); } println!( "Chunked Insert - Time for inserting {} rows of size {}KB in {}-row chunks: {:?}", total_row_count, row_size / 1024, chunk_size, total_time_multiple_batches ); } println!("==============================\n"); } } }