chore: optimize redis stream space usage (#579)

* chore: set maxlen for redis stream

* chore: update log

* chore: set expiration time
This commit is contained in:
Nathan.fooo 2024-05-27 13:22:14 +08:00 committed by GitHub
parent 93b4a1516c
commit 3f2d5f0785
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 141 additions and 9 deletions

1
Cargo.lock generated
View File

@ -1721,6 +1721,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"chrono",
"collab-entity",
"futures",
"rand 0.8.5",

View File

@ -17,6 +17,7 @@ serde = { version = "1", features = ["derive"] }
bincode = "1.3.3"
collab-entity.workspace = true
serde_json.workspace = true
chrono = "0.4"
[dev-dependencies]
futures = "0.3.30"

View File

@ -1,7 +1,7 @@
use crate::error::StreamError;
use crate::pubsub::{CollabStreamPub, CollabStreamSub};
use crate::stream::CollabStream;
use crate::stream_group::StreamGroup;
use crate::stream_group::{StreamConfig, StreamGroup};
use redis::aio::ConnectionManager;
pub const CONTROL_STREAM_KEY: &str = "af_collab_control";
@ -30,7 +30,12 @@ impl CollabRedisStream {
key: &str,
group_name: &str,
) -> Result<StreamGroup, StreamError> {
let mut group = StreamGroup::new(key.to_string(), group_name, self.connection_manager.clone());
let mut group = StreamGroup::new_with_config(
key.to_string(),
group_name,
self.connection_manager.clone(),
StreamConfig::new().with_max_len(1000),
);
group.ensure_consumer_group().await?;
Ok(group)
}
@ -42,7 +47,16 @@ impl CollabRedisStream {
group_name: &str,
) -> Result<StreamGroup, StreamError> {
let stream_key = format!("af_collab_update-{}-{}", workspace_id, oid);
let mut group = StreamGroup::new(stream_key, group_name, self.connection_manager.clone());
let mut group = StreamGroup::new_with_config(
stream_key,
group_name,
self.connection_manager.clone(),
StreamConfig::new()
// 1000 messages
.with_max_len(1000)
// 12 hours
.with_expire_time(60 * 60 * 12),
);
group.ensure_consumer_group().await?;
Ok(group)
}

View File

@ -1,11 +1,13 @@
use crate::error::StreamError;
use crate::model::{MessageId, StreamBinary, StreamMessage, StreamMessageByStreamKey};
use chrono::{DateTime, Utc};
use redis::aio::ConnectionManager;
use redis::streams::{
StreamClaimOptions, StreamClaimReply, StreamMaxlen, StreamPendingData, StreamPendingReply,
StreamReadOptions,
};
use redis::{pipe, AsyncCommands, RedisResult};
use tracing::{error, trace};
#[derive(Clone)]
@ -13,15 +15,32 @@ pub struct StreamGroup {
connection_manager: ConnectionManager,
stream_key: String,
group_name: String,
config: StreamConfig,
last_expiration_set: Option<DateTime<Utc>>,
}
impl StreamGroup {
pub fn new(stream_key: String, group_name: &str, connection_manager: ConnectionManager) -> Self {
let config = StreamConfig {
max_len: Some(1000),
expire_time_in_secs: None,
};
Self::new_with_config(stream_key, group_name, connection_manager, config)
}
pub fn new_with_config(
stream_key: String,
group_name: &str,
connection_manager: ConnectionManager,
config: StreamConfig,
) -> Self {
let group_name = group_name.to_string();
Self {
group_name,
connection_manager,
stream_key,
config,
last_expiration_set: None,
}
}
@ -98,9 +117,24 @@ impl StreamGroup {
for message in messages {
let message = message.into();
let tuple = message.into_tuple_array();
pipe.xadd(&self.stream_key, "*", tuple.as_slice());
if let Some(len) = self.config.max_len {
pipe
.cmd("XADD")
.arg(&self.stream_key)
.arg("MAXLEN")
.arg("~")
.arg(len)
.arg("*")
.arg(&tuple);
} else {
pipe.cmd("XADD").arg(&self.stream_key).arg("*").arg(&tuple);
}
}
pipe.query_async(&mut self.connection_manager).await?;
if let Err(err) = self.set_expiration().await {
error!("set expiration fail: {:?}", err);
}
Ok(())
}
@ -115,10 +149,24 @@ impl StreamGroup {
pub async fn insert_binary(&mut self, message: StreamBinary) -> Result<(), StreamError> {
let tuple = message.into_tuple_array();
self
.connection_manager
.xadd(&self.stream_key, "*", tuple.as_slice())
.await?;
match self.config.max_len {
Some(max_len) => {
self
.connection_manager
.xadd_maxlen(&self.stream_key, StreamMaxlen::Approx(max_len), "*", &tuple)
.await?;
},
None => {
self
.connection_manager
.xadd(&self.stream_key, "*", tuple.as_slice())
.await?;
},
}
if let Err(err) = self.set_expiration().await {
error!("set expiration fail: {:?}", err);
}
Ok(())
}
@ -268,6 +316,33 @@ impl StreamGroup {
.await?;
Ok(())
}
/// For now, we only call this function after inserting messages into the stream.
///
/// Inserting a Redis EXPIRE command after adding messages ensures that the stream's
/// TTL (time-to-live) is updated whenever new data is inserted. This is crucial for streams
/// where data needs to be automatically cleaned up after a certain period. Setting the expiration
/// after every insertion guarantees that the stream will expire correctly, even if messages are
/// inserted at irregular intervals.
async fn set_expiration(&mut self) -> Result<(), StreamError> {
let now = Utc::now();
if let Some(expire_time) = self.config.expire_time_in_secs {
let should_set_expiration = match self.last_expiration_set {
Some(last_set) => now.signed_duration_since(last_set) > chrono::Duration::seconds(60 * 60),
None => true,
};
if should_set_expiration {
self
.connection_manager
.expire(&self.stream_key, expire_time)
.await?;
self.last_expiration_set = Some(now);
}
}
Ok(())
}
}
pub enum ReadOption {
@ -275,3 +350,39 @@ pub enum ReadOption {
Count(usize),
After(MessageId),
}
#[derive(Clone)]
pub struct StreamConfig {
/// Sets the maximum length of the stream.
max_len: Option<usize>,
/// Set the time in secs for the stream to expire. After the time has passed, the stream will be
/// automatically deleted. All messages in the stream will be removed.
///
/// If the stream does not exist (e.g., it has expired), inserting a message will automatically
/// create the stream.
expire_time_in_secs: Option<i64>,
}
impl Default for StreamConfig {
fn default() -> Self {
Self::new()
}
}
impl StreamConfig {
pub fn new() -> Self {
Self {
max_len: None,
expire_time_in_secs: None,
}
}
pub fn with_max_len(mut self, max_len: usize) -> Self {
self.max_len = Some(max_len);
self
}
pub fn with_expire_time(mut self, expire_time_in_secs: i64) -> Self {
self.expire_time_in_secs = Some(expire_time_in_secs);
self
}
}

View File

@ -97,6 +97,7 @@ async fn spawn_control_group(
.await
{
if let Some(handles) = weak_handles.upgrade() {
trace!("[History] received {} control messages", messages.len());
for message in &messages {
if let Ok(event) = CollabControlEvent::decode(&message.data) {
handle_control_event(&redis_stream, event, &handles, &pg_pool).await;
@ -127,7 +128,11 @@ async fn handle_control_event(
} => match handles.entry(object_id.clone()) {
Entry::Occupied(_) => {},
Entry::Vacant(entry) => {
trace!("[History] create collab: {}", object_id);
trace!(
"[History] create collab: {}, collab_type:{}",
object_id,
collab_type
);
match init_collab_handle(
redis_stream,
pg_pool,