From a1546909c3efe7fc7f4e2c3e70e0649eba4bcf58 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 14 Oct 2024 08:47:07 +0200 Subject: [PATCH] chore: remove server sync protocol (not used anymore) --- .../src/group/group_init.rs | 63 +-------- .../appflowy-collaborate/src/group/mod.rs | 1 - .../src/group/protocol.rs | 127 ------------------ 3 files changed, 2 insertions(+), 189 deletions(-) delete mode 100644 services/appflowy-collaborate/src/group/protocol.rs diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index d95bcf23..a0cb22c3 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -19,11 +19,7 @@ use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage}; use collab_stream::client::CollabRedisStream; use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink}; use collab_stream::error::StreamError; -use collab_stream::model::{ - AwarenessStreamUpdate, CollabStreamUpdate, CollabUpdateEvent, MessageId, StreamBinary, - UpdateFlags, -}; -use collab_stream::stream_group::StreamGroup; +use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags}; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::{ @@ -31,11 +27,9 @@ use database_entity::dto::{ }; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; -use std::collections::VecDeque; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::mpsc; use tokio::time::MissedTickBehavior; use tokio_util::sync::CancellationToken; use tracing::{error, info, trace}; @@ -162,6 +156,7 @@ impl CollabGroup { } #[inline] + #[allow(dead_code)] pub fn object_id(&self) -> &str { &self.state.object_id } @@ -754,60 +749,6 @@ impl CollabGroup { } } -struct CollabUpdateStreamingImpl { - sender: mpsc::UnboundedSender>, -} - -impl CollabUpdateStreamingImpl { - async fn new( - workspace_id: &str, - object_id: &str, - collab_redis_stream: &CollabRedisStream, - ) -> Result { - let stream = collab_redis_stream - .collab_update_stream_group(workspace_id, object_id, "collaborate_update_producer") - .await?; - let (sender, receiver) = mpsc::unbounded_channel(); - tokio::spawn(async move { - if let Err(err) = Self::consume_messages(receiver, stream).await { - error!("Failed to consume incoming updates: {}", err); - } - }); - Ok(Self { sender }) - } - - async fn consume_messages( - mut receiver: mpsc::UnboundedReceiver>, - mut stream: StreamGroup, - ) -> Result<(), RealtimeError> { - while let Some(update) = receiver.recv().await { - let mut update_count = 1; - let update = { - let mut updates = VecDeque::new(); - // there may be already more messages inside waiting, try to read them all right away - while let Ok(update) = receiver.try_recv() { - updates.push_back(Update::decode_v1(&update)?); - } - if updates.is_empty() { - update // no following messages - } else { - update_count += updates.len(); - // prepend first update and merge them all together - updates.push_front(Update::decode_v1(&update)?); - Update::merge_updates(updates).encode_v1() - } - }; - - let msg = StreamBinary::try_from(CollabUpdateEvent::UpdateV1 { - encode_update: update, - })?; - stream.insert_messages(vec![msg]).await?; - trace!("Sent cumulative ({}) collab update to redis", update_count); - } - Ok(()) - } -} - pub trait SubscriptionSink: Sink + Send + Sync + Unpin { diff --git a/services/appflowy-collaborate/src/group/mod.rs b/services/appflowy-collaborate/src/group/mod.rs index 0acedf57..0f51acc1 100644 --- a/services/appflowy-collaborate/src/group/mod.rs +++ b/services/appflowy-collaborate/src/group/mod.rs @@ -2,5 +2,4 @@ pub(crate) mod cmd; pub(crate) mod group_init; pub(crate) mod manager; mod plugin; -pub(crate) mod protocol; mod state; diff --git a/services/appflowy-collaborate/src/group/protocol.rs b/services/appflowy-collaborate/src/group/protocol.rs deleted file mode 100644 index ec33792f..00000000 --- a/services/appflowy-collaborate/src/group/protocol.rs +++ /dev/null @@ -1,127 +0,0 @@ -use std::sync::Arc; - -use async_trait::async_trait; -use collab::core::collab::{TransactionExt, TransactionMutExt}; -use collab::core::origin::CollabOrigin; -use tokio::time::Instant; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, StateVector, Transact}; - -use collab_rt_protocol::CollabSyncProtocol; -use collab_rt_protocol::{ - decode_update, CollabRef, CustomMessage, Message, RTProtocolError, SyncMessage, -}; - -use crate::CollabRealtimeMetrics; - -#[derive(Clone)] -pub struct ServerSyncProtocol { - metrics: Arc, -} - -impl ServerSyncProtocol { - pub fn new(metrics: Arc) -> Self { - Self { metrics } - } -} - -#[async_trait] -impl CollabSyncProtocol for ServerSyncProtocol { - async fn handle_sync_step1( - &self, - collab: &CollabRef, - sv: StateVector, - ) -> Result>, RTProtocolError> { - let (doc_state, state_vector) = { - let lock = collab.read().await; - let collab = (*lock).borrow(); - - let txn = collab.get_awareness().doc().try_transact().map_err(|err| { - RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err)) - })?; - - let doc_state = txn.try_encode_state_as_update_v1(&sv).map_err(|err| { - RTProtocolError::YrsEncodeState(format!( - "fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}", - err, - sv, - txn.store() - )) - })?; - (doc_state, txn.state_vector()) - }; - - // Retrieve the latest document state from the client after they return online from offline editing. - let mut encoder = EncoderV1::new(); - Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder); - - //FIXME: this should never happen as response to sync step 1 from the client, but rather be - // send when a connection is established - Message::Sync(SyncMessage::SyncStep1(state_vector)).encode(&mut encoder); - Ok(Some(encoder.to_vec())) - } - - async fn handle_sync_step2( - &self, - origin: &CollabOrigin, - collab: &CollabRef, - update: Vec, - ) -> Result>, RTProtocolError> { - self.metrics.apply_update_size.observe(update.len() as f64); - let start = Instant::now(); - let result = { - let update = decode_update(update).await?; - let mut lock = collab.write().await; - let collab = (*lock).borrow_mut(); - - let mut txn = collab - .get_awareness() - .doc() - .try_transact_mut_with(origin.clone()) - .map_err(|err| { - RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err)) - })?; - txn.try_apply_update(update).map_err(|err| { - RTProtocolError::YrsApplyUpdate(format!( - "sync step2 apply update: {}\ndocument state: {:#?}", - err, - txn.store() - )) - })?; - - // If server can't apply updates sent by client, which means the server is missing some updates - // from the client or the client is missing some updates from the server. - // If the client can't apply broadcast from server, which means the client is missing some - // updates. - match txn.store().pending_update() { - Some(_update) => { - // let state_vector_v1 = update.missing.encode_v1(); - // for the moment, we don't need to send missing updates to the client. passing None - // instead, which will trigger a sync step 0 on client - let state_vector_v1 = txn.state_vector().encode_v1(); - Err(RTProtocolError::MissUpdates { - state_vector_v1: Some(state_vector_v1), - reason: "server miss updates".to_string(), - }) - }, - None => Ok(None), - } - }; - - let elapsed = start.elapsed(); - self - .metrics - .apply_update_time - .observe(elapsed.as_millis() as f64); - - result - } - - async fn handle_custom_message( - &self, - _collab: &CollabRef, - _msg: CustomMessage, - ) -> Result>, RTProtocolError> { - Ok(None) - } -}