diff --git a/Cargo.lock b/Cargo.lock index 6a58f83e..7748a1fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2469,6 +2469,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "zstd", ] [[package]] diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index c8d50407..d7b1c1da 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -24,7 +24,7 @@ tokio-util = { version = "0.7" } prost.workspace = true async-stream.workspace = true async-trait.workspace = true - +zstd = "0.13" [dev-dependencies] futures = "0.3.30" diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index ee44d06d..3fe891f5 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -38,7 +38,7 @@ impl CollabRedisStream { workspace_id: &str, object_id: &str, ) -> Result, StreamError> { - let lease_key = format!("af_snapshot:lease:{}:{}", workspace_id, object_id); + let lease_key = format!("af:{}:{}:snapshot_lease", workspace_id, object_id); self .connection_manager .lease(lease_key, Self::LEASE_TTL) @@ -101,7 +101,7 @@ impl CollabRedisStream { workspace_id: &str, object_id: &str, since: Option, - ) -> impl Stream> { + ) -> impl Stream> { // use `:` separator as it adheres to Redis naming conventions let mut conn = self.connection_manager.clone(); let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); @@ -115,7 +115,7 @@ impl CollabRedisStream { .await?; for (message_id, update) in batch.updates { since = since.max(message_id); - yield update; + yield (message_id, update); } } } diff --git a/libs/collab-stream/src/error.rs b/libs/collab-stream/src/error.rs index 06da1932..b0241a93 100644 --- a/libs/collab-stream/src/error.rs +++ b/libs/collab-stream/src/error.rs @@ -32,6 +32,12 @@ pub enum StreamError { #[error(transparent)] BinCodeSerde(#[from] bincode::Error), + #[error("failed to decode update: {0}")] + UpdateError(#[from] collab::preclude::encoding::read::Error), + + 
#[error("I/O error: {0}")] + IO(#[from] std::io::Error), + #[error("Internal error: {0}")] Internal(anyhow::Error), } diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index cdaac31a..9f2d6b01 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -392,6 +392,20 @@ impl CollabStreamUpdate { pub fn stream_key(workspace_id: &str, object_id: &str) -> String { format!("af:{}:{}:updates", workspace_id, object_id) } + + pub fn into_update(self) -> Result { + let bytes = if self.flags.is_compressed() { + zstd::decode_all(std::io::Cursor::new(self.data))? + } else { + self.data + }; + let update = if self.flags.is_v1_encoded() { + collab::preclude::Update::decode_v1(&bytes)? + } else { + collab::preclude::Update::decode_v2(&bytes)? + }; + Ok(update) + } } pub(crate) struct CollabStreamUpdateBatch { diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index ea5914e0..a453ba54 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -27,7 +27,7 @@ use collab_stream::model::{ use collab_stream::stream_group::StreamGroup; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; -use database_entity::dto::QueryCollabParams; +use database_entity::dto::{CollabParams, QueryCollabParams}; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::collections::VecDeque; @@ -184,7 +184,7 @@ impl CollabGroup { } res = updates.next() => { match res { - Some(Ok(update)) => { + Some(Ok((_message_id, update))) => { Self::handle_inbound_update(&state, update).await; }, Some(Err(err)) => { @@ -866,6 +866,10 @@ struct CollabPersister { } impl CollabPersister { + /// A grace period for pruning Redis collab updates. Instead of deleting all messages we + /// read right away, we give 1min for other potential clients to catch up. 
+ pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 1min + pub fn new( workspace_id: String, object_id: String, @@ -947,30 +951,112 @@ impl CollabPersister { let encoded_collab = self .storage .get_encode_collab(GetCollabOrigin::Server, params, false) - .await?; - let collab = Collab::new_with_source( + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + let mut collab: Collab = Collab::new_with_source( CollabOrigin::Server, &self.object_id, DataSource::DocStateV1(encoded_collab.doc_state.into()), vec![], - false, + true, // here we use history-remembering version )?; + // 2. consume all Redis updates on top of it (keep redis msg id) - todo!() + let mut last_message_id = None; + let mut stream = self.collab_redis_stream.collab_updates( + &self.workspace_id, + &self.object_id, + None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there + ); + let mut tx = collab.transact_mut(); + while let Some(res) = stream.next().await { + match res { + Ok((message_id, update)) => { + let update: Update = update.into_update()?; + tx.apply_update(update) + .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; + last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? + }, + Err(err) => { + tracing::error!( + "`{}` failed to resolve collab update: {}", + self.object_id, + err + ); + break; + }, + } + } + drop(tx); // apply transaction to compress the state (without GC) + + // now we have the most recent version of the document + let snapshot = Arc::new(CollabSnapshot { + collab, + last_message_id, + }); + if self.temp_collab.swap(Some(snapshot.clone())).is_none() { + // if previous value was none it means that it didn't exist yet or is newer than the persisted + // state. 
Since we already have a collab with loaded updates in memory, we can as well try + // to save its state + self.save_attempt(snapshot.clone()).await?; + } + + Ok(snapshot) } - async fn save(&self) -> Result<(), RealtimeError> { - // 1. try to acquire lease - if let Some(lease) = self - .collab_redis_stream - .lease(&self.workspace_id, &self.object_id) - .await? - { - // 3. if collab has any changes (any redis updates were applied): - // 4. generate embeddings - // 5. store collab - // 6. prune any redis msg ids older than 5 min. since collab snapshot time - todo!() + async fn save_attempt(&self, snapshot: Arc) -> Result<(), RealtimeError> { + if let Some(last_message_id) = snapshot.last_message_id { + if let Some(mut lease) = self + .collab_redis_stream + .lease(&self.workspace_id, &self.object_id) + .await? + { + let doc_state_full = snapshot + .collab + .transact() + .encode_state_as_update_v1(&StateVector::default()); + let uid = 0; //FIXME: what UID should go there? + let params = CollabParams::new( + &self.object_id, + self.collab_type.clone(), + doc_state_full.clone(), + ); + self + .storage + .insert_or_update_collab(&self.workspace_id, &uid, params, true) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + + // document state with historical data has been saved - now do GC + let mut collab: Collab = Collab::new_with_source( + CollabOrigin::Server, + &self.object_id, + DataSource::DocStateV1(doc_state_full), + vec![], + false, // here we use history-pruning version + )?; + let doc_state_gced = collab + .transact() + .encode_state_as_update_v1(&StateVector::default()); + let params = CollabParams::new( + &self.object_id, + self.collab_type.clone(), + doc_state_gced, + ); + self + .storage + .insert_or_update_collab(&self.workspace_id, &uid, params, true) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + + // finally we can drop Redis messages older than the grace period (saturating: no u64 underflow) + let message_id = MessageId { + timestamp_ms: 
last_message_id.timestamp_ms.saturating_sub(Self::GRACE_PERIOD_MS), + sequence_number: 0, + }; + self.prune_updates(message_id); + let _ = lease.release().await; + } } Ok(()) } @@ -978,5 +1064,5 @@ impl CollabPersister { pub struct CollabSnapshot { pub collab: Collab, - pub last_msg_id: MessageId, + pub last_message_id: Option, }