chore: make collab group prune grace period configurable
This commit is contained in:
parent
50b49a72a8
commit
ae6d8b1313
|
|
@ -77,6 +77,7 @@ pub async fn run_actix_server(
|
|||
rt_cmd_recv,
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
Duration::from_secs(config.collab.group_prune_grace_period_secs),
|
||||
state.indexer_provider.clone(),
|
||||
)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -114,6 +114,7 @@ pub struct GoTrueSetting {
|
|||
#[derive(Clone, Debug)]
|
||||
pub struct CollabSetting {
|
||||
pub group_persistence_interval_secs: u64,
|
||||
pub group_prune_grace_period_secs: u64,
|
||||
pub edit_state_max_count: u32,
|
||||
pub edit_state_max_secs: i64,
|
||||
}
|
||||
|
|
@ -164,6 +165,8 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
|
|||
"60",
|
||||
)
|
||||
.parse()?,
|
||||
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
|
||||
.parse()?,
|
||||
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
|
||||
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ impl CollabGroup {
|
|||
is_new_collab: bool,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
persistence_interval: Duration,
|
||||
prune_grace_period: Duration,
|
||||
indexer: Option<Arc<dyn Indexer>>,
|
||||
) -> Result<Self, StreamError>
|
||||
where
|
||||
|
|
@ -94,6 +95,7 @@ impl CollabGroup {
|
|||
storage,
|
||||
collab_redis_stream,
|
||||
indexer,
|
||||
prune_grace_period,
|
||||
);
|
||||
|
||||
let state = Arc::new(CollabGroupState {
|
||||
|
|
@ -846,13 +848,12 @@ struct CollabPersister {
|
|||
indexer: Option<Arc<dyn Indexer>>,
|
||||
update_sink: CollabUpdateSink,
|
||||
awareness_sink: AwarenessUpdateSink,
|
||||
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
|
||||
/// read right away, we give 1min for other potential client to catch up.
|
||||
prune_grace_period: Duration,
|
||||
}
|
||||
|
||||
impl CollabPersister {
|
||||
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
|
||||
/// read right away, we give 1min for other potential client to catch up.
|
||||
pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min
|
||||
|
||||
pub fn new(
|
||||
uid: i64,
|
||||
workspace_id: String,
|
||||
|
|
@ -861,6 +862,7 @@ impl CollabPersister {
|
|||
storage: Arc<dyn CollabStorage>,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
indexer: Option<Arc<dyn Indexer>>,
|
||||
prune_grace_period: Duration,
|
||||
) -> Self {
|
||||
let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id);
|
||||
let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id);
|
||||
|
|
@ -874,6 +876,7 @@ impl CollabPersister {
|
|||
indexer,
|
||||
update_sink,
|
||||
awareness_sink,
|
||||
prune_grace_period,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1067,7 +1070,7 @@ impl CollabPersister {
|
|||
|
||||
// 3. finally we can drop Redis messages
|
||||
let msg_id = MessageId {
|
||||
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
|
||||
timestamp_ms: message_id.timestamp_ms - self.prune_grace_period.as_millis() as u64,
|
||||
sequence_number: 0,
|
||||
};
|
||||
let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id);
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ pub struct GroupManager<S> {
|
|||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
control_event_stream: Arc<Mutex<StreamGroup>>,
|
||||
persistence_interval: Duration,
|
||||
prune_grace_period: Duration,
|
||||
indexer_provider: Arc<IndexerProvider>,
|
||||
}
|
||||
|
||||
|
|
@ -48,6 +49,7 @@ where
|
|||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
collab_stream: CollabRedisStream,
|
||||
persistence_interval: Duration,
|
||||
prune_grace_period: Duration,
|
||||
indexer_provider: Arc<IndexerProvider>,
|
||||
) -> Result<Self, RealtimeError> {
|
||||
let collab_stream = Arc::new(collab_stream);
|
||||
|
|
@ -64,6 +66,7 @@ where
|
|||
collab_redis_stream: collab_stream,
|
||||
control_event_stream,
|
||||
persistence_interval,
|
||||
prune_grace_period,
|
||||
indexer_provider,
|
||||
})
|
||||
}
|
||||
|
|
@ -226,6 +229,7 @@ where
|
|||
is_new_collab,
|
||||
self.collab_redis_stream.clone(),
|
||||
self.persistence_interval,
|
||||
self.prune_grace_period,
|
||||
indexer,
|
||||
)
|
||||
.await?,
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ where
|
|||
command_recv: CLCommandReceiver,
|
||||
redis_connection_manager: RedisConnectionManager,
|
||||
group_persistence_interval: Duration,
|
||||
prune_grace_period: Duration,
|
||||
indexer_provider: Arc<IndexerProvider>,
|
||||
) -> Result<Self, RealtimeError> {
|
||||
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
|
||||
|
|
@ -74,6 +75,7 @@ where
|
|||
metrics.clone(),
|
||||
collab_stream,
|
||||
group_persistence_interval,
|
||||
prune_grace_period,
|
||||
indexer_provider.clone(),
|
||||
)
|
||||
.await?,
|
||||
|
|
|
|||
|
|
@ -135,6 +135,7 @@ pub async fn run_actix_server(
|
|||
rt_cmd_recv,
|
||||
state.redis_connection_manager.clone(),
|
||||
Duration::from_secs(config.collab.group_persistence_interval_secs),
|
||||
Duration::from_secs(config.collab.group_prune_grace_period_secs),
|
||||
state.indexer_provider.clone(),
|
||||
)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -143,6 +143,7 @@ pub struct GrpcHistorySetting {
|
|||
#[derive(Clone, Debug)]
|
||||
pub struct CollabSetting {
|
||||
pub group_persistence_interval_secs: u64,
|
||||
pub group_prune_grace_period_secs: u64,
|
||||
pub edit_state_max_count: u32,
|
||||
pub edit_state_max_secs: i64,
|
||||
}
|
||||
|
|
@ -249,6 +250,8 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
|
|||
"60",
|
||||
)
|
||||
.parse()?,
|
||||
group_prune_grace_period_secs: get_env_var("APPFLOWY_COLLAB_GROUP_GRACE_PERIOD_SECS", "60")
|
||||
.parse()?,
|
||||
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
|
||||
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue