diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 8a7be256..234e6b80 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -6,7 +6,7 @@ use crate::stream_group::{StreamConfig, StreamGroup}; use crate::stream_router::{StreamRouter, StreamRouterOptions}; use futures::Stream; use redis::aio::ConnectionManager; -use redis::streams::StreamReadOptions; +use redis::streams::StreamReadReply; use redis::{AsyncCommands, FromRedisValue}; use std::sync::Arc; use std::time::Duration; @@ -111,18 +111,46 @@ impl CollabRedisStream { AwarenessUpdateSink::new(self.connection_manager.clone(), stream_key) } - pub fn collab_updates( + /// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting + /// from a given message id. Once the Redis stream returns no more results, the collected updates are returned. + pub async fn current_collab_updates( + &self, + workspace_id: &str, + object_id: &str, + since: Option, + ) -> Result, StreamError> { + let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); + let since = since.unwrap_or_default().to_string(); + let mut conn = self.connection_manager.clone(); + let mut result = Vec::new(); + let mut reply: StreamReadReply = conn.xread(&[&stream_key], &[&since]).await?; + if let Some(key) = reply.keys.pop() { + if key.key == stream_key { + for stream_id in key.ids { + let message_id = MessageId::try_from(stream_id.id)?; + let stream_update = CollabStreamUpdate::try_from(stream_id.map)?; + result.push((message_id, stream_update)); + } + } + } + Ok(result) + } + + /// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting + /// from a given message id. This stream will be kept alive and will pass along all future messages + /// coming from the corresponding Redis stream until explicitly closed. 
+ pub fn live_collab_updates( &self, workspace_id: &str, object_id: &str, since: Option, - keep_alive: bool, ) -> impl Stream> { let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let since = since.map(|id| id.to_string()); let mut reader = self.stream_router.observe(stream_key, since); async_stream::try_stream! { while let Some((message_id, fields)) = reader.recv().await { + tracing::trace!("incoming collab update `{}`", message_id); let message_id = MessageId::try_from(message_id).map_err(|e| internal(e.to_string()))?; let collab_update = CollabStreamUpdate::try_from(fields)?; yield (message_id, collab_update); @@ -140,7 +168,8 @@ impl CollabRedisStream { let since = since.map(|id| id.to_string()); let mut reader = self.stream_router.observe(stream_key, since); async_stream::try_stream! { - while let Some((_message_id, fields)) = reader.recv().await { + while let Some((message_id, fields)) = reader.recv().await { + tracing::trace!("incoming awareness update `{}`", message_id); let awareness_update = AwarenessStreamUpdate::try_from(fields)?; yield awareness_update; } diff --git a/libs/collab-stream/src/stream_router.rs b/libs/collab-stream/src/stream_router.rs index 5e42945c..5963726b 100644 --- a/libs/collab-stream/src/stream_router.rs +++ b/libs/collab-stream/src/stream_router.rs @@ -24,8 +24,9 @@ pub type StreamReader = tokio::sync::mpsc::UnboundedReceiver<(String, RedisMap)> /// fixed number of Redis connections. 
pub struct StreamRouter { buf: Sender, - workers: Vec, alive: Arc, + #[allow(dead_code)] + workers: Vec, } impl StreamRouter { @@ -59,7 +60,7 @@ impl StreamRouter { pub fn observe(&self, stream_key: StreamKey, last_id: Option) -> StreamReader { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let last_id = last_id.unwrap_or_default(); + let last_id = last_id.unwrap_or_else(|| "0".to_string()); let h = StreamHandle::new(stream_key, last_id, tx); self.buf.send(h).unwrap(); rx @@ -106,7 +107,7 @@ impl Default for StreamRouterOptions { } struct Worker { - handle: JoinHandle<()>, + _handle: JoinHandle<()>, } impl Worker { @@ -131,7 +132,7 @@ impl Worker { tracing::error!("worker {} failed: {}", worker_id, err); } }); - Self { handle } + Self { _handle: handle } } fn process_streams( @@ -149,7 +150,9 @@ impl Worker { if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) { break; // rx channel has closed } - if stream_keys.is_empty() { + + let key_count = stream_keys.len(); + if key_count == 0 { tracing::warn!("Bug: read empty buf"); sleep(Duration::from_millis(100)); continue; @@ -157,6 +160,7 @@ impl Worker { let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?; + let mut msgs = 0; for stream in result.keys { let mut remove_sender = false; if let Some((sender, idx)) = senders.get(stream.key.as_str()) { @@ -164,6 +168,7 @@ impl Worker { let message_id = id.id; let value = id.map; message_ids[*idx] = message_id.clone(); //TODO: optimize + msgs += 1; if let Err(err) = sender.send((message_id, value)) { tracing::warn!("failed to send: {}", err); remove_sender = true; @@ -176,20 +181,27 @@ impl Worker { } } + if msgs > 0 { + tracing::trace!( + "XREAD: read total of {} messages for {} streams", + msgs, + key_count + ); + } Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders); } Ok(()) } - fn schedule_back<'a>( + fn schedule_back( tx: &Sender, keys: &mut Vec, ids: &mut Vec, - senders: &mut 
HashMap<&'a str, (StreamSender, usize)>, + senders: &mut HashMap<&str, (StreamSender, usize)>, ) { - let mut keys = keys.drain(..); + let keys = keys.drain(..); let mut ids = ids.drain(..); - while let Some(key) = keys.next() { + for key in keys { if let Some(last_id) = ids.next() { if let Some((sender, _)) = senders.remove(key.as_str()) { let h = StreamHandle::new(key, last_id, sender); diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 054bbc6a..43177710 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -166,11 +166,10 @@ impl CollabGroup { /// Task used to receive collab updates from Redis. async fn inbound_task(state: Arc) -> Result<(), RealtimeError> { - let updates = state.persister.collab_redis_stream.collab_updates( + let updates = state.persister.collab_redis_stream.live_collab_updates( &state.workspace_id, &state.object_id, None, - true, ); pin_mut!(updates); loop { @@ -936,34 +935,22 @@ impl CollabPersister { // 2. 
consume all Redis updates on top of it (keep redis msg id) let mut last_message_id = None; let mut tx = collab.transact_mut(); - let stream = self.collab_redis_stream.collab_updates( - &self.workspace_id, - &self.object_id, - None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there - false, // read only data currently existing in the stream - ); - pin_mut!(stream); + let updates = self + .collab_redis_stream + .current_collab_updates( + &self.workspace_id, + &self.object_id, + None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there + ) + .await?; let mut i = 0; - while let Some(res) = stream.next().await { - match res { - Ok((message_id, update)) => { - i += 1; - let update: Update = update.into_update()?; - tx.apply_update(update) - .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; - last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? - self.metrics.apply_update_count.inc(); - }, - Err(err) => { - self.metrics.apply_update_failed_count.inc(); - tracing::error!( - "`{}` failed to resolve collab update: {}", - self.object_id, - err - ); - break; - }, - } + for (message_id, update) in updates { + i += 1; + let update: Update = update.into_update()?; + tx.apply_update(update) + .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; + last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? + self.metrics.apply_update_count.inc(); } drop(tx); tracing::trace!( @@ -988,49 +975,33 @@ impl CollabPersister { /// waiting to be merged into main document state. async fn load_if_changed(&self) -> Result, RealtimeError> { // 1. 
load pending Redis updates - let stream = self.collab_redis_stream.collab_updates( - &self.workspace_id, - &self.object_id, - None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there - false, // read only data currently existing in the stream - ); - pin_mut!(stream); + let updates = self + .collab_redis_stream + .current_collab_updates(&self.workspace_id, &self.object_id, None) + .await?; let start = Instant::now(); let mut i = 0; let mut collab = None; let mut last_message_id = None; - while let Some(res) = stream.next().await { - match res { - Ok((message_id, update)) => { - i += 1; - let update: Update = update.into_update()?; - if collab.is_none() { - collab = Some(match self.load_collab_full(true).await? { - Some(collab) => collab, - None => { - Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true) - }, - }) - }; - let collab = collab.as_mut().unwrap(); - collab - .transact_mut() - .apply_update(update) - .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; - last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? - self.metrics.apply_update_count.inc(); - }, - Err(err) => { - self.metrics.apply_update_failed_count.inc(); - tracing::error!( - "`{}` failed to resolve collab update: {}", - self.object_id, - err - ); - break; - }, - } + for (message_id, update) in updates { + i += 1; + let update: Update = update.into_update()?; + if collab.is_none() { + collab = Some(match self.load_collab_full(true).await? { + Some(collab) => collab, + None => { + Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true) + }, + }) + }; + let collab = collab.as_mut().unwrap(); + collab + .transact_mut() + .apply_update(update) + .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; + last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? 
+ self.metrics.apply_update_count.inc(); } // if there were no Redis updates, collab is still not initialized diff --git a/tests/collab/stress_test.rs b/tests/collab/stress_test.rs index 6a2ea446..5e33b01d 100644 --- a/tests/collab/stress_test.rs +++ b/tests/collab/stress_test.rs @@ -46,7 +46,7 @@ async fn run_multiple_text_edits() { // run test scenario let collab = writer.collabs.get(&object_id).unwrap().collab.clone(); - let expected = test_scenario.execute(collab, 10_000).await; + let expected = test_scenario.execute(collab, 50_000).await; // wait for the writer to complete sync writer.wait_object_sync_complete(&object_id).await.unwrap();