From 961c482995365a1b89acbed44c88776745a9d9b1 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 29 Oct 2024 07:31:57 +0100 Subject: [PATCH] chore: prevent hot loops in collab updates stream --- libs/collab-stream/src/client.rs | 62 +++++++++++-------- .../src/group/group_init.rs | 5 +- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index cff4ace3..35b27bc7 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -121,9 +121,14 @@ impl CollabRedisStream { let batch: CollabStreamUpdateBatch = conn .xread_options(&[&stream_key], &[&last_id], &read_options) .await?; - for (message_id, update) in batch.updates { - since = since.max(message_id); - yield (message_id, update); + + if batch.updates.is_empty() { + tokio::time::sleep(Duration::from_millis(1000)).await; + } else { + for (message_id, update) in batch.updates { + since = since.max(message_id); + yield (message_id, update); + } } } } @@ -146,9 +151,13 @@ impl CollabRedisStream { let batch: AwarenessStreamUpdateBatch = conn .xread_options(&[&stream_key], &[&last_id], &read_options) .await?; - for (message_id, update) in batch.updates { - since = since.max(message_id); - yield update; + if batch.updates.is_empty() { + tokio::time::sleep(Duration::from_millis(1000)).await; + } else { + for (message_id, update) in batch.updates { + since = since.max(message_id); + yield update; + } } } } @@ -157,28 +166,27 @@ impl CollabRedisStream { pub async fn prune_stream( &self, stream_key: &str, - message_id: MessageId, + mut message_id: MessageId, ) -> Result { let mut conn = self.connection_manager.clone(); - let value = conn.xrange(stream_key, "-", message_id.to_string()).await?; - let value = StreamRangeReply::from_owned_redis_value(value)?; - let msg_ids: Vec<_> = value - .ids - .into_iter() - .map(|stream_id| stream_id.id) - .collect(); - if !msg_ids.is_empty() { - let count: usize = conn.xdel(stream_key, &msg_ids).await?; - drop(conn); - tracing::debug!( - "Pruned redis stream `{}` <= `{}` ({} objects)", - stream_key, - message_id, - count - ); - Ok(count) - } else { - Ok(0) - } + // we want to delete everything <= message_id + message_id.sequence_number += 1; + let value = conn + .send_packed_command( + redis::cmd("XTRIM") + .arg(stream_key) + .arg("MINID") + .arg(format!("{}", message_id)), + ) + .await?; + let count = usize::from_redis_value(&value)?; + drop(conn); + tracing::info!( + "pruned redis stream `{}` <= `{}` ({} objects)", + stream_key, + message_id, + count + ); + Ok(count) } } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 8971b144..c367ba99 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -29,7 +29,7 @@ use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use tokio::time::MissedTickBehavior; use tokio_util::sync::CancellationToken; use tracing::{error, info, trace}; @@ -1070,8 +1070,9 @@ impl CollabPersister { ); // 3. finally we can drop Redis messages + let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis(); let msg_id = MessageId { - timestamp_ms: message_id.timestamp_ms - self.prune_grace_period.as_millis() as u64, + timestamp_ms: (now - self.prune_grace_period.as_millis()) as u64, sequence_number: 0, }; let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id);