chore: add config option for redis stream group workers count

This commit is contained in:
Bartosz Sypytkowski 2024-11-08 04:39:44 +01:00
parent b9ca480143
commit eb54947ab7
8 changed files with 15 additions and 37 deletions

View File

@ -25,9 +25,9 @@ impl CollabRedisStream {
pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
let router_options = StreamRouterOptions {
worker_count: 10,
worker_count: 60,
xread_streams: 100,
xread_block_millis: Some(100),
xread_block_millis: Some(5000),
xread_count: None,
};
let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?);

View File

@ -78,7 +78,6 @@ pub async fn run_actix_server(
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
config.collab.save_snapshot_retries,
state.indexer_provider.clone(),
)
.await
@ -108,7 +107,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
info!("Connecting to Redis...");
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret()).await?;
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
// Pg listeners
info!("Setting up Pg listeners...");
@ -163,6 +162,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
@ -170,9 +170,9 @@ async fn get_redis_client(
let router = StreamRouter::with_options(
&client,
StreamRouterOptions {
worker_count: 10,
worker_count,
xread_streams: 100,
xread_block_millis: Some(100),
xread_block_millis: Some(5000),
xread_count: None,
},
)?;

View File

@ -15,6 +15,7 @@ pub struct Config {
pub gotrue: GoTrueSetting,
pub collab: CollabSetting,
pub redis_uri: Secret<String>,
pub redis_worker_count: usize,
pub ai: AISettings,
}
@ -117,7 +118,6 @@ pub struct CollabSetting {
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub save_snapshot_retries: u32,
}
pub fn get_env_var(key: &str, default: &str) -> String {
@ -170,9 +170,9 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?,
ai: AISettings {
port: get_env_var("APPFLOWY_AI_SERVER_PORT", "5001").parse()?,
host: get_env_var("APPFLOWY_AI_SERVER_HOST", "localhost"),

View File

@ -82,7 +82,6 @@ impl CollabGroup {
collab_redis_stream: Arc<CollabRedisStream>,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer: Option<Arc<dyn Indexer>>,
) -> Result<Self, StreamError>
where
@ -148,7 +147,6 @@ impl CollabGroup {
state.clone(),
persistence_interval,
is_new_collab,
save_snapshot_retries,
));
}
@ -299,12 +297,7 @@ impl CollabGroup {
}
}
async fn snapshot_task(
state: Arc<CollabGroupState>,
interval: Duration,
is_new_collab: bool,
retries: u32,
) {
async fn snapshot_task(state: Arc<CollabGroupState>, interval: Duration, is_new_collab: bool) {
if is_new_collab {
tracing::trace!("persisting new collab for {}", state.object_id);
if let Err(err) = state.persister.save().await {
@ -320,20 +313,11 @@ impl CollabGroup {
// if saving took longer than snapshot_tick, just skip it over and try in the next round
snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut fail_counter = 0;
loop {
tokio::select! {
_ = snapshot_tick.tick() => {
if let Err(err) = state.persister.save().await {
tracing::warn!("failed to persist collab `{}/{}`: {}", state.workspace_id, state.object_id, err);
fail_counter += 1;
if fail_counter >= retries {
tracing::info!("failed to persist `{}/{}` after {} consecutive tries. Shutting collab group down.", state.workspace_id, state.object_id, fail_counter);
state.shutdown.cancel();
break;
}
} else {
fail_counter = 0;
}
},
_ = state.shutdown.cancelled() => {

View File

@ -35,7 +35,6 @@ pub struct GroupManager<S> {
control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
}
@ -51,7 +50,6 @@ where
collab_stream: CollabRedisStream,
persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
let collab_stream = Arc::new(collab_stream);
@ -69,7 +67,6 @@ where
control_event_stream,
persistence_interval,
prune_grace_period,
save_snapshot_retries,
indexer_provider,
})
}
@ -233,7 +230,6 @@ where
self.collab_redis_stream.clone(),
self.persistence_interval,
self.prune_grace_period,
self.save_snapshot_retries,
indexer,
)
.await?,

View File

@ -55,7 +55,6 @@ where
redis_connection_manager: RedisConnectionManager,
group_persistence_interval: Duration,
prune_grace_period: Duration,
save_snapshot_retries: u32,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
@ -79,7 +78,6 @@ where
collab_stream,
group_persistence_interval,
prune_grace_period,
save_snapshot_retries,
indexer_provider.clone(),
)
.await?,

View File

@ -137,7 +137,6 @@ pub async fn run_actix_server(
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
Duration::from_secs(config.collab.group_prune_grace_period_secs),
config.collab.save_snapshot_retries,
state.indexer_provider.clone(),
)
.await
@ -248,7 +247,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
// Redis
info!("Connecting to Redis...");
let (redis_conn_manager, redis_stream_router) =
get_redis_client(config.redis_uri.expose_secret()).await?;
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
info!("Setup AppFlowy AI: {}", config.appflowy_ai.url());
let appflowy_ai_client = AppFlowyAIClient::new(&config.appflowy_ai.url());
@ -426,6 +425,7 @@ async fn setup_admin_account(
async fn get_redis_client(
redis_uri: &str,
worker_count: usize,
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
info!("Connecting to redis with uri: {}", redis_uri);
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
@ -433,9 +433,9 @@ async fn get_redis_client(
let router = StreamRouter::with_options(
&client,
StreamRouterOptions {
worker_count: 10,
worker_count,
xread_streams: 100,
xread_block_millis: Some(100),
xread_block_millis: Some(5000),
xread_count: None,
},
)?;

View File

@ -19,6 +19,7 @@ pub struct Config {
pub application: ApplicationSetting,
pub websocket: WebsocketSetting,
pub redis_uri: Secret<String>,
pub redis_worker_count: usize,
pub s3: S3Setting,
pub appflowy_ai: AppFlowyAISetting,
pub grpc_history: GrpcHistorySetting,
@ -146,7 +147,6 @@ pub struct CollabSetting {
pub group_prune_grace_period_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
pub save_snapshot_retries: u32,
}
#[derive(Clone, Debug)]
@ -225,6 +225,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?,
s3: S3Setting {
create_bucket: get_env_var("APPFLOWY_S3_CREATE_BUCKET", "true")
.parse()
@ -255,7 +256,6 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "60").parse()?,
save_snapshot_retries: get_env_var("APPFLOWY_COLLAB_SAVE_SNAPSHOT_RETRIES", "10").parse()?,
},
published_collab: PublishedCollabSetting {
storage_backend: get_env_var("APPFLOWY_PUBLISHED_COLLAB_STORAGE_BACKEND", "postgres")