diff --git a/admin_frontend/src/web_app.rs b/admin_frontend/src/web_app.rs index 05665a1c..ce502bba 100644 --- a/admin_frontend/src/web_app.rs +++ b/admin_frontend/src/web_app.rs @@ -157,7 +157,7 @@ async fn login_callback_query_handler( let found = accepted_invitations .iter() .find(|w| w.invite_id.to_string() == invite_id); - if let Some(_) = found { + if found.is_some() { return Ok((jar, render_template(templates::OpenAppFlowyOrDownload {})?)); } } diff --git a/deploy.env b/deploy.env index a1c399d6..12055165 100644 --- a/deploy.env +++ b/deploy.env @@ -8,6 +8,7 @@ APPFLOWY_GOTRUE_BASE_URL=http://gotrue:9999 APPFLOWY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_ACCESS_CONTROL=true APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000 +APPFLOWY_DATABASE_MAX_CONNECTIONS=40 # admin frontend ## URL that connects to redis docker container diff --git a/dev.env b/dev.env index 4c91a531..6b25d2f2 100644 --- a/dev.env +++ b/dev.env @@ -3,6 +3,7 @@ APPFLOWY_GOTRUE_BASE_URL=http://localhost:9999 APPFLOWY_DATABASE_URL=postgres://postgres:password@localhost:5432/postgres APPFLOWY_ACCESS_CONTROL=true APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000 +APPFLOWY_DATABASE_MAX_CONNECTIONS=40 # This file is used to set the environment variables for local development # Copy this file to .env and change the values as needed diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index 9ba4442a..cb3c355c 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -112,6 +112,8 @@ services: - APPFLOWY_S3_BUCKET=${APPFLOWY_S3_BUCKET} - APPFLOWY_S3_REGION=${APPFLOWY_S3_REGION} - APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL} + # For the CI testing, we set the database connection to 20. The default value is 40. + - APPFLOWY_DATABASE_MAX_CONNECTIONS=20 - APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST} - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} build: diff --git a/docker-compose.yml b/docker-compose.yml index 5557907c..ca79ed59 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -108,6 +108,7 @@ services: - APPFLOWY_S3_BUCKET=${APPFLOWY_S3_BUCKET} - APPFLOWY_S3_REGION=${APPFLOWY_S3_REGION} - APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL} + - APPFLOWY_DATABASE_MAX_CONNECTIONS=${APPFLOWY_DATABASE_MAX_CONNECTIONS} - APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST} - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} build: diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index 22febd5f..289397c9 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -5,6 +5,7 @@ use crate::resource_usage::{ use app_error::AppError; use async_trait::async_trait; use sqlx::PgPool; + use tracing::{instrument, warn}; use uuid::Uuid; @@ -60,18 +61,16 @@ where } let obj_key = format!("{}/{}", workspace_id, file_id); - let mut tx = self.pg_pool.begin().await?; + self.client.pub_blob(obj_key, &file_data).await?; + insert_blob_metadata( - &mut tx, + &self.pg_pool, &file_id, &workspace_id, &file_type, file_data.len(), ) .await?; - - self.client.pub_blob(obj_key, &file_data).await?; - tx.commit().await?; Ok(()) } diff --git a/libs/database/src/resource_usage.rs b/libs/database/src/resource_usage.rs index dcfa2d90..cc87fc4c 100644 --- a/libs/database/src/resource_usage.rs +++ b/libs/database/src/resource_usage.rs @@ -34,7 +34,7 @@ pub async fn is_blob_metadata_exists( #[instrument(level = "trace", skip_all, err)] pub async fn insert_blob_metadata( - tx: &mut Transaction<'_, sqlx::Postgres>, + pg_pool: &PgPool, file_id: &str, workspace_id: &Uuid, file_type: &str, @@ -54,10 +54,12 @@ pub async fn insert_blob_metadata( file_type, file_size as i64, ) - .execute(tx.deref_mut()) + .execute(pg_pool) .await?; let n = res.rows_affected(); - assert_eq!(n, 1); + if n != 1 { + tracing::error!("insert_blob_metadata: rows_affected: {}", n); + } Ok(()) } diff --git a/tests/collab/pending_write_test.rs b/tests/collab/pending_write_test.rs index 742003b3..efc40ff2 100644 --- a/tests/collab/pending_write_test.rs +++ b/tests/collab/pending_write_test.rs @@ -74,10 +74,21 @@ async fn simulate_small_data_set_write(pool: PgPool) { .await .unwrap(); + assert_eq!( + encode_collab_from_disk.doc_state.len(), + original_encode_collab.doc_state.len(), + "doc_state length mismatch" + ); assert_eq!( encode_collab_from_disk.doc_state, original_encode_collab.doc_state ); + + assert_eq!( + encode_collab_from_disk.state_vector.len(), + original_encode_collab.state_vector.len(), + "state_vector length mismatch" + ); assert_eq!( encode_collab_from_disk.state_vector, original_encode_collab.state_vector @@ -103,49 +114,48 @@ async fn simulate_large_data_set_write(pool: PgPool) { let queue_name = uuid::Uuid::new_v4().to_string(); let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name); - let queries = Arc::new(Mutex::new(Vec::new())); - for i in 0..3 { - // sleep random seconds less than 2 seconds. because the runtime is single-threaded, - // we need sleep a little time to let the runtime switch to other tasks. - sleep(Duration::from_millis(i % 2)).await; - - let encode_collab = EncodedCollab::new_v1( - generate_random_bytes(10 * 1024), - generate_random_bytes(2 * 1024 * 1024), - ); - let params = CollabParams { - object_id: format!("object_id_{}", i), - collab_type: CollabType::Unknown, - encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(), - }; - storage_queue - .push(&user.workspace_id, &user.uid, ¶ms, WritePriority::Low) - .await - .unwrap(); - queries.lock().await.push((params, encode_collab)); - } + let origin_encode_collab = EncodedCollab::new_v1( + generate_random_bytes(10 * 1024), + generate_random_bytes(2 * 1024 * 1024), + ); + let params = CollabParams { + object_id: uuid::Uuid::new_v4().to_string(), + collab_type: CollabType::Unknown, + encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(), + }; + storage_queue + .push(&user.workspace_id, &user.uid, ¶ms, WritePriority::Low) + .await + .unwrap(); // Allow some time for processing sleep(Duration::from_secs(30)).await; - // Check that all items are processed correctly - for (params, original_encode_collab) in queries.lock().await.iter() { - let query = QueryCollab { - object_id: params.object_id.clone(), - collab_type: params.collab_type.clone(), - }; - let encode_collab_from_disk = collab_cache - .get_encode_collab_from_disk(&user.uid, query) - .await - .unwrap(); + let query = QueryCollab { + object_id: params.object_id.clone(), + collab_type: params.collab_type.clone(), + }; + let encode_collab_from_disk = collab_cache + .get_encode_collab_from_disk(&user.uid, query) + .await + .unwrap(); + assert_eq!( + encode_collab_from_disk.doc_state.len(), + origin_encode_collab.doc_state.len(), + "doc_state length mismatch" + ); + assert_eq!( + encode_collab_from_disk.doc_state, + origin_encode_collab.doc_state + ); - assert_eq!( - encode_collab_from_disk.doc_state, - original_encode_collab.doc_state - ); - assert_eq!( - encode_collab_from_disk.state_vector, - original_encode_collab.state_vector - ); - } + assert_eq!( + encode_collab_from_disk.state_vector.len(), + origin_encode_collab.state_vector.len(), + "state_vector length mismatch" + ); + assert_eq!( + encode_collab_from_disk.state_vector, + origin_encode_collab.state_vector + ); } diff --git a/tests/workspace/blob/put_and_get.rs b/tests/workspace/blob/put_and_get.rs index 53e98360..40ffa2ec 100644 --- a/tests/workspace/blob/put_and_get.rs +++ b/tests/workspace/blob/put_and_get.rs @@ -134,3 +134,32 @@ async fn put_and_delete_workspace() { assert!(is_none); } } + +#[tokio::test] +async fn simulate_30_put_blob_request_test() { + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + + let mut handles = vec![]; + for _ in 0..30 { + let cloned_client = c1.clone(); + let cloned_workspace_id = workspace_id.clone(); + let handle = tokio::spawn(async move { + let mime = mime::TEXT_PLAIN_UTF_8; + let file_id = uuid::Uuid::new_v4().to_string(); + let url = cloned_client.get_blob_url(&cloned_workspace_id, &file_id); + let data = vec![0; 3 * 1024 * 1024]; + cloned_client.put_blob(&url, data, &mime).await.unwrap(); + url + }); + handles.push(handle); + } + + let results = futures::future::join_all(handles).await; + for result in results { + let url = result.unwrap(); + let (_, got_data) = c1.get_blob(&url).await.unwrap(); + assert_eq!(got_data, vec![0; 3 * 1024 * 1024]); + c1.delete_blob(&url).await.unwrap(); + } +}