From 5ea7f5e482d05fd659cc6f7e4e83fe019e762ece Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 29 Oct 2024 07:57:02 +0100 Subject: [PATCH] chore: add exponential backoff to collab update stream --- libs/collab-stream/src/client.rs | 38 ++++++++++++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 35b27bc7..881b867a 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -106,6 +106,11 @@ impl CollabRedisStream { let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let read_options = StreamReadOptions::default().count(100); let mut since = since.unwrap_or_default(); + + // instead of doing active blocking xread, use an exponential backoff to wait asynchronously + // the longer Redis stream is inactive the longer the pause will become, and it will + // reset once we get new data onboard + let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(10)); async_stream::try_stream! { let until = if live { None @@ -123,8 +128,9 @@ impl CollabRedisStream { .await?; if batch.updates.is_empty() { - tokio::time::sleep(Duration::from_millis(1000)).await; + backoff.sleep().await; // stream has no new messages, phase out } else { + backoff.reset(); for (message_id, update) in batch.updates { since = since.max(message_id); yield (message_id, update); @@ -145,6 +151,7 @@ impl CollabRedisStream { let stream_key = AwarenessStreamUpdate::stream_key(workspace_id, object_id); let read_options = StreamReadOptions::default().count(100); let mut since = since.unwrap_or_default(); + let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(5)); async_stream::try_stream! { loop { let last_id = since.to_string(); @@ -152,8 +159,9 @@ impl CollabRedisStream { .xread_options(&[&stream_key], &[&last_id], &read_options) .await?; if batch.updates.is_empty() { - tokio::time::sleep(Duration::from_millis(1000)).await; + backoff.sleep().await; } else { + backoff.reset(); for (message_id, update) in batch.updates { since = since.max(message_id); yield update; @@ -190,3 +198,29 @@ impl CollabRedisStream { Ok(count) } } + +struct ExponentialBackoff { + start: Duration, + end: Duration, + current: Duration, +} + +impl ExponentialBackoff { + pub fn new(start: Duration, end: Duration) -> Self { + ExponentialBackoff { + start, + end, + current: start, + } + } + + pub fn sleep(&mut self) -> tokio::time::Sleep { + let t = self.current; + self.current = Duration::from_millis(self.current.as_millis() as u64 * 2).min(self.end); + tokio::time::sleep(t) + } + + pub fn reset(&mut self) { + self.current = self.start; + } +}