chore: add retry limit to which collab snapshot can be made

This commit is contained in:
Bartosz Sypytkowski 2024-11-07 13:12:17 +01:00
parent bc49d73b40
commit b9ca480143
9 changed files with 42 additions and 5 deletions

View File

@ -78,6 +78,7 @@ pub async fn run_actix_server(
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
config.collab.save_snapshot_retries,
state.indexer_provider.clone(),
)
.await

View File

@ -117,6 +117,7 @@ pub struct CollabSetting {
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub save_snapshot_retries: u32,
}
pub fn get_env_var(key: &str, default: &str) -> String {
@ -169,6 +170,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
ai: AISettings {

View File

@ -82,6 +82,7 @@ impl CollabGroup {
collab_redis_stream: Arc<CollabRedisStream>,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer: Option<Arc<dyn Indexer>>,
) -> Result<Self, StreamError>
where
@ -147,6 +148,7 @@ impl CollabGroup {
state.clone(),
persistence_interval,
is_new_collab,
save_snapshot_retries,
));
}
@ -164,6 +166,10 @@ impl CollabGroup {
&self.state.object_id
}
pub fn is_cancelled(&self) -> bool {
self.state.shutdown.is_cancelled()
}
/// Task used to receive collab updates from Redis.
async fn inbound_task(state: Arc<CollabGroupState>) -> Result<(), RealtimeError> {
let updates = state.persister.collab_redis_stream.live_collab_updates(
@ -293,7 +299,12 @@ impl CollabGroup {
}
}
async fn snapshot_task(state: Arc<CollabGroupState>, interval: Duration, is_new_collab: bool) {
async fn snapshot_task(
state: Arc<CollabGroupState>,
interval: Duration,
is_new_collab: bool,
retries: u32,
) {
if is_new_collab {
tracing::trace!("persisting new collab for {}", state.object_id);
if let Err(err) = state.persister.save().await {
@ -309,16 +320,25 @@ impl CollabGroup {
// if saving took longer than snapshot_tick, just skip it over and try in the next round
snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut fail_counter = 0;
loop {
tokio::select! {
_ = snapshot_tick.tick() => {
if let Err(err) = state.persister.save().await {
tracing::warn!("failed to persist document `{}/{}`: {}", state.workspace_id, state.object_id, err);
tracing::warn!("failed to persist collab `{}/{}`: {}", state.workspace_id, state.object_id, err);
fail_counter += 1;
if fail_counter >= retries {
tracing::info!("failed to persist `{}/{}` after {} consecutive tries. Shutting collab group down.", state.workspace_id, state.object_id, fail_counter);
state.shutdown.cancel();
break;
}
} else {
fail_counter = 0;
}
},
_ = state.shutdown.cancelled() => {
if let Err(err) = state.persister.save().await {
tracing::warn!("failed to persist document on shutdown `{}/{}`: {}", state.workspace_id, state.object_id, err);
tracing::warn!("failed to persist collab on shutdown `{}/{}`: {}", state.workspace_id, state.object_id, err);
}
break;
}

View File

@ -35,6 +35,7 @@ pub struct GroupManager<S> {
control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
}
@ -50,6 +51,7 @@ where
collab_stream: CollabRedisStream,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
let collab_stream = Arc::new(collab_stream);
@ -67,6 +69,7 @@ where
control_event_stream,
persistence_interval,
prune_grace_period,
save_snapshot_retries,
indexer_provider,
})
}
@ -230,6 +233,7 @@ where
self.collab_redis_stream.clone(),
self.persistence_interval,
self.prune_grace_period,
self.save_snapshot_retries,
indexer,
)
.await?,

View File

@ -121,7 +121,12 @@ impl GroupManagementState {
}
pub(crate) async fn contains_group(&self, object_id: &str) -> bool {
self.group_by_object_id.contains_key(object_id)
if let Some(group) = self.group_by_object_id.get(object_id) {
let cancelled = group.is_cancelled();
!cancelled
} else {
false
}
}
pub(crate) async fn remove_group(&self, object_id: &str) {

View File

@ -55,6 +55,7 @@ where
redis_connection_manager: RedisConnectionManager,
group_persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
@ -78,6 +79,7 @@ where
collab_stream,
group_persistence_interval,
prune_grace_period,
save_snapshot_retries,
indexer_provider.clone(),
)
.await?,

View File

@ -137,6 +137,7 @@ pub async fn run_actix_server(
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
config.collab.save_snapshot_retries,
state.indexer_provider.clone(),
)
.await

View File

@ -146,6 +146,7 @@ pub struct CollabSetting {
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub save_snapshot_retries: u32,
}
#[derive(Clone, Debug)]
@ -254,6 +255,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?,
},
published_collab: PublishedCollabSetting {
storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres")

View File

@ -46,7 +46,7 @@ async fn run_multiple_text_edits() {
// run test scenario
let collab = writer.collabs.get(&object_id).unwrap().collab.clone();
let expected = test_scenario.execute(collab, 50_000).await;
let expected = test_scenario.execute(collab, 20_000).await;
// wait for the writer to complete sync
writer.wait_object_sync_complete(&object_id).await.unwrap();