chore: periodicall check redis stream capacity (#580)
This commit is contained in:
parent
3f2d5f0785
commit
115d046094
|
|
@ -1731,6 +1731,7 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ bincode = "1.3.3"
|
||||||
collab-entity.workspace = true
|
collab-entity.workspace = true
|
||||||
serde_json.workspace = true
|
serde_json.workspace = true
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
|
tokio-util = { version = "0.7" }
|
||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
|
|
|
||||||
|
|
@ -52,8 +52,8 @@ impl CollabRedisStream {
|
||||||
group_name,
|
group_name,
|
||||||
self.connection_manager.clone(),
|
self.connection_manager.clone(),
|
||||||
StreamConfig::new()
|
StreamConfig::new()
|
||||||
// 1000 messages
|
// 2000 messages
|
||||||
.with_max_len(1000)
|
.with_max_len(2000)
|
||||||
// 12 hours
|
// 12 hours
|
||||||
.with_expire_time(60 * 60 * 12),
|
.with_expire_time(60 * 60 * 12),
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,11 @@ use redis::streams::{
|
||||||
StreamReadOptions,
|
StreamReadOptions,
|
||||||
};
|
};
|
||||||
use redis::{pipe, AsyncCommands, RedisResult};
|
use redis::{pipe, AsyncCommands, RedisResult};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use tracing::{error, trace};
|
use tracing::{error, trace, warn};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct StreamGroup {
|
pub struct StreamGroup {
|
||||||
|
|
@ -17,8 +20,13 @@ pub struct StreamGroup {
|
||||||
group_name: String,
|
group_name: String,
|
||||||
config: StreamConfig,
|
config: StreamConfig,
|
||||||
last_expiration_set: Option<DateTime<Utc>>,
|
last_expiration_set: Option<DateTime<Utc>>,
|
||||||
|
cancel_token: Arc<CancellationToken>,
|
||||||
|
}
|
||||||
|
impl Drop for StreamGroup {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.cancel_token.cancel();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StreamGroup {
|
impl StreamGroup {
|
||||||
pub fn new(stream_key: String, group_name: &str, connection_manager: ConnectionManager) -> Self {
|
pub fn new(stream_key: String, group_name: &str, connection_manager: ConnectionManager) -> Self {
|
||||||
let config = StreamConfig {
|
let config = StreamConfig {
|
||||||
|
|
@ -34,14 +42,18 @@ impl StreamGroup {
|
||||||
connection_manager: ConnectionManager,
|
connection_manager: ConnectionManager,
|
||||||
config: StreamConfig,
|
config: StreamConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
let cancel_token = Arc::new(CancellationToken::new());
|
||||||
let group_name = group_name.to_string();
|
let group_name = group_name.to_string();
|
||||||
Self {
|
let group = Self {
|
||||||
group_name,
|
group_name,
|
||||||
connection_manager,
|
connection_manager,
|
||||||
stream_key,
|
stream_key,
|
||||||
config,
|
config,
|
||||||
last_expiration_set: None,
|
last_expiration_set: None,
|
||||||
}
|
cancel_token,
|
||||||
|
};
|
||||||
|
group.spawn_periodic_check();
|
||||||
|
group
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensures the consumer group exists, creating it if necessary.
|
/// Ensures the consumer group exists, creating it if necessary.
|
||||||
|
|
@ -328,6 +340,7 @@ impl StreamGroup {
|
||||||
let now = Utc::now();
|
let now = Utc::now();
|
||||||
if let Some(expire_time) = self.config.expire_time_in_secs {
|
if let Some(expire_time) = self.config.expire_time_in_secs {
|
||||||
let should_set_expiration = match self.last_expiration_set {
|
let should_set_expiration = match self.last_expiration_set {
|
||||||
|
// Set expiration if it has been more than an hour since the last set
|
||||||
Some(last_set) => now.signed_duration_since(last_set) > chrono::Duration::seconds(60 * 60),
|
Some(last_set) => now.signed_duration_since(last_set) > chrono::Duration::seconds(60 * 60),
|
||||||
None => true,
|
None => true,
|
||||||
};
|
};
|
||||||
|
|
@ -343,6 +356,43 @@ impl StreamGroup {
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawns a periodic task to check the stream length.
|
||||||
|
fn spawn_periodic_check(&self) {
|
||||||
|
if let Some(max_len) = self.config.max_len {
|
||||||
|
let stream_key = self.stream_key.clone();
|
||||||
|
let mut connection_manager = self.connection_manager.clone();
|
||||||
|
let cancel_token = self.cancel_token.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Check every hour
|
||||||
|
let mut interval = tokio::time::interval(Duration::from_secs(3600));
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = interval.tick() => {
|
||||||
|
if let Ok(len) = get_stream_length(&mut connection_manager, &stream_key).await {
|
||||||
|
if len + 100 > max_len {
|
||||||
|
warn!("stream len is going to exceed the max len: {}, current: {}", max_len, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = cancel_token.cancelled() => {
|
||||||
|
trace!("Stream length check task cancelled.");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Checks if the stream length exceeds the maximum length.
|
||||||
|
async fn get_stream_length(
|
||||||
|
connection_manager: &mut ConnectionManager,
|
||||||
|
stream_key: &str,
|
||||||
|
) -> Result<usize, StreamError> {
|
||||||
|
let current_len: usize = connection_manager.xlen(stream_key).await?;
|
||||||
|
Ok(current_len)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum ReadOption {
|
pub enum ReadOption {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue