diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index d53791e8..57142337 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -22,10 +22,6 @@ use validator::Validate; /// The default compression level of ZSTD-compressed collabs. pub const ZSTD_COMPRESSION_LEVEL: i32 = 3; -/// The threshold used to determine whether collab data should land -/// in S3 or Postgres. Collabs with size below this value will land into Postgres. -pub const S3_COLLAB_THRESHOLD: usize = 2000; - #[derive(Debug, Clone, Validate, Serialize, Deserialize)] pub struct CreateCollabParams { #[validate(custom = "validate_not_empty_str")] diff --git a/libs/database/src/collab/cache.rs b/libs/database/src/collab/cache.rs index 964e4b53..54d21b8e 100644 --- a/libs/database/src/collab/cache.rs +++ b/libs/database/src/collab/cache.rs @@ -19,6 +19,7 @@ pub struct CollabCache { mem_cache: CollabMemCache, success_attempts: Arc, total_attempts: Arc, + s3_collab_threshold: usize, } impl CollabCache { @@ -26,12 +27,14 @@ impl CollabCache { redis_conn_manager: redis::aio::ConnectionManager, pg_pool: PgPool, s3: AwsS3BucketClientImpl, + s3_collab_threshold: usize, ) -> Self { let mem_cache = CollabMemCache::new(redis_conn_manager.clone()); - let disk_cache = CollabDiskCache::new(pg_pool.clone(), s3); + let disk_cache = CollabDiskCache::new(pg_pool.clone(), s3, s3_collab_threshold); Self { disk_cache, mem_cache, + s3_collab_threshold, success_attempts: Arc::new(AtomicU64::new(0)), total_attempts: Arc::new(AtomicU64::new(0)), } @@ -168,8 +171,15 @@ impl CollabCache { let object_id = params.object_id.clone(); let encode_collab_data = params.encoded_collab_v1.clone(); let s3 = self.disk_cache.s3_client(); - CollabDiskCache::upsert_collab_with_transaction(workspace_id, uid, params, transaction, s3) - .await?; + CollabDiskCache::upsert_collab_with_transaction( + workspace_id, + uid, + params, + transaction, + s3, + self.s3_collab_threshold, + ) + .await?; // when the data is written to the disk cache but fails to be written to the memory cache // we log the error and continue. diff --git a/libs/database/src/collab/disk_cache.rs b/libs/database/src/collab/disk_cache.rs index e628aafd..a2ab082a 100644 --- a/libs/database/src/collab/disk_cache.rs +++ b/libs/database/src/collab/disk_cache.rs @@ -21,19 +21,23 @@ use crate::file::{BucketClient, ResponseBlob}; use crate::index::upsert_collab_embeddings; use app_error::AppError; use database_entity::dto::{ - CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, S3_COLLAB_THRESHOLD, - ZSTD_COMPRESSION_LEVEL, + CollabParams, PendingCollabWrite, QueryCollab, QueryCollabResult, ZSTD_COMPRESSION_LEVEL, }; #[derive(Clone)] pub struct CollabDiskCache { pg_pool: PgPool, s3: AwsS3BucketClientImpl, + s3_collab_threshold: usize, } impl CollabDiskCache { - pub fn new(pg_pool: PgPool, s3: AwsS3BucketClientImpl) -> Self { - Self { pg_pool, s3 } + pub fn new(pg_pool: PgPool, s3: AwsS3BucketClientImpl, s3_collab_threshold: usize) -> Self { + Self { + pg_pool, + s3, + s3_collab_threshold, + } } pub async fn is_exist(&self, workspace_id: &str, object_id: &str) -> AppResult { @@ -67,6 +71,7 @@ impl CollabDiskCache { params, &mut transaction, self.s3.clone(), + self.s3_collab_threshold, ) .await?; @@ -91,10 +96,11 @@ impl CollabDiskCache { mut params: CollabParams, transaction: &mut Transaction<'_, sqlx::Postgres>, s3: AwsS3BucketClientImpl, + s3_collab_threshold: usize, ) -> AppResult<()> { let mut delete_from_s3 = Vec::new(); let key = collab_key(workspace_id, ¶ms.object_id); - if params.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD { + if params.encoded_collab_v1.len() > s3_collab_threshold { // put collab into S3 let encoded_collab = std::mem::take(&mut params.encoded_collab_v1); tokio::spawn(Self::insert_blob_with_retries( @@ -222,7 +228,7 @@ impl CollabDiskCache { let mut blobs = HashMap::new(); for param in params_list.iter_mut() { let key = collab_key(workspace_id, ¶m.object_id); - if param.encoded_collab_v1.len() > S3_COLLAB_THRESHOLD { + if param.encoded_collab_v1.len() > self.s3_collab_threshold { let blob = std::mem::take(&mut param.encoded_collab_v1); blobs.insert(key, blob); } else { @@ -277,6 +283,7 @@ impl CollabDiskCache { params, &mut transaction, s3.clone(), + self.s3_collab_threshold, ) .await { diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 7889bd48..fa0d8c08 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -130,6 +130,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result String { @@ -191,6 +192,7 @@ pub fn get_configuration() -> Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, + s3_collab_threshold: get_env_var("APPFLOWY_COLLAB_S3_THRESHOLD", "8000").parse()?, }, redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(), ai: AISettings { diff --git a/src/application.rs b/src/application.rs index f5656b63..663c02b2 100644 --- a/src/application.rs +++ b/src/application.rs @@ -285,6 +285,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, + s3_collab_threshold: get_env_var("APPFLOWY_COLLAB_S3_THRESHOLD", "60").parse()?, }, published_collab: PublishedCollabSetting { storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres")