From ce793fa03e2a6d248ba566d7da898a0afc8e587c Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 28 Oct 2024 12:55:43 +0100 Subject: [PATCH] chore: make collab group prune grace period configurable --- services/appflowy-collaborate/src/application.rs | 1 + services/appflowy-collaborate/src/config.rs | 3 +++ .../appflowy-collaborate/src/group/group_init.rs | 13 ++++++++----- services/appflowy-collaborate/src/group/manager.rs | 4 ++++ services/appflowy-collaborate/src/rt_server.rs | 2 ++ src/application.rs | 1 + src/config/config.rs | 3 +++ 7 files changed, 22 insertions(+), 5 deletions(-) diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 2432f3d2..55781aad 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -76,6 +76,7 @@ pub async fn run_actix_server( rt_cmd_recv, state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), + Duration::from_secs(config.collab.group_prune_grace_period_secs), state.indexer_provider.clone(), ) .await diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index da0eb219..7db8fca5 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -114,6 +114,7 @@ pub struct GoTrueSetting { #[derive(Clone, Debug)] pub struct CollabSetting { pub group_persistence_interval_secs: u64, + pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, } @@ -164,6 +165,8 @@ pub fn get_configuration() -> Result { "60", ) .parse()?, + group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60") + .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, }, diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index c05cc63b..db6387d6 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -81,6 +81,7 @@ impl CollabGroup { is_new_collab: bool, collab_redis_stream: Arc, persistence_interval: Duration, + prune_grace_period: Duration, indexer: Option>, ) -> Result where @@ -94,6 +95,7 @@ impl CollabGroup { storage, collab_redis_stream, indexer, + prune_grace_period, ); let state = Arc::new(CollabGroupState { @@ -846,13 +848,12 @@ struct CollabPersister { indexer: Option>, update_sink: CollabUpdateSink, awareness_sink: AwarenessUpdateSink, + /// A grace period for prunning Redis collab updates. Instead of deleting all messages we + /// read right away, we give 1min for other potential client to catch up. + prune_grace_period: Duration, } impl CollabPersister { - /// A grace period for prunning Redis collab updates. Instead of deleting all messages we - /// read right away, we give 1min for other potential client to catch up. - pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min - pub fn new( uid: i64, workspace_id: String, @@ -861,6 +862,7 @@ impl CollabPersister { storage: Arc, collab_redis_stream: Arc, indexer: Option>, + prune_grace_period: Duration, ) -> Self { let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id); let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id); @@ -874,6 +876,7 @@ impl CollabPersister { indexer, update_sink, awareness_sink, + prune_grace_period, } } @@ -1067,7 +1070,7 @@ impl CollabPersister { // 3. finally we can drop Redis messages let msg_id = MessageId { - timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS, + timestamp_ms: message_id.timestamp_ms - self.prune_grace_period.as_millis() as u64, sequence_number: 0, }; let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id); diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index fc8b7143..185d1e8b 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -34,6 +34,7 @@ pub struct GroupManager { collab_redis_stream: Arc, control_event_stream: Arc>, persistence_interval: Duration, + prune_grace_period: Duration, indexer_provider: Arc, } @@ -48,6 +49,7 @@ where metrics_calculate: Arc, collab_stream: CollabRedisStream, persistence_interval: Duration, + prune_grace_period: Duration, indexer_provider: Arc, ) -> Result { let collab_stream = Arc::new(collab_stream); @@ -64,6 +66,7 @@ where collab_redis_stream: collab_stream, control_event_stream, persistence_interval, + prune_grace_period, indexer_provider, }) } @@ -226,6 +229,7 @@ where is_new_collab, self.collab_redis_stream.clone(), self.persistence_interval, + self.prune_grace_period, indexer, ) .await?, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index fc34c95c..5323198d 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -52,6 +52,7 @@ where command_recv: CLCommandReceiver, redis_connection_manager: RedisConnectionManager, group_persistence_interval: Duration, + prune_grace_period: Duration, indexer_provider: Arc, ) -> Result { let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") @@ -73,6 +74,7 @@ where metrics.clone(), collab_stream, group_persistence_interval, + prune_grace_period, indexer_provider.clone(), ) .await?, diff --git a/src/application.rs b/src/application.rs index c16617af..87939cdf 100644 --- a/src/application.rs +++ b/src/application.rs @@ -134,6 +134,7 @@ pub async fn run_actix_server( rt_cmd_recv, state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), + Duration::from_secs(config.collab.group_prune_grace_period_secs), state.indexer_provider.clone(), ) .await diff --git a/src/config/config.rs b/src/config/config.rs index 79c5d281..3fe8d330 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -143,6 +143,7 @@ pub struct GrpcHistorySetting { #[derive(Clone, Debug)] pub struct CollabSetting { pub group_persistence_interval_secs: u64, + pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, } @@ -249,6 +250,8 @@ pub fn get_configuration() -> Result { "60", ) .parse()?, + group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60") + .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, },