From 115d0460941bd770faab4ca3f7ab8f1d80d215cb Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 27 May 2024 14:09:57 +0800 Subject: [PATCH] chore: periodicall check redis stream capacity (#580) --- Cargo.lock | 1 + libs/collab-stream/Cargo.toml | 2 + libs/collab-stream/src/client.rs | 4 +- libs/collab-stream/src/stream_group.rs | 58 ++++++++++++++++++++++++-- 4 files changed, 59 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 179ac4a0..a2d8a319 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1731,6 +1731,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tracing", ] diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index bf4c081d..64d82bba 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -18,6 +18,8 @@ bincode = "1.3.3" collab-entity.workspace = true serde_json.workspace = true chrono = "0.4" +tokio-util = { version = "0.7" } + [dev-dependencies] futures = "0.3.30" diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 60b164b9..ce976146 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -52,8 +52,8 @@ impl CollabRedisStream { group_name, self.connection_manager.clone(), StreamConfig::new() - // 1000 messages - .with_max_len(1000) + // 2000 messages + .with_max_len(2000) // 12 hours .with_expire_time(60 * 60 * 12), ); diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index afd891b7..6d67ed31 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -7,8 +7,11 @@ use redis::streams::{ StreamReadOptions, }; use redis::{pipe, AsyncCommands, RedisResult}; +use std::sync::Arc; +use std::time::Duration; +use tokio_util::sync::CancellationToken; -use tracing::{error, trace}; +use tracing::{error, trace, warn}; #[derive(Clone)] pub struct StreamGroup { @@ -17,8 +20,13 @@ pub struct StreamGroup { group_name: String, config: StreamConfig, last_expiration_set: Option>, + cancel_token: Arc, +} +impl Drop for StreamGroup { + fn drop(&mut self) { + self.cancel_token.cancel(); + } } - impl StreamGroup { pub fn new(stream_key: String, group_name: &str, connection_manager: ConnectionManager) -> Self { let config = StreamConfig { @@ -34,14 +42,18 @@ impl StreamGroup { connection_manager: ConnectionManager, config: StreamConfig, ) -> Self { + let cancel_token = Arc::new(CancellationToken::new()); let group_name = group_name.to_string(); - Self { + let group = Self { group_name, connection_manager, stream_key, config, last_expiration_set: None, - } + cancel_token, + }; + group.spawn_periodic_check(); + group } /// Ensures the consumer group exists, creating it if necessary. @@ -328,6 +340,7 @@ impl StreamGroup { let now = Utc::now(); if let Some(expire_time) = self.config.expire_time_in_secs { let should_set_expiration = match self.last_expiration_set { + // Set expiration if it has been more than an hour since the last set Some(last_set) => now.signed_duration_since(last_set) > chrono::Duration::seconds(60 * 60), None => true, }; @@ -343,6 +356,43 @@ impl StreamGroup { Ok(()) } + + /// Spawns a periodic task to check the stream length. + fn spawn_periodic_check(&self) { + if let Some(max_len) = self.config.max_len { + let stream_key = self.stream_key.clone(); + let mut connection_manager = self.connection_manager.clone(); + let cancel_token = self.cancel_token.clone(); + tokio::spawn(async move { + // Check every hour + let mut interval = tokio::time::interval(Duration::from_secs(3600)); + loop { + tokio::select! { + _ = interval.tick() => { + if let Ok(len) = get_stream_length(&mut connection_manager, &stream_key).await { + if len + 100 > max_len { + warn!("stream len is going to exceed the max len: {}, current: {}", max_len, len); + } + } + } + _ = cancel_token.cancelled() => { + trace!("Stream length check task cancelled."); + break; + } + } + } + }); + } + } +} + +/// Checks if the stream length exceeds the maximum length. +async fn get_stream_length( + connection_manager: &mut ConnectionManager, + stream_key: &str, +) -> Result { + let current_len: usize = connection_manager.xlen(stream_key).await?; + Ok(current_len) } pub enum ReadOption {