diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index eba44cca..4d33e2bf 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -100,14 +100,23 @@ impl CollabRedisStream { workspace_id: &str, object_id: &str, since: Option, + live: bool, ) -> impl Stream> { - // use `:` separator as it adheres to Redis naming conventions let mut conn = self.connection_manager.clone(); let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let read_options = StreamReadOptions::default().count(100); let mut since = since.unwrap_or_default(); async_stream::try_stream! { - loop { + let until = if live { + None + } else { + let mut reply: StreamRangeReply = conn.xrevrange_count(&stream_key, "+", "-", 1).await?; + match reply.ids.pop().map(|stream_id| stream_id.id) { + None => Some(MessageId::default()), + Some(id) => Some(MessageId::try_from(id)?), + } + }; + while Some(since) < until { let last_id = since.to_string(); let batch: CollabStreamUpdateBatch = conn .xread_options(&[&stream_key], &[&last_id], &read_options) diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index 9f2d6b01..f41fdc1c 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -390,6 +390,7 @@ impl CollabStreamUpdate { /// Returns Redis stream key, that's storing entries mapped to/from [CollabStreamUpdate]. pub fn stream_key(workspace_id: &str, object_id: &str) -> String { + // use `:` separator as it adheres to Redis naming conventions format!("af:{}:{}:updates", workspace_id, object_id) } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index dfec05dd..c5340e87 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -167,6 +167,7 @@ impl CollabGroup { &state.workspace_id, &state.object_id, None, + true, ); pin_mut!(updates); loop { @@ -550,6 +551,7 @@ impl CollabGroup { } }, Err(err) => { + tracing::warn!("[realtime]: failed to handled message: {}", msg_id); state.metrics.apply_update_failed_count.inc(); let code = Self::ack_code_from_error(&err); @@ -902,6 +904,7 @@ impl CollabPersister { &self.workspace_id, &self.object_id, None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there + false, // read only data currently existing in the stream ); pin_mut!(stream); let mut i = 0;