chore: make collab group prune grace period configurable

This commit is contained in:
Bartosz Sypytkowski 2024-10-28 12:55:43 +01:00
parent 395424bfd2
commit ce793fa03e
7 changed files with 22 additions and 5 deletions

View File

@ -76,6 +76,7 @@ pub async fn run_actix_server(
rt_cmd_recv, rt_cmd_recv,
state.redis_connection_manager.clone(), state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
state.indexer_provider.clone(), state.indexer_provider.clone(),
) )
.await .await

View File

@ -114,6 +114,7 @@ pub struct GoTrueSetting {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CollabSetting { pub struct CollabSetting {
pub group_persistence_interval_secs: u64, pub group_persistence_interval_secs: u64,
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32, pub edit_state_max_count: u32,
pub edit_state_max_secs: i64, pub edit_state_max_secs: i64,
} }
@ -164,6 +165,8 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
"60", "60",
) )
.parse()?, .parse()?,
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
}, },

View File

@ -81,6 +81,7 @@ impl CollabGroup {
is_new_collab: bool, is_new_collab: bool,
collab_redis_stream: Arc<CollabRedisStream>, collab_redis_stream: Arc<CollabRedisStream>,
persistence_interval: Duration, persistence_interval: Duration,
prune_grace_period: Duration,
indexer: Option<Arc<dyn Indexer>>, indexer: Option<Arc<dyn Indexer>>,
) -> Result<Self, StreamError> ) -> Result<Self, StreamError>
where where
@ -94,6 +95,7 @@ impl CollabGroup {
storage, storage,
collab_redis_stream, collab_redis_stream,
indexer, indexer,
prune_grace_period,
); );
let state = Arc::new(CollabGroupState { let state = Arc::new(CollabGroupState {
@ -846,13 +848,12 @@ struct CollabPersister {
indexer: Option<Arc<dyn Indexer>>, indexer: Option<Arc<dyn Indexer>>,
update_sink: CollabUpdateSink, update_sink: CollabUpdateSink,
awareness_sink: AwarenessUpdateSink, awareness_sink: AwarenessUpdateSink,
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
/// read right away, we give 1min for other potential client to catch up.
prune_grace_period: Duration,
} }
impl CollabPersister { impl CollabPersister {
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
/// read right away, we give 1min for other potential client to catch up.
pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min
pub fn new( pub fn new(
uid: i64, uid: i64,
workspace_id: String, workspace_id: String,
@ -861,6 +862,7 @@ impl CollabPersister {
storage: Arc<dyn CollabStorage>, storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>, collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>, indexer: Option<Arc<dyn Indexer>>,
prune_grace_period: Duration,
) -> Self { ) -> Self {
let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id); let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id);
let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id); let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id);
@ -874,6 +876,7 @@ impl CollabPersister {
indexer, indexer,
update_sink, update_sink,
awareness_sink, awareness_sink,
prune_grace_period,
} }
} }
@ -1067,7 +1070,7 @@ impl CollabPersister {
// 3. finally we can drop Redis messages // 3. finally we can drop Redis messages
let msg_id = MessageId { let msg_id = MessageId {
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS, timestamp_ms: message_id.timestamp_ms - self.prune_grace_period.as_millis() as u64,
sequence_number: 0, sequence_number: 0,
}; };
let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id); let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id);

View File

@ -34,6 +34,7 @@ pub struct GroupManager<S> {
collab_redis_stream: Arc<CollabRedisStream>, collab_redis_stream: Arc<CollabRedisStream>,
control_event_stream: Arc<Mutex<StreamGroup>>, control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration, persistence_interval: Duration,
prune_grace_period: Duration,
indexer_provider: Arc<IndexerProvider>, indexer_provider: Arc<IndexerProvider>,
} }
@ -48,6 +49,7 @@ where
metrics_calculate: Arc<CollabRealtimeMetrics>, metrics_calculate: Arc<CollabRealtimeMetrics>,
collab_stream: CollabRedisStream, collab_stream: CollabRedisStream,
persistence_interval: Duration, persistence_interval: Duration,
prune_grace_period: Duration,
indexer_provider: Arc<IndexerProvider>, indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> { ) -> Result<Self, RealtimeError> {
let collab_stream = Arc::new(collab_stream); let collab_stream = Arc::new(collab_stream);
@ -64,6 +66,7 @@ where
collab_redis_stream: collab_stream, collab_redis_stream: collab_stream,
control_event_stream, control_event_stream,
persistence_interval, persistence_interval,
prune_grace_period,
indexer_provider, indexer_provider,
}) })
} }
@ -226,6 +229,7 @@ where
is_new_collab, is_new_collab,
self.collab_redis_stream.clone(), self.collab_redis_stream.clone(),
self.persistence_interval, self.persistence_interval,
self.prune_grace_period,
indexer, indexer,
) )
.await?, .await?,

View File

@ -52,6 +52,7 @@ where
command_recv: CLCommandReceiver, command_recv: CLCommandReceiver,
redis_connection_manager: RedisConnectionManager, redis_connection_manager: RedisConnectionManager,
group_persistence_interval: Duration, group_persistence_interval: Duration,
prune_grace_period: Duration,
indexer_provider: Arc<IndexerProvider>, indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> { ) -> Result<Self, RealtimeError> {
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
@ -73,6 +74,7 @@ where
metrics.clone(), metrics.clone(),
collab_stream, collab_stream,
group_persistence_interval, group_persistence_interval,
prune_grace_period,
indexer_provider.clone(), indexer_provider.clone(),
) )
.await?, .await?,

View File

@ -134,6 +134,7 @@ pub async fn run_actix_server(
rt_cmd_recv, rt_cmd_recv,
state.redis_connection_manager.clone(), state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
state.indexer_provider.clone(), state.indexer_provider.clone(),
) )
.await .await

View File

@ -143,6 +143,7 @@ pub struct GrpcHistorySetting {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CollabSetting { pub struct CollabSetting {
pub group_persistence_interval_secs: u64, pub group_persistence_interval_secs: u64,
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32, pub edit_state_max_count: u32,
pub edit_state_max_secs: i64, pub edit_state_max_secs: i64,
} }
@ -249,6 +250,8 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
"60", "60",
) )
.parse()?, .parse()?,
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
}, },