fix: pool timeout when there are too many put blob requests (#533)

* fix: pool timeout when there are too many put blob requests

* chore: fix clippy

* fix: dont use transaction while putting data

---------

Co-authored-by: Zack Fu Zi Xiang <speed2exe@live.com.sg>
This commit is contained in:
Nathan.fooo 2024-05-08 12:20:06 +08:00 committed by GitHub
parent 1eb29cd614
commit 359433f14c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 94 additions and 49 deletions

View File

@ -157,7 +157,7 @@ async fn login_callback_query_handler(
let found = accepted_invitations
.iter()
.find(|w| w.invite_id.to_string() == invite_id);
if let Some(_) = found {
if found.is_some() {
return Ok((jar, render_template(templates::OpenAppFlowyOrDownload {})?));
}
}

View File

@ -8,6 +8,7 @@ APPFLOWY_GOTRUE_BASE_URL=http://gotrue:9999
APPFLOWY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_ACCESS_CONTROL=true
APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000
APPFLOWY_DATABASE_MAX_CONNECTIONS=40
# admin frontend
## URL that connects to redis docker container

View File

@ -3,6 +3,7 @@ APPFLOWY_GOTRUE_BASE_URL=http://localhost:9999
APPFLOWY_DATABASE_URL=postgres://postgres:password@localhost:5432/postgres
APPFLOWY_ACCESS_CONTROL=true
APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000
APPFLOWY_DATABASE_MAX_CONNECTIONS=40
# This file is used to set the environment variables for local development
# Copy this file to .env and change the values as needed

View File

@ -112,6 +112,8 @@ services:
- APPFLOWY_S3_BUCKET=${APPFLOWY_S3_BUCKET}
- APPFLOWY_S3_REGION=${APPFLOWY_S3_REGION}
- APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL}
# For the CI testing, we set the database connection to 20. The default value is 40.
- APPFLOWY_DATABASE_MAX_CONNECTIONS=20
- APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST}
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
build:

View File

@ -108,6 +108,7 @@ services:
- APPFLOWY_S3_BUCKET=${APPFLOWY_S3_BUCKET}
- APPFLOWY_S3_REGION=${APPFLOWY_S3_REGION}
- APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL}
- APPFLOWY_DATABASE_MAX_CONNECTIONS=${APPFLOWY_DATABASE_MAX_CONNECTIONS}
- APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST}
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
build:

View File

@ -5,6 +5,7 @@ use crate::resource_usage::{
use app_error::AppError;
use async_trait::async_trait;
use sqlx::PgPool;
use tracing::{instrument, warn};
use uuid::Uuid;
@ -60,18 +61,16 @@ where
}
let obj_key = format!("{}/{}", workspace_id, file_id);
let mut tx = self.pg_pool.begin().await?;
self.client.pub_blob(obj_key, &file_data).await?;
insert_blob_metadata(
&mut tx,
&self.pg_pool,
&file_id,
&workspace_id,
&file_type,
file_data.len(),
)
.await?;
self.client.pub_blob(obj_key, &file_data).await?;
tx.commit().await?;
Ok(())
}

View File

@ -34,7 +34,7 @@ pub async fn is_blob_metadata_exists(
#[instrument(level = "trace", skip_all, err)]
pub async fn insert_blob_metadata(
tx: &mut Transaction<'_, sqlx::Postgres>,
pg_pool: &PgPool,
file_id: &str,
workspace_id: &Uuid,
file_type: &str,
@ -54,10 +54,12 @@ pub async fn insert_blob_metadata(
file_type,
file_size as i64,
)
.execute(tx.deref_mut())
.execute(pg_pool)
.await?;
let n = res.rows_affected();
assert_eq!(n, 1);
if n != 1 {
tracing::error!("insert_blob_metadata: rows_affected: {}", n);
}
Ok(())
}

View File

@ -74,10 +74,21 @@ async fn simulate_small_data_set_write(pool: PgPool) {
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
original_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
original_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector.len(),
original_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
original_encode_collab.state_vector
@ -103,49 +114,48 @@ async fn simulate_large_data_set_write(pool: PgPool) {
let queue_name = uuid::Uuid::new_v4().to_string();
let storage_queue = StorageQueue::new(collab_cache.clone(), conn, &queue_name);
let queries = Arc::new(Mutex::new(Vec::new()));
for i in 0..3 {
// sleep random seconds less than 2 seconds. because the runtime is single-threaded,
// we need sleep a little time to let the runtime switch to other tasks.
sleep(Duration::from_millis(i % 2)).await;
let encode_collab = EncodedCollab::new_v1(
generate_random_bytes(10 * 1024),
generate_random_bytes(2 * 1024 * 1024),
);
let params = CollabParams {
object_id: format!("object_id_{}", i),
collab_type: CollabType::Unknown,
encoded_collab_v1: encode_collab.encode_to_bytes().unwrap(),
};
storage_queue
.push(&user.workspace_id, &user.uid, &params, WritePriority::Low)
.await
.unwrap();
queries.lock().await.push((params, encode_collab));
}
let origin_encode_collab = EncodedCollab::new_v1(
generate_random_bytes(10 * 1024),
generate_random_bytes(2 * 1024 * 1024),
);
let params = CollabParams {
object_id: uuid::Uuid::new_v4().to_string(),
collab_type: CollabType::Unknown,
encoded_collab_v1: origin_encode_collab.encode_to_bytes().unwrap(),
};
storage_queue
.push(&user.workspace_id, &user.uid, &params, WritePriority::Low)
.await
.unwrap();
// Allow some time for processing
sleep(Duration::from_secs(30)).await;
// Check that all items are processed correctly
for (params, original_encode_collab) in queries.lock().await.iter() {
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
let query = QueryCollab {
object_id: params.object_id.clone(),
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.await
.unwrap();
assert_eq!(
encode_collab_from_disk.doc_state.len(),
origin_encode_collab.doc_state.len(),
"doc_state length mismatch"
);
assert_eq!(
encode_collab_from_disk.doc_state,
origin_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.doc_state,
original_encode_collab.doc_state
);
assert_eq!(
encode_collab_from_disk.state_vector,
original_encode_collab.state_vector
);
}
assert_eq!(
encode_collab_from_disk.state_vector.len(),
origin_encode_collab.state_vector.len(),
"state_vector length mismatch"
);
assert_eq!(
encode_collab_from_disk.state_vector,
origin_encode_collab.state_vector
);
}

View File

@ -134,3 +134,32 @@ async fn put_and_delete_workspace() {
assert!(is_none);
}
}
#[tokio::test]
async fn simulate_30_put_blob_request_test() {
let (c1, _user1) = generate_unique_registered_user_client().await;
let workspace_id = workspace_id_from_client(&c1).await;
let mut handles = vec![];
for _ in 0..30 {
let cloned_client = c1.clone();
let cloned_workspace_id = workspace_id.clone();
let handle = tokio::spawn(async move {
let mime = mime::TEXT_PLAIN_UTF_8;
let file_id = uuid::Uuid::new_v4().to_string();
let url = cloned_client.get_blob_url(&cloned_workspace_id, &file_id);
let data = vec![0; 3 * 1024 * 1024];
cloned_client.put_blob(&url, data, &mime).await.unwrap();
url
});
handles.push(handle);
}
let results = futures::future::join_all(handles).await;
for result in results {
let url = result.unwrap();
let (_, got_data) = c1.get_blob(&url).await.unwrap();
assert_eq!(got_data, vec![0; 3 * 1024 * 1024]);
c1.delete_blob(&url).await.unwrap();
}
}