chore: prevent hot loops in collab updates stream

This commit is contained in:
Bartosz Sypytkowski 2024-10-29 07:31:57 +01:00
parent 3ad2c3dacb
commit 961c482995
2 changed files with 38 additions and 29 deletions

View File

@ -121,6 +121,10 @@ impl CollabRedisStream {
let batch: CollabStreamUpdateBatch = conn let batch: CollabStreamUpdateBatch = conn
.xread_options(&[&stream_key], &[&last_id], &read_options) .xread_options(&[&stream_key], &[&last_id], &read_options)
.await?; .await?;
if batch.updates.is_empty() {
tokio::time::sleep(Duration::from_millis(1000)).await;
} else {
for (message_id, update) in batch.updates { for (message_id, update) in batch.updates {
since = since.max(message_id); since = since.max(message_id);
yield (message_id, update); yield (message_id, update);
@ -128,6 +132,7 @@ impl CollabRedisStream {
} }
} }
} }
}
pub fn awareness_updates( pub fn awareness_updates(
&self, &self,
@ -146,6 +151,9 @@ impl CollabRedisStream {
let batch: AwarenessStreamUpdateBatch = conn let batch: AwarenessStreamUpdateBatch = conn
.xread_options(&[&stream_key], &[&last_id], &read_options) .xread_options(&[&stream_key], &[&last_id], &read_options)
.await?; .await?;
if batch.updates.is_empty() {
tokio::time::sleep(Duration::from_millis(1000)).await;
} else {
for (message_id, update) in batch.updates { for (message_id, update) in batch.updates {
since = since.max(message_id); since = since.max(message_id);
yield update; yield update;
@ -153,32 +161,32 @@ impl CollabRedisStream {
} }
} }
} }
}
pub async fn prune_stream( pub async fn prune_stream(
&self, &self,
stream_key: &str, stream_key: &str,
message_id: MessageId, mut message_id: MessageId,
) -> Result<usize, StreamError> { ) -> Result<usize, StreamError> {
let mut conn = self.connection_manager.clone(); let mut conn = self.connection_manager.clone();
let value = conn.xrange(stream_key, "-", message_id.to_string()).await?; // we want to delete everything <= message_id
let value = StreamRangeReply::from_owned_redis_value(value)?; message_id.sequence_number += 1;
let msg_ids: Vec<_> = value let value = conn
.ids .send_packed_command(
.into_iter() redis::cmd("XTRIM")
.map(|stream_id| stream_id.id) .arg(stream_key)
.collect(); .arg("MINID")
if !msg_ids.is_empty() { .arg(format!("{}", message_id)),
let count: usize = conn.xdel(stream_key, &msg_ids).await?; )
.await?;
let count = usize::from_redis_value(&value)?;
drop(conn); drop(conn);
tracing::debug!( tracing::info!(
"Pruned redis stream `{}` <= `{}` ({} objects)", "pruned redis stream `{}` <= `{}` ({} objects)",
stream_key, stream_key,
message_id, message_id,
count count
); );
Ok(count) Ok(count)
} else {
Ok(0)
}
} }
} }

View File

@ -29,7 +29,7 @@ use futures::{pin_mut, Sink, Stream};
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant, SystemTime};
use tokio::time::MissedTickBehavior; use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{error, info, trace}; use tracing::{error, info, trace};
@ -1070,8 +1070,9 @@ impl CollabPersister {
); );
// 3. finally we can drop Redis messages // 3. finally we can drop Redis messages
let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis();
let msg_id = MessageId { let msg_id = MessageId {
timestamp_ms: message_id.timestamp_ms - self.prune_grace_period.as_millis() as u64, timestamp_ms: (now - self.prune_grace_period.as_millis()) as u64,
sequence_number: 0, sequence_number: 0,
}; };
let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id); let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id);