From e86c9de31658e9b35019136c20522a15857ee9d7 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 30 Oct 2024 06:53:31 +0100 Subject: [PATCH] chore: when saving collab snapshot, do not load it unless there are pending updates in redis --- libs/collab-stream/src/client.rs | 17 +-- .../src/group/group_init.rs | 117 ++++++++++++++---- 2 files changed, 98 insertions(+), 36 deletions(-) diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 881b867a..0ed2f1f5 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -100,7 +100,7 @@ impl CollabRedisStream { workspace_id: &str, object_id: &str, since: Option, - live: bool, + keep_alive: bool, ) -> impl Stream> { let mut conn = self.connection_manager.clone(); let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); @@ -112,22 +112,17 @@ impl CollabRedisStream { // reset once we get new data onboard let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(10)); async_stream::try_stream! 
{ - let until = if live { - None - } else { - let mut reply: StreamRangeReply = conn.xrevrange_count(&stream_key, "+", "-", 1).await?; - match reply.ids.pop().map(|stream_id| stream_id.id) { - None => Some(MessageId::default()), - Some(id) => Some(MessageId::try_from(id)?), - } - }; - while until.is_none() || since < until.unwrap() { + loop { let last_id = since.to_string(); let batch: CollabStreamUpdateBatch = conn .xread_options(&[&stream_key], &[&last_id], &read_options) .await?; if batch.updates.is_empty() { + if !keep_alive { + // if stream is not set to keep alive, we finish it once we get all current messages + return; + } backoff.sleep().await; // stream has no new messages, phase out } else { backoff.reset(); diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index c367ba99..457557c7 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -327,7 +327,7 @@ impl CollabGroup { } pub async fn encode_collab(&self) -> Result { - let snapshot = self.state.persister.load().await?; + let snapshot = self.state.persister.load_compact().await?; let encode_collab = snapshot.collab.encode_collab_v1(|collab| { self .state @@ -635,7 +635,7 @@ impl CollabGroup { tracing::debug!("loading collab {}", state.object_id); let snapshot = state .persister - .load() + .load_compact() .await .map_err(|err| RTProtocolError::Internal(err.into()))?; @@ -922,20 +922,12 @@ impl CollabPersister { Ok(msg_id) } - async fn load(&self) -> Result { - self.load_internal(false).await - } - - async fn load_internal(&self, skip_gc: bool) -> Result { + /// Loads collab without its history. Used for handling y-sync protocol messages. + async fn load_compact(&self) -> Result { // 1. Try to load the latest snapshot from storage - let mut collab = match self.load_collab_full(skip_gc).await? { + let mut collab = match self.load_collab_full(false).await? 
{ Some(collab) => collab, - None => Collab::new_with_origin( - CollabOrigin::Server, - self.object_id.clone(), - vec![], - skip_gc, - ), + None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false), }; // 2. consume all Redis updates on top of it (keep redis msg id) @@ -968,11 +960,10 @@ impl CollabPersister { }, } } - drop(tx); // apply transaction to compress the state (without GC) + drop(tx); tracing::trace!( - "loaded collab {} state: {} replaying {} updates", + "loaded collab compact state: {} replaying {} updates", self.object_id, - if skip_gc { "full" } else { "compact" }, i ); @@ -984,14 +975,90 @@ impl CollabPersister { Ok(snapshot) } + /// Returns a collab state (with GC turned off), but only if there were any pending updates + /// waiting to be merged into main document state. + async fn load_if_changed(&self) -> Result<Option<CollabSnapshot>, RealtimeError> { + // 1. load pending Redis updates + let stream = self.collab_redis_stream.collab_updates( + &self.workspace_id, + &self.object_id, + None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there + false, // read only data currently existing in the stream + ); + pin_mut!(stream); + + let mut i = 0; + let mut collab = None; + let mut last_message_id = None; + while let Some(res) = stream.next().await { + match res { + Ok((message_id, update)) => { + i += 1; + let update: Update = update.into_update()?; + if collab.is_none() { + collab = Some(match self.load_collab_full(true).await? { + Some(collab) => collab, + None => { + Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true) + }, + }) + }; + let collab = collab.as_mut().unwrap(); + collab + .transact_mut() + .apply_update(update) + .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; + last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? 
+ }, + Err(err) => { + tracing::error!( + "`{}` failed to resolve collab update: {}", + self.object_id, + err + ); + break; + }, + } + } + + // if there were no Redis updates, collab is still not initialized + match collab { + Some(collab) => { + tracing::trace!( + "loaded collab full state: {} replaying {} updates", + self.object_id, + i + ); + { + let tx = collab.transact(); + if tx.store().pending_update().is_some() || tx.store().pending_ds().is_some() { + tracing::trace!( + "loaded collab {} is incomplete: has pending data", + self.object_id + ); + } + } + Ok(Some(CollabSnapshot { + collab, + last_message_id, + })) + }, + None => Ok(None), + } + } + async fn save(&self) -> Result<(), RealtimeError> { - tracing::debug!("requesting save for collab {}", self.object_id); - let mut snapshot = self.load_internal(true).await?; - if let Some(message_id) = &snapshot.last_message_id { - // non-nil message_id means that we had to update the most recent collab state snapshot - // with new updates from Redis. This means that our snapshot state is newer than the last - // persisted one in the database - self.save_attempt(&mut snapshot.collab, message_id).await?; + // load collab but only if there were pending updates in Redis + if let Some(mut snapshot) = self.load_if_changed().await? { + tracing::debug!("requesting save for collab {}", self.object_id); + if let Some(message_id) = snapshot.last_message_id { + // non-nil message_id means that we had to update the most recent collab state snapshot + // with new updates from Redis. 
This means that our snapshot state is newer than the last + // persisted one in the database + self.save_attempt(&mut snapshot.collab, message_id).await?; + } + } else { + tracing::trace!("collab {} state has not changed", self.object_id); } Ok(()) } @@ -1003,7 +1070,7 @@ impl CollabPersister { async fn save_attempt( &self, collab: &mut Collab, - message_id: &MessageId, + message_id: MessageId, ) -> Result<(), RealtimeError> { if !collab.get_awareness().doc().skip_gc() { return Err(RealtimeError::UnexpectedData(