From eb54947ab713498635e997a86719cee1c5affd3b Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 8 Nov 2024 04:39:44 +0100 Subject: [PATCH] chore: add config option for redis stream group workers count --- libs/collab-stream/src/client.rs | 4 ++-- .../appflowy-collaborate/src/application.rs | 8 ++++---- services/appflowy-collaborate/src/config.rs | 4 ++-- .../src/group/group_init.rs | 18 +----------------- .../appflowy-collaborate/src/group/manager.rs | 4 ---- services/appflowy-collaborate/src/rt_server.rs | 2 -- src/application.rs | 8 ++++---- src/config/config.rs | 4 ++-- 8 files changed, 15 insertions(+), 37 deletions(-) diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 234e6b80..c989d933 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -25,9 +25,9 @@ impl CollabRedisStream { pub async fn new(redis_client: redis::Client) -> Result { let router_options = StreamRouterOptions { - worker_count: 10, + worker_count: 60, xread_streams: 100, - xread_block_millis: Some(100), + xread_block_millis: Some(5000), xread_count: None, }; let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?); diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 9e238944..3759e57e 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -78,7 +78,6 @@ pub async fn run_actix_server( state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_prune_grace_period_secs), - config.collab.save_snapshot_retries, state.indexer_provider.clone(), ) .await @@ -108,7 +107,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result Result<(redis::aio::ConnectionManager, Arc), Error> { info!("Connecting to redis with uri: {}", redis_uri); let client = redis::Client::open(redis_uri).context("failed to connect to redis")?; @@ -170,9 +170,9 @@ async fn get_redis_client( let router = StreamRouter::with_options( &client, StreamRouterOptions { - worker_count: 10, + worker_count, xread_streams: 100, - xread_block_millis: Some(100), + xread_block_millis: Some(5000), xread_count: None, }, )?; diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index 25d92881..d334cdf8 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -15,6 +15,7 @@ pub struct Config { pub gotrue: GoTrueSetting, pub collab: CollabSetting, pub redis_uri: Secret, + pub redis_worker_count: usize, pub ai: AISettings, } @@ -117,7 +118,6 @@ pub struct CollabSetting { pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, - pub save_snapshot_retries: u32, } pub fn get_env_var(key: &str, default: &str) -> String { @@ -170,9 +170,9 @@ pub fn get_configuration() -> Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, - save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?, }, redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(), + redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?, ai: AISettings { port: get_env_var("APPFLOWY_AI_SERVER_PORT", "5001").parse()?, host: get_env_var("APPFLOWY_AI_SERVER_HOST", "localhost"), diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index f10cad51..2d90ff0a 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -82,7 +82,6 @@ impl CollabGroup { collab_redis_stream: Arc, persistence_interval: Duration, prune_grace_period: Duration, - save_snapshot_retries: u32, indexer: Option>, ) -> Result where @@ -148,7 +147,6 @@ impl CollabGroup { state.clone(), persistence_interval, is_new_collab, - save_snapshot_retries, )); } @@ -299,12 +297,7 @@ impl CollabGroup { } } - async fn snapshot_task( - state: Arc, - interval: Duration, - is_new_collab: bool, - retries: u32, - ) { + async fn snapshot_task(state: Arc, interval: Duration, is_new_collab: bool) { if is_new_collab { tracing::trace!("persisting new collab for {}", state.object_id); if let Err(err) = state.persister.save().await { @@ -320,20 +313,11 @@ impl CollabGroup { // if saving took longer than snapshot_tick, just skip it over and try in the next round snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut fail_counter = 0; loop { tokio::select! { _ = snapshot_tick.tick() => { if let Err(err) = state.persister.save().await { tracing::warn!("failed to persist collab `{}/{}`: {}", state.workspace_id, state.object_id, err); - fail_counter += 1; - if fail_counter >= retries { - tracing::info!("failed to persist `{}/{}` after {} consecutive tries. Shutting collab group down.", state.workspace_id, state.object_id, fail_counter); - state.shutdown.cancel(); - break; - } - } else { - fail_counter = 0; } }, _ = state.shutdown.cancelled() => { diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index d9d6ee16..185d1e8b 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -35,7 +35,6 @@ pub struct GroupManager { control_event_stream: Arc>, persistence_interval: Duration, prune_grace_period: Duration, - save_snapshot_retries: u32, indexer_provider: Arc, } @@ -51,7 +50,6 @@ where collab_stream: CollabRedisStream, persistence_interval: Duration, prune_grace_period: Duration, - save_snapshot_retries: u32, indexer_provider: Arc, ) -> Result { let collab_stream = Arc::new(collab_stream); @@ -69,7 +67,6 @@ where control_event_stream, persistence_interval, prune_grace_period, - save_snapshot_retries, indexer_provider, }) } @@ -233,7 +230,6 @@ where self.collab_redis_stream.clone(), self.persistence_interval, self.prune_grace_period, - self.save_snapshot_retries, indexer, ) .await?, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index dac11138..0f4b1136 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -55,7 +55,6 @@ where redis_connection_manager: RedisConnectionManager, group_persistence_interval: Duration, prune_grace_period: Duration, - save_snapshot_retries: u32, indexer_provider: Arc, ) -> Result { let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") @@ -79,7 +78,6 @@ where collab_stream, group_persistence_interval, prune_grace_period, - save_snapshot_retries, indexer_provider.clone(), ) .await?, diff --git a/src/application.rs b/src/application.rs index 0a3681ea..7e64a1d3 100644 --- a/src/application.rs +++ b/src/application.rs @@ -137,7 +137,6 @@ pub async fn run_actix_server( state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), Duration::from_secs(config.collab.group_prune_grace_period_secs), - config.collab.save_snapshot_retries, state.indexer_provider.clone(), ) .await @@ -248,7 +247,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result<(redis::aio::ConnectionManager, Arc), Error> { info!("Connecting to redis with uri: {}", redis_uri); let client = redis::Client::open(redis_uri).context("failed to connect to redis")?; @@ -433,9 +433,9 @@ async fn get_redis_client( let router = StreamRouter::with_options( &client, StreamRouterOptions { - worker_count: 10, + worker_count, xread_streams: 100, - xread_block_millis: Some(100), + xread_block_millis: Some(5000), xread_count: None, }, )?; diff --git a/src/config/config.rs b/src/config/config.rs index c32b564a..ee74dc87 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -19,6 +19,7 @@ pub struct Config { pub application: ApplicationSetting, pub websocket: WebsocketSetting, pub redis_uri: Secret, + pub redis_worker_count: usize, pub s3: S3Setting, pub appflowy_ai: AppFlowyAISetting, pub grpc_history: GrpcHistorySetting, @@ -146,7 +147,6 @@ pub struct CollabSetting { pub group_prune_grace_period_secs: u64, pub edit_state_max_count: u32, pub edit_state_max_secs: i64, - pub save_snapshot_retries: u32, } #[derive(Clone, Debug)] @@ -225,6 +225,7 @@ pub fn get_configuration() -> Result { min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?, }, redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(), + redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?, s3: S3Setting { create_bucket: get_env_var("APPFLOWY_S3_CREATE_BUCKET", "true") .parse() @@ -255,7 +256,6 @@ pub fn get_configuration() -> Result { .parse()?, edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?, edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?, - save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?, }, published_collab: PublishedCollabSetting { storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres")