From d12086031272d8de9b4324e277d9529cff2a6d5d Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 17 Dec 2024 04:29:41 +0100 Subject: [PATCH] chore: remove state vector from redis udpates --- libs/collab-stream/src/collab_update_sink.rs | 4 --- libs/collab-stream/src/model.rs | 15 +---------- .../src/collab/cache/collab_cache.rs | 23 +++++++---------- .../src/collab/cache/disk_cache.rs | 3 ++- .../src/collab/cache/mem_cache.rs | 1 + .../src/group/group_init.rs | 25 +++++++++++++------ 6 files changed, 31 insertions(+), 40 deletions(-) diff --git a/libs/collab-stream/src/collab_update_sink.rs b/libs/collab-stream/src/collab_update_sink.rs index de2b2834..db1a82fd 100644 --- a/libs/collab-stream/src/collab_update_sink.rs +++ b/libs/collab-stream/src/collab_update_sink.rs @@ -1,6 +1,5 @@ use crate::error::StreamError; use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId}; -use collab::preclude::updates::encoder::Encode; use redis::aio::ConnectionManager; use redis::cmd; use tokio::sync::Mutex; @@ -19,7 +18,6 @@ impl CollabUpdateSink { } pub async fn send(&self, msg: &CollabStreamUpdate) -> Result { - let sv = msg.state_vector.encode_v1(); let mut lock = self.conn.lock().await; let msg_id: MessageId = cmd("XADD") .arg(&self.stream_key) @@ -28,8 +26,6 @@ impl CollabUpdateSink { .arg(msg.flags) .arg("sender") .arg(msg.sender.to_string()) - .arg("sv") - .arg(&sv) .arg("data") .arg(&*msg.data) .query_async(&mut *lock) diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index a5601fbc..d74eb77a 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -2,7 +2,6 @@ use crate::error::{internal, StreamError}; use bytes::Bytes; use collab::core::origin::{CollabClient, CollabOrigin}; use collab::preclude::updates::decoder::Decode; -use collab::preclude::StateVector; use collab_entity::proto::collab::collab_update_event::Update; use collab_entity::{proto, CollabType}; use prost::Message; @@ -369,13 +368,12 @@ impl TryFrom for StreamBinary { pub struct CollabStreamUpdate { pub data: Vec, // yrs::Update::encode_v1 - pub state_vector: StateVector, pub sender: CollabOrigin, pub flags: UpdateFlags, } impl CollabStreamUpdate { - pub fn new(data: B, state_vector: StateVector, sender: CollabOrigin, flags: F) -> Self + pub fn new(data: B, sender: CollabOrigin, flags: F) -> Self where B: Into>, F: Into, @@ -383,7 +381,6 @@ impl CollabStreamUpdate { CollabStreamUpdate { data: data.into(), sender, - state_vector, flags: flags.into(), } } @@ -420,15 +417,6 @@ impl TryFrom> for CollabStreamUpdate { collab_origin_from_str(&raw_origin)? }, }; - let state_vector = match fields.get("sv") { - Some(value) => { - let bytes = Bytes::from_redis_value(value)?; - let state_vector = - StateVector::decode_v1(&bytes).map_err(|err| internal(err.to_string()))?; - Ok(state_vector) - }, - None => Err(internal("expecting field `sv`")), - }?; let flags = match fields.get("flags") { None => UpdateFlags::default(), Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(), @@ -440,7 +428,6 @@ impl TryFrom> for CollabStreamUpdate { Ok(CollabStreamUpdate { data, sender, - state_vector, flags, }) } diff --git a/services/appflowy-collaborate/src/collab/cache/collab_cache.rs b/services/appflowy-collaborate/src/collab/cache/collab_cache.rs index b90f9b7c..92e69936 100644 --- a/services/appflowy-collaborate/src/collab/cache/collab_cache.rs +++ b/services/appflowy-collaborate/src/collab/cache/collab_cache.rs @@ -1,4 +1,6 @@ +use bytes::Bytes; use collab::entity::EncodedCollab; +use collab_entity::CollabType; use futures_util::{stream, StreamExt}; use itertools::{Either, Itertools}; use sqlx::{PgPool, Transaction}; @@ -186,6 +188,11 @@ impl CollabCache { // when the data is written to the disk cache but fails to be written to the memory cache // we log the error and continue. + self.cache_collab(object_id, collab_type, encode_collab_data); + Ok(()) + } + + fn cache_collab(&self, object_id: String, collab_type: CollabType, encode_collab_data: Bytes) { let mem_cache = self.mem_cache.clone(); tokio::spawn(async move { if let Err(err) = mem_cache @@ -203,20 +210,6 @@ impl CollabCache { ); } }); - - Ok(()) - } - - pub async fn get_encode_collab_from_disk( - &self, - workspace_id: &str, - query: QueryCollab, - ) -> Result { - let encode_collab = self - .disk_cache - .get_collab_encoded_from_disk(workspace_id, query) - .await?; - Ok(encode_collab) } pub async fn insert_encode_collab_to_disk( @@ -225,10 +218,12 @@ impl CollabCache { uid: &i64, params: CollabParams, ) -> Result<(), AppError> { + let p = params.clone(); self .disk_cache .upsert_collab(workspace_id, uid, params) .await?; + self.cache_collab(p.object_id, p.collab_type, p.encoded_collab_v1); Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/cache/disk_cache.rs b/services/appflowy-collaborate/src/collab/cache/disk_cache.rs index ca342e43..aa095970 100644 --- a/services/appflowy-collaborate/src/collab/cache/disk_cache.rs +++ b/services/appflowy-collaborate/src/collab/cache/disk_cache.rs @@ -357,7 +357,7 @@ impl CollabDiskCache { while let Err(err) = s3.put_blob(&key, doc_state.clone().into(), None).await { match err { AppError::ServiceTemporaryUnavailable(err) if retries > 0 => { - tracing::info!( + tracing::debug!( "S3 service is temporarily unavailable: {}. Remaining retries: {}", err, retries @@ -371,6 +371,7 @@ impl CollabDiskCache { }, } } + tracing::trace!("saved collab to S3: {}", key); Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/cache/mem_cache.rs b/services/appflowy-collaborate/src/collab/cache/mem_cache.rs index c79f8727..b74612ab 100644 --- a/services/appflowy-collaborate/src/collab/cache/mem_cache.rs +++ b/services/appflowy-collaborate/src/collab/cache/mem_cache.rs @@ -154,6 +154,7 @@ impl CollabMemCache { timestamp: i64, expiration_seconds: Option, ) -> redis::RedisResult<()> { + tracing::trace!("insert collab {} to memory cache", object_id); self .insert_data_with_timestamp(object_id, data, timestamp, expiration_seconds) .await diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 67fb47a7..c39b9b96 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -203,9 +203,21 @@ impl CollabGroup { async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) { // update state vector based on incoming message - let mut sv = state.state_vector.write().await; - sv.merge(update.state_vector); - drop(sv); + match Update::decode_v1(&update.data) { + Ok(update) => state + .state_vector + .write() + .await + .merge(update.state_vector()), + Err(err) => { + tracing::error!( + "received malformed update for collab `{}`: {}", + state.object_id, + err + ); + return; + }, + } let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst) + 1; tracing::trace!( @@ -740,10 +752,9 @@ impl CollabGroup { tracing::debug!("subscriber {} send update with missing data", origin); Ok(Some(msg.encode_v1())) } else { - let upper_state_vector = decoded_update.state_vector(); state .persister - .send_update(origin.clone(), update, upper_state_vector) + .send_update(origin.clone(), update) .await .map_err(|err| RTProtocolError::Internal(err.into()))?; let elapsed = start.elapsed(); @@ -940,11 +951,10 @@ impl CollabPersister { &self, sender: CollabOrigin, update: Vec, - state_vector: StateVector, ) -> Result { let len = update.len(); // send updates to redis queue - let update = CollabStreamUpdate::new(update, state_vector, sender, UpdateFlags::default()); + let update = CollabStreamUpdate::new(update, sender, UpdateFlags::default()); let msg_id = self.update_sink.send(&update).await?; tracing::trace!( "persisted update from {} ({} bytes) - msg id: {}", @@ -979,6 +989,7 @@ impl CollabPersister { /// Loads collab without its history. Used for handling y-sync protocol messages. async fn load_compact(&self) -> Result { + tracing::trace!("requested to load compact collab {}", self.object_id); // 1. Try to load the latest snapshot from storage let start = Instant::now(); let mut collab = match self.load_collab_full(false).await? {