From 3f2d5f07850c1080d84c3121606eab2ffeea378e Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 27 May 2024 13:22:14 +0800 Subject: [PATCH] chore: optimize redis stream space usage (#579) * chore: set maxlen for redis stream * chore: update log * chore: set expiration time --- Cargo.lock | 1 + libs/collab-stream/Cargo.toml | 1 + libs/collab-stream/src/client.rs | 20 ++- libs/collab-stream/src/stream_group.rs | 121 +++++++++++++++++- services/appflowy-history/src/core/manager.rs | 7 +- 5 files changed, 141 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d68f7170..179ac4a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1721,6 +1721,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "chrono", "collab-entity", "futures", "rand 0.8.5", diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index 788ead87..bf4c081d 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] } bincode = "1.3.3" collab-entity.workspace = true serde_json.workspace = true +chrono = "0.4" [dev-dependencies] futures = "0.3.30" diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index c72c37e2..60b164b9 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -1,7 +1,7 @@ use crate::error::StreamError; use crate::pubsub::{CollabStreamPub, CollabStreamSub}; use crate::stream::CollabStream; -use crate::stream_group::StreamGroup; +use crate::stream_group::{StreamConfig, StreamGroup}; use redis::aio::ConnectionManager; pub const CONTROL_STREAM_KEY: &str = "af_collab_control"; @@ -30,7 +30,12 @@ impl CollabRedisStream { key: &str, group_name: &str, ) -> Result { - let mut group = StreamGroup::new(key.to_string(), group_name, self.connection_manager.clone()); + let mut group = StreamGroup::new_with_config( + key.to_string(), + group_name, + self.connection_manager.clone(), + StreamConfig::new().with_max_len(1000), + ); group.ensure_consumer_group().await?; Ok(group) } @@ -42,7 +47,16 @@ impl CollabRedisStream { group_name: &str, ) -> Result { let stream_key = format!("af_collab_update-{}-{}", workspace_id, oid); - let mut group = StreamGroup::new(stream_key, group_name, self.connection_manager.clone()); + let mut group = StreamGroup::new_with_config( + stream_key, + group_name, + self.connection_manager.clone(), + StreamConfig::new() + // 1000 messages + .with_max_len(1000) + // 12 hours + .with_expire_time(60 * 60 * 12), + ); group.ensure_consumer_group().await?; Ok(group) } diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index 80c2c815..afd891b7 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -1,11 +1,13 @@ use crate::error::StreamError; use crate::model::{MessageId, StreamBinary, StreamMessage, StreamMessageByStreamKey}; +use chrono::{DateTime, Utc}; use redis::aio::ConnectionManager; use redis::streams::{ StreamClaimOptions, StreamClaimReply, StreamMaxlen, StreamPendingData, StreamPendingReply, StreamReadOptions, }; use redis::{pipe, AsyncCommands, RedisResult}; + use tracing::{error, trace}; #[derive(Clone)] @@ -13,15 +15,32 @@ pub struct StreamGroup { connection_manager: ConnectionManager, stream_key: String, group_name: String, + config: StreamConfig, + last_expiration_set: Option>, } impl StreamGroup { pub fn new(stream_key: String, group_name: &str, connection_manager: ConnectionManager) -> Self { + let config = StreamConfig { + max_len: Some(1000), + expire_time_in_secs: None, + }; + Self::new_with_config(stream_key, group_name, connection_manager, config) + } + + pub fn new_with_config( + stream_key: String, + group_name: &str, + connection_manager: ConnectionManager, + config: StreamConfig, + ) -> Self { let group_name = group_name.to_string(); Self { group_name, connection_manager, stream_key, + config, + last_expiration_set: None, } } @@ -98,9 +117,24 @@ impl StreamGroup { for message in messages { let message = message.into(); let tuple = message.into_tuple_array(); - pipe.xadd(&self.stream_key, "*", tuple.as_slice()); + if let Some(len) = self.config.max_len { + pipe + .cmd("XADD") + .arg(&self.stream_key) + .arg("MAXLEN") + .arg("~") + .arg(len) + .arg("*") + .arg(&tuple); + } else { + pipe.cmd("XADD").arg(&self.stream_key).arg("*").arg(&tuple); + } } + pipe.query_async(&mut self.connection_manager).await?; + if let Err(err) = self.set_expiration().await { + error!("set expiration fail: {:?}", err); + } Ok(()) } @@ -115,10 +149,24 @@ impl StreamGroup { pub async fn insert_binary(&mut self, message: StreamBinary) -> Result<(), StreamError> { let tuple = message.into_tuple_array(); - self - .connection_manager - .xadd(&self.stream_key, "*", tuple.as_slice()) - .await?; + match self.config.max_len { + Some(max_len) => { + self + .connection_manager + .xadd_maxlen(&self.stream_key, StreamMaxlen::Approx(max_len), "*", &tuple) + .await?; + }, + None => { + self + .connection_manager + .xadd(&self.stream_key, "*", tuple.as_slice()) + .await?; + }, + } + + if let Err(err) = self.set_expiration().await { + error!("set expiration fail: {:?}", err); + } Ok(()) } @@ -268,6 +316,33 @@ impl StreamGroup { .await?; Ok(()) } + + /// For now, we only call this function after inserting messages into the stream. + /// + /// Inserting a Redis EXPIRE command after adding messages ensures that the stream's + /// TTL (time-to-live) is updated whenever new data is inserted. This is crucial for streams + /// where data needs to be automatically cleaned up after a certain period. Setting the expiration + /// after every insertion guarantees that the stream will expire correctly, even if messages are + /// inserted at irregular intervals. + async fn set_expiration(&mut self) -> Result<(), StreamError> { + let now = Utc::now(); + if let Some(expire_time) = self.config.expire_time_in_secs { + let should_set_expiration = match self.last_expiration_set { + Some(last_set) => now.signed_duration_since(last_set) > chrono::Duration::seconds(60 * 60), + None => true, + }; + + if should_set_expiration { + self + .connection_manager + .expire(&self.stream_key, expire_time) + .await?; + self.last_expiration_set = Some(now); + } + } + + Ok(()) + } } pub enum ReadOption { @@ -275,3 +350,39 @@ pub enum ReadOption { Count(usize), After(MessageId), } + +#[derive(Clone)] +pub struct StreamConfig { + /// Sets the maximum length of the stream. + max_len: Option, + /// Set the time in secs for the stream to expire. After the time has passed, the stream will be + /// automatically deleted. All messages in the stream will be removed. + /// + /// If the stream does not exist (e.g., it has expired), inserting a message will automatically + /// create the stream. + expire_time_in_secs: Option, +} + +impl Default for StreamConfig { + fn default() -> Self { + Self::new() + } +} + +impl StreamConfig { + pub fn new() -> Self { + Self { + max_len: None, + expire_time_in_secs: None, + } + } + pub fn with_max_len(mut self, max_len: usize) -> Self { + self.max_len = Some(max_len); + self + } + + pub fn with_expire_time(mut self, expire_time_in_secs: i64) -> Self { + self.expire_time_in_secs = Some(expire_time_in_secs); + self + } +} diff --git a/services/appflowy-history/src/core/manager.rs b/services/appflowy-history/src/core/manager.rs index 5916c2e5..84c14bdd 100644 --- a/services/appflowy-history/src/core/manager.rs +++ b/services/appflowy-history/src/core/manager.rs @@ -97,6 +97,7 @@ async fn spawn_control_group( .await { if let Some(handles) = weak_handles.upgrade() { + trace!("[History] received {} control messages", messages.len()); for message in &messages { if let Ok(event) = CollabControlEvent::decode(&message.data) { handle_control_event(&redis_stream, event, &handles, &pg_pool).await; @@ -127,7 +128,11 @@ async fn handle_control_event( } => match handles.entry(object_id.clone()) { Entry::Occupied(_) => {}, Entry::Vacant(entry) => { - trace!("[History] create collab: {}", object_id); + trace!( + "[History] create collab: {}, collab_type:{}", + object_id, + collab_type + ); match init_collab_handle( redis_stream, pg_pool,