diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index a2880656..9e238944 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -78,6 +78,7 @@ pub async fn run_actix_server( state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_prune_grace_period_secs), + config.collab.save_snapshot_retries, state.indexer_provider.clone(), ) .await diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index 7db8fca5..25d92881 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -117,6 +117,7 @@ pub struct CollabSetting { pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, + pub save_snapshot_retries: u32, } pub fn get_env_var(key: &str, default: &str) -> String { @@ -169,6 +170,7 @@ pub fn get_configuration() -> Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, + save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?, }, redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(), ai: AISettings { diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 43177710..f10cad51 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -82,6 +82,7 @@ impl CollabGroup { collab_redis_stream: Arc, persistence_interval: Duration, prune_grace_period: Duration, + save_snapshot_retries: u32, indexer: Option>, ) -> Result where @@ -147,6 +148,7 @@ impl CollabGroup { state.clone(), persistence_interval, is_new_collab, + save_snapshot_retries, )); } @@ -164,6 +166,10 @@ impl CollabGroup { &self.state.object_id } + pub fn is_cancelled(&self) -> bool { + self.state.shutdown.is_cancelled() + } + /// Task used to receive collab updates from Redis. async fn inbound_task(state: Arc) -> Result<(), RealtimeError> { let updates = state.persister.collab_redis_stream.live_collab_updates( @@ -293,7 +299,12 @@ impl CollabGroup { } } - async fn snapshot_task(state: Arc, interval: Duration, is_new_collab: bool) { + async fn snapshot_task( + state: Arc, + interval: Duration, + is_new_collab: bool, + retries: u32, + ) { if is_new_collab { tracing::trace!("persisting new collab for {}", state.object_id); if let Err(err) = state.persister.save().await { @@ -309,16 +320,25 @@ impl CollabGroup { // if saving took longer than snapshot_tick, just skip it over and try in the next round snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut fail_counter = 0; loop { tokio::select! { _ = snapshot_tick.tick() => { if let Err(err) = state.persister.save().await { - tracing::warn!("failed to persist document `{}/{}`: {}", state.workspace_id, state.object_id, err); + tracing::warn!("failed to persist collab `{}/{}`: {}", state.workspace_id, state.object_id, err); + fail_counter += 1; + if fail_counter >= retries { + tracing::info!("failed to persist `{}/{}` after {} consecutive tries. Shutting collab group down.", state.workspace_id, state.object_id, fail_counter); + state.shutdown.cancel(); + break; + } + } else { + fail_counter = 0; } }, _ = state.shutdown.cancelled() => { if let Err(err) = state.persister.save().await { - tracing::warn!("failed to persist document on shutdown `{}/{}`: {}", state.workspace_id, state.object_id, err); + tracing::warn!("failed to persist collab on shutdown `{}/{}`: {}", state.workspace_id, state.object_id, err); } break; } diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 185d1e8b..d9d6ee16 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -35,6 +35,7 @@ pub struct GroupManager { control_event_stream: Arc>, persistence_interval: Duration, prune_grace_period: Duration, + save_snapshot_retries: u32, indexer_provider: Arc, } @@ -50,6 +51,7 @@ where collab_stream: CollabRedisStream, persistence_interval: Duration, prune_grace_period: Duration, + save_snapshot_retries: u32, indexer_provider: Arc, ) -> Result { let collab_stream = Arc::new(collab_stream); @@ -67,6 +69,7 @@ where control_event_stream, persistence_interval, prune_grace_period, + save_snapshot_retries, indexer_provider, }) } @@ -230,6 +233,7 @@ where self.collab_redis_stream.clone(), self.persistence_interval, self.prune_grace_period, + self.save_snapshot_retries, indexer, ) .await?, diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index e95dd634..0824f008 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -121,7 +121,12 @@ impl GroupManagementState { } pub(crate) async fn contains_group(&self, object_id: &str) -> bool { - self.group_by_object_id.contains_key(object_id) + if let Some(group) = self.group_by_object_id.get(object_id) { + let cancelled = group.is_cancelled(); + !cancelled + } else { + false + } } pub(crate) async fn remove_group(&self, object_id: &str) { diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 0f4b1136..dac11138 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -55,6 +55,7 @@ where redis_connection_manager: RedisConnectionManager, group_persistence_interval: Duration, prune_grace_period: Duration, + save_snapshot_retries: u32, indexer_provider: Arc, ) -> Result { let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") @@ -78,6 +79,7 @@ where collab_stream, group_persistence_interval, prune_grace_period, + save_snapshot_retries, indexer_provider.clone(), ) .await?, diff --git a/src/application.rs b/src/application.rs index 58b5f283..0a3681ea 100644 --- a/src/application.rs +++ b/src/application.rs @@ -137,6 +137,7 @@ pub async fn run_actix_server( state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_prune_grace_period_secs), + config.collab.save_snapshot_retries, state.indexer_provider.clone(), ) .await diff --git a/src/config/config.rs b/src/config/config.rs index 3fe8d330..c32b564a 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -146,6 +146,7 @@ pub struct CollabSetting { pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, + pub save_snapshot_retries: u32, } #[derive(Clone, Debug)] @@ -254,6 +255,7 @@ pub fn get_configuration() -> Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, + save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?, }, published_collab: PublishedCollabSetting { storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres") diff --git a/tests/collab/stress_test.rs b/tests/collab/stress_test.rs index 5e33b01d..9133b98f 100644 --- a/tests/collab/stress_test.rs +++ b/tests/collab/stress_test.rs @@ -46,7 +46,7 @@ async fn run_multiple_text_edits() { // run test scenario let collab = writer.collabs.get(&object_id).unwrap().collab.clone(); - let expected = test_scenario.execute(collab, 50_000).await; + let expected = test_scenario.execute(collab, 20_000).await; // wait for the writer to complete sync writer.wait_object_sync_complete(&object_id).await.unwrap();