From c7d474c9cf3d4ee6804f218665fe6c6ca8b03b91 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Sun, 23 Jun 2024 23:19:54 +0800 Subject: [PATCH] chore: allow collab persistence and edit state behaviour to be configurable (#637) --- .../src/group/group_init.rs | 11 ++++++++++- .../appflowy-collaborate/src/group/manager.rs | 13 +++++++++++++ .../src/group/persistence.rs | 6 +++++- services/appflowy-collaborate/src/rt_server.rs | 7 +++++++ src/application.rs | 3 +++ src/config/config.rs | 17 +++++++++++++++++ 6 files changed, 55 insertions(+), 2 deletions(-) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 7ac8220b..9cd42061 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -24,6 +24,7 @@ use collab_stream::error::StreamError; use collab_stream::model::{CollabUpdateEvent, StreamBinary}; use collab_stream::stream_group::StreamGroup; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; +use std::time::Duration; use tokio::sync::mpsc; use tracing::{debug, error, event, trace}; use yrs::updates::decoder::Decode; @@ -64,11 +65,18 @@ impl CollabGroup { storage: Arc, is_new_collab: bool, collab_redis_stream: Arc, + persistence_interval: Duration, + edit_state_max_count: u32, + edit_state_max_secs: i64, ) -> Result where S: CollabStorage, { - let edit_state = Arc::new(EditState::new(100, 360, is_new_collab)); + let edit_state = Arc::new(EditState::new( + edit_state_max_count, + edit_state_max_secs, + is_new_collab, + )); let broadcast = CollabBroadcast::new( &object_id, 10, @@ -87,6 +95,7 @@ impl CollabGroup { edit_state.clone(), collab.downgrade(), collab_type.clone(), + persistence_interval, ) .run(rx), ); diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 2caa1d17..8c572617 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use collab::core::collab::{DataSource, MutexCollab}; use collab::core::origin::CollabOrigin; @@ -32,6 +33,9 @@ pub struct GroupManager { metrics_calculate: CollabMetricsCalculate, collab_redis_stream: Arc, control_event_stream: Arc>, + persistence_interval: Duration, + edit_state_max_count: u32, + edit_state_max_secs: i64, } impl GroupManager @@ -44,6 +48,9 @@ where access_control: Arc, metrics_calculate: CollabMetricsCalculate, collab_stream: CollabRedisStream, + persistence_interval: Duration, + edit_state_max_count: u32, + edit_state_max_secs: i64, ) -> Result { let collab_stream = Arc::new(collab_stream); let control_event_stream = collab_stream @@ -58,6 +65,9 @@ where metrics_calculate, collab_redis_stream: collab_stream, control_event_stream, + persistence_interval, + edit_state_max_count, + edit_state_max_secs, }) } @@ -239,6 +249,9 @@ where self.storage.clone(), is_new_collab, self.collab_redis_stream.clone(), + self.persistence_interval, + self.edit_state_max_count, + self.edit_state_max_secs, ) .await?, ); diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs index b5a49990..aef15e1a 100644 --- a/services/appflowy-collaborate/src/group/persistence.rs +++ b/services/appflowy-collaborate/src/group/persistence.rs @@ -23,12 +23,14 @@ pub(crate) struct GroupPersistence { edit_state: Arc, mutex_collab: WeakMutexCollab, collab_type: CollabType, + persistence_interval: Duration, } impl GroupPersistence where S: CollabStorage, { + #[allow(clippy::too_many_arguments)] pub fn new( workspace_id: String, object_id: String, @@ -37,6 +39,7 @@ where edit_state: Arc, mutex_collab: WeakMutexCollab, collab_type: CollabType, + persistence_interval: Duration, ) -> Self { Self { workspace_id, @@ -46,11 +49,12 @@ where edit_state, mutex_collab, collab_type, + persistence_interval, } } pub async fn run(self, mut destroy_group_rx: mpsc::Receiver) { - let mut interval = interval(Duration::from_secs(60)); + let mut interval = interval(self.persistence_interval); // TODO(nathan): remove this sleep when creating a new collab, applying all the updates // workarounds for the issue that the collab doesn't contain the required data when first created sleep(Duration::from_secs(5)).await; diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 2d6e5aa2..3193c5c7 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -43,12 +43,16 @@ where S: CollabStorage, AC: RealtimeAccessControl, { + #[allow(clippy::too_many_arguments)] pub async fn new( storage: Arc, access_control: AC, metrics: Arc, command_recv: CLCommandReceiver, redis_connection_manager: RedisConnectionManager, + group_persistence_interval: Duration, + edit_state_max_count: u32, + edit_state_max_secs: i64, ) -> Result { if cfg!(feature = "collab-rt-multi-thread") { info!("CollaborationServer with multi-thread feature enabled"); @@ -64,6 +68,9 @@ where access_control.clone(), metrics_calculate.clone(), collab_stream, + group_persistence_interval, + edit_state_max_count, + edit_state_max_secs, ) .await?, ); diff --git a/src/application.rs b/src/application.rs index 63aa8971..cc7df5e6 100644 --- a/src/application.rs +++ b/src/application.rs @@ -130,6 +130,9 @@ pub async fn run_actix_server( state.metrics.realtime_metrics.clone(), rt_cmd_recv, state.redis_connection_manager.clone(), + Duration::from_secs(config.collab.group_persistence_interval_secs), + config.collab.edit_state_max_count, + config.collab.edit_state_max_secs, ) .await .unwrap(); diff --git a/src/config/config.rs b/src/config/config.rs index ce61ea6f..5ca92c12 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -17,6 +17,7 @@ pub struct Config { pub s3: S3Setting, pub appflowy_ai: AppFlowyAISetting, pub grpc_history: GrpcHistorySetting, + pub collab: CollabSetting, pub mailer: MailerSetting, } @@ -122,6 +123,13 @@ pub struct GrpcHistorySetting { pub addrs: String, } +#[derive(Clone, Debug)] +pub struct CollabSetting { + pub group_persistence_interval_secs: u64, + pub edit_state_max_count: u32, + pub edit_state_max_secs: i64, +} + // Default values favor local development. pub fn get_configuration() -> Result { let config = Config { @@ -177,6 +185,15 @@ pub fn get_configuration() -> Result { grpc_history: GrpcHistorySetting { addrs: get_env_var("APPFLOWY_GRPC_HISTORY_ADDRS", "http://localhost:50051"), }, + collab: CollabSetting { + group_persistence_interval_secs: get_env_var( + "APPFLOWY_COLLAB_GROUP_PERSISTENCE_INTERVAL", + "60", + ) + .parse()?, + edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, + edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "360").parse()?, + }, mailer: MailerSetting { smtp_host: get_env_var("APPFLOWY_MAILER_SMTP_HOST", "smtp.gmail.com"), smtp_port: get_env_var("APPFLOWY_MAILER_SMTP_PORT", "465").parse()?,