diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index fdfadf37..a3094509 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -20,7 +20,7 @@ use yrs::ReadTxn; use client_api_entity::{validate_data_for_folder, CollabType}; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync}; use collab_rt_protocol::{ - handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage, + ClientSyncProtocol, CollabSyncProtocol, Message, MessageReader, SyncMessage, }; use crate::af_spawn; @@ -344,8 +344,9 @@ where .map_err(|err| SyncError::OverrideWithIncorrectData(err.to_string()))?; } - if let Some(return_payload) = - handle_message_follow_protocol(&message_origin, &ClientSyncProtocol, &collab, msg).await? + if let Some(return_payload) = ClientSyncProtocol + .handle_message(&message_origin, &collab, msg) + .await? { let object_id = sync_object.object_id.clone(); sink.queue_msg(|msg_id| { diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index 956a980e..936d9a89 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -1,6 +1,7 @@ use std::borrow::BorrowMut; use std::sync::Arc; +use async_trait::async_trait; use collab::core::awareness::{Awareness, AwarenessUpdate}; use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; @@ -34,6 +35,8 @@ use crate::message::{CustomMessage, Message, RTProtocolError, SyncMessage, SyncM /// A implementation of [CollabSyncProtocol]. #[derive(Clone)] pub struct ClientSyncProtocol; + +#[async_trait] impl CollabSyncProtocol for ClientSyncProtocol { fn check(&self, encoder: &mut E, last_sync_at: i64) -> Result<(), RTProtocolError> { let meta = SyncMeta { last_sync_at }; @@ -43,13 +46,17 @@ impl CollabSyncProtocol for ClientSyncProtocol { /// Handle reply for a sync-step-1 send from this replica previously. By default just apply /// an update to current `awareness` document instance. - fn handle_sync_step2( + async fn handle_sync_step2( &self, origin: &CollabOrigin, - awareness: &mut Awareness, - update: Update, - ) -> Result<(), RTProtocolError> { - let mut txn = awareness + collab: &CollabRef, + update: Vec, + ) -> Result>, RTProtocolError> { + let update = decode_update(update).await?; + let mut lock = collab.write().await; + let collab = (*lock).borrow_mut(); + let mut txn = collab + .get_awareness() .doc() .try_transact_mut_with(origin.clone()) .map_err(|err| { @@ -77,12 +84,41 @@ impl CollabSyncProtocol for ClientSyncProtocol { reason: "client miss updates".to_string(), }) }, - None => Ok(()), + None => Ok(None), } } } +pub type CollabRef = Arc + Send + Sync + 'static>>; + +#[async_trait] pub trait CollabSyncProtocol { + /// Handles incoming messages from the client/server + async fn handle_message( + &self, + message_origin: &CollabOrigin, + collab: &CollabRef, + msg: Message, + ) -> Result>, RTProtocolError> { + match msg { + Message::Sync(msg) => match msg { + SyncMessage::SyncStep1(sv) => self.handle_sync_step1(collab, sv).await, + SyncMessage::SyncStep2(update) => { + self.handle_sync_step2(message_origin, collab, update).await + }, + SyncMessage::Update(update) => self.handle_update(message_origin, collab, update).await, + }, + Message::Auth(reason) => self.handle_auth(collab, reason).await, + //FIXME: where is the QueryAwareness protocol? + Message::Awareness(update) => { + self + .handle_awareness_update(message_origin, collab, update) + .await + }, + Message::Custom(msg) => self.handle_custom_message(collab, msg).await, + } + } + fn check(&self, _encoder: &mut E, _last_sync_at: i64) -> Result<(), RTProtocolError> { Ok(()) } @@ -121,22 +157,27 @@ pub trait CollabSyncProtocol { /// Given a [StateVector] of a remote side, calculate missing /// updates. Returns a sync-step-2 message containing a calculated update. - fn handle_sync_step1( + async fn handle_sync_step1( &self, - awareness: &Awareness, + collab: &CollabRef, sv: StateVector, ) -> Result>, RTProtocolError> { - let txn = awareness.doc().try_transact().map_err(|err| { - RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err)) - })?; - let update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| { - RTProtocolError::YrsEncodeState(format!( - "fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}", - err, - sv, - txn.store() - )) - })?; + // calculate missing updates base on the input state vector + let update = { + let lock = collab.read().await; + let collab = lock.borrow(); + let txn = collab.get_awareness().doc().try_transact().map_err(|err| { + RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err)) + })?; + txn.try_encode_state_as_update_v1(&sv).map_err(|err| { + RTProtocolError::YrsEncodeState(format!( + "fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}", + err, + sv, + txn.store() + )) + })? + }; Ok(Some( Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(), )) @@ -144,27 +185,27 @@ pub trait CollabSyncProtocol { /// Handle reply for a sync-step-1 send from this replica previously. By default just apply /// an update to current `awareness` document instance. - fn handle_sync_step2( + async fn handle_sync_step2( &self, origin: &CollabOrigin, - awareness: &mut Awareness, - update: Update, - ) -> Result<(), RTProtocolError>; + collab: &CollabRef, + update: Vec, + ) -> Result>, RTProtocolError>; /// Handle continuous update send from the client. By default just apply an update to a current /// `awareness` document instance. - fn handle_update( + async fn handle_update( &self, origin: &CollabOrigin, - awareness: &mut Awareness, - update: Update, - ) -> Result<(), RTProtocolError> { - self.handle_sync_step2(origin, awareness, update) + collab: &CollabRef, + update: Vec, + ) -> Result>, RTProtocolError> { + self.handle_sync_step2(origin, collab, update).await } - fn handle_auth( + async fn handle_auth( &self, - _awareness: &Awareness, + _collab: &CollabRef, deny_reason: Option, ) -> Result>, RTProtocolError> { if let Some(reason) = deny_reason { @@ -176,19 +217,21 @@ pub trait CollabSyncProtocol { /// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness` /// instance is being updated with incoming data. - fn handle_awareness_update( + async fn handle_awareness_update( &self, _message_origin: &CollabOrigin, - awareness: &mut Awareness, + collab: &CollabRef, update: AwarenessUpdate, ) -> Result>, RTProtocolError> { - awareness.apply_update(update)?; + let mut lock = collab.write().await; + let collab = (*lock).borrow_mut(); + collab.get_awareness().apply_update(update)?; Ok(None) } - fn handle_custom_message( + async fn handle_custom_message( &self, - _awareness: &mut Awareness, + _collab: &CollabRef, _msg: CustomMessage, ) -> Result>, RTProtocolError> { Ok(None) @@ -198,7 +241,7 @@ pub trait CollabSyncProtocol { const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB #[inline] -async fn decode_update(update: Vec) -> Result { +pub async fn decode_update(update: Vec) -> Result { let update = if update.len() > LARGE_UPDATE_THRESHOLD { spawn_blocking(move || Update::decode_v1(&update)) .await @@ -208,56 +251,3 @@ async fn decode_update(update: Vec) -> Result { }?; Ok(update) } - -/// Handles incoming messages from the client/server -pub async fn handle_message_follow_protocol

( - message_origin: &CollabOrigin, - protocol: &P, - collab: &Arc + Send + Sync + 'static>>, - msg: Message, -) -> Result>, RTProtocolError> -where - P: CollabSyncProtocol, -{ - match msg { - Message::Sync(msg) => match msg { - SyncMessage::SyncStep1(sv) => { - // calculate missing updates base on the input state vector - let lock = collab.read().await; - let collab = lock.borrow(); - let update = protocol.handle_sync_step1(collab.get_awareness(), sv)?; - Ok(update) - }, - SyncMessage::SyncStep2(update) => { - let update = decode_update(update).await?; - let mut lock = collab.write().await; - let collab = (*lock).borrow_mut(); - protocol.handle_sync_step2(message_origin, collab.get_mut_awareness(), update)?; - Ok(None) - }, - SyncMessage::Update(update) => { - let update = decode_update(update).await?; - let mut lock = collab.write().await; - let collab = (*lock).borrow_mut(); - protocol.handle_update(message_origin, collab.get_mut_awareness(), update)?; - Ok(None) - }, - }, - Message::Auth(reason) => { - let lock = collab.read().await; - let collab = lock.borrow(); - protocol.handle_auth(collab.get_awareness(), reason) - }, - //FIXME: where is the QueryAwareness protocol? - Message::Awareness(update) => { - let mut lock = collab.write().await; - let collab = (*lock).borrow_mut(); - protocol.handle_awareness_update(message_origin, collab.get_mut_awareness(), update) - }, - Message::Custom(msg) => { - let mut lock = collab.write().await; - let collab = (*lock).borrow_mut(); - protocol.handle_custom_message(collab.get_mut_awareness(), msg) - }, - } -} diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index eced8274..d2cf37f3 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -22,8 +22,8 @@ use collab_rt_entity::{AckCode, MsgId}; use collab_rt_entity::{ AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage, }; -use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage}; -use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; +use collab_rt_protocol::{CollabSyncProtocol, Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; +use collab_rt_protocol::{RTProtocolError, SyncMessage}; use crate::error::RealtimeError; use crate::group::group_init::EditState; @@ -279,7 +279,7 @@ async fn handle_client_messages( message_map: MessageByObjectId, sink: &mut Sink, collab: Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabRealtimeMetrics, + metrics_calculate: &Arc, edit_state: &Arc, ) where Sink: SinkExt + Unpin + 'static, @@ -337,7 +337,7 @@ async fn handle_one_client_message( object_id: &str, collab_msg: &ClientCollabMessage, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabRealtimeMetrics, + metrics_calculate: &Arc, edit_state: &Arc, ) -> Result { let msg_id = collab_msg.msg_id(); @@ -382,7 +382,7 @@ async fn handle_one_message_payload( msg_id: MsgId, payload: &Bytes, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabRealtimeMetrics, + metrics_calculate: &Arc, edit_state: &Arc, ) -> Result { let payload = payload.clone(); @@ -416,7 +416,7 @@ async fn handle_message( payload: &Bytes, message_origin: &CollabOrigin, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabRealtimeMetrics, + metrics_calculate: &Arc, object_id: &str, msg_id: MsgId, edit_state: &Arc, @@ -430,7 +430,9 @@ async fn handle_message( match msg { Ok(msg) => { is_sync_step2 = matches!(msg, Message::Sync(SyncMessage::SyncStep2(_))); - match handle_message_follow_protocol(message_origin, &ServerSyncProtocol, collab, msg).await + match ServerSyncProtocol::new(metrics_calculate.clone()) + .handle_message(message_origin, collab, msg) + .await { Ok(payload) => { metrics_calculate.apply_update_count.inc(); diff --git a/services/appflowy-collaborate/src/group/protocol.rs b/services/appflowy-collaborate/src/group/protocol.rs index 28bdf5dc..ec33792f 100644 --- a/services/appflowy-collaborate/src/group/protocol.rs +++ b/services/appflowy-collaborate/src/group/protocol.rs @@ -1,85 +1,125 @@ -use collab::core::awareness::Awareness; +use std::sync::Arc; + +use async_trait::async_trait; use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; +use tokio::time::Instant; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, StateVector, Transact, Update}; +use yrs::{ReadTxn, StateVector, Transact}; use collab_rt_protocol::CollabSyncProtocol; -use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage}; +use collab_rt_protocol::{ + decode_update, CollabRef, CustomMessage, Message, RTProtocolError, SyncMessage, +}; + +use crate::CollabRealtimeMetrics; #[derive(Clone)] -pub struct ServerSyncProtocol; +pub struct ServerSyncProtocol { + metrics: Arc, +} + +impl ServerSyncProtocol { + pub fn new(metrics: Arc) -> Self { + Self { metrics } + } +} + +#[async_trait] impl CollabSyncProtocol for ServerSyncProtocol { - fn handle_sync_step1( + async fn handle_sync_step1( &self, - awareness: &Awareness, + collab: &CollabRef, sv: StateVector, ) -> Result>, RTProtocolError> { - let txn = awareness.doc().try_transact().map_err(|err| { - RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err)) - })?; + let (doc_state, state_vector) = { + let lock = collab.read().await; + let collab = (*lock).borrow(); - let client_step2_update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| { - RTProtocolError::YrsEncodeState(format!( + let txn = collab.get_awareness().doc().try_transact().map_err(|err| { + RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err)) + })?; + + let doc_state = txn.try_encode_state_as_update_v1(&sv).map_err(|err| { + RTProtocolError::YrsEncodeState(format!( "fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}", err, sv, txn.store() )) - })?; + })?; + (doc_state, txn.state_vector()) + }; // Retrieve the latest document state from the client after they return online from offline editing. let mut encoder = EncoderV1::new(); - Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder); + Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder); //FIXME: this should never happen as response to sync step 1 from the client, but rather be // send when a connection is established - Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder); + Message::Sync(SyncMessage::SyncStep1(state_vector)).encode(&mut encoder); Ok(Some(encoder.to_vec())) } - fn handle_sync_step2( + async fn handle_sync_step2( &self, origin: &CollabOrigin, - awareness: &mut Awareness, - update: Update, - ) -> Result<(), RTProtocolError> { - let mut txn = awareness - .doc() - .try_transact_mut_with(origin.clone()) - .map_err(|err| { - RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err)) - })?; - txn.try_apply_update(update).map_err(|err| { - RTProtocolError::YrsApplyUpdate(format!( - "sync step2 apply update: {}\ndocument state: {:#?}", - err, - txn.store() - )) - })?; + collab: &CollabRef, + update: Vec, + ) -> Result>, RTProtocolError> { + self.metrics.apply_update_size.observe(update.len() as f64); + let start = Instant::now(); + let result = { + let update = decode_update(update).await?; + let mut lock = collab.write().await; + let collab = (*lock).borrow_mut(); - // If server can't apply updates sent by client, which means the server is missing some updates - // from the client or the client is missing some updates from the server. - // If the client can't apply broadcast from server, which means the client is missing some - // updates. - match txn.store().pending_update() { - Some(_update) => { - // let state_vector_v1 = update.missing.encode_v1(); - // for the moment, we don't need to send missing updates to the client. passing None - // instead, which will trigger a sync step 0 on client - let state_vector_v1 = txn.state_vector().encode_v1(); - Err(RTProtocolError::MissUpdates { - state_vector_v1: Some(state_vector_v1), - reason: "server miss updates".to_string(), - }) - }, - None => Ok(()), - } + let mut txn = collab + .get_awareness() + .doc() + .try_transact_mut_with(origin.clone()) + .map_err(|err| { + RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err)) + })?; + txn.try_apply_update(update).map_err(|err| { + RTProtocolError::YrsApplyUpdate(format!( + "sync step2 apply update: {}\ndocument state: {:#?}", + err, + txn.store() + )) + })?; + + // If server can't apply updates sent by client, which means the server is missing some updates + // from the client or the client is missing some updates from the server. + // If the client can't apply broadcast from server, which means the client is missing some + // updates. + match txn.store().pending_update() { + Some(_update) => { + // let state_vector_v1 = update.missing.encode_v1(); + // for the moment, we don't need to send missing updates to the client. passing None + // instead, which will trigger a sync step 0 on client + let state_vector_v1 = txn.state_vector().encode_v1(); + Err(RTProtocolError::MissUpdates { + state_vector_v1: Some(state_vector_v1), + reason: "server miss updates".to_string(), + }) + }, + None => Ok(None), + } + }; + + let elapsed = start.elapsed(); + self + .metrics + .apply_update_time + .observe(elapsed.as_millis() as f64); + + result } - fn handle_custom_message( + async fn handle_custom_message( &self, - _awareness: &mut Awareness, + _collab: &CollabRef, _msg: CustomMessage, ) -> Result>, RTProtocolError> { Ok(None) diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index ab1f15e0..59b1e8e6 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -14,7 +14,7 @@ use crate::error::RealtimeError; use crate::group::group_init::CollabGroup; use crate::metrics::CollabRealtimeMetrics; -#[derive(Default, Clone)] +#[derive(Clone)] pub(crate) struct GroupManagementState { group_by_object_id: Arc>>, /// Keep track of all [Collab] objects that a user is subscribed to. diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index 2e8e6f22..3a37c40c 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -2,12 +2,13 @@ use std::sync::Arc; use std::time::Duration; use prometheus_client::metrics::gauge::Gauge; +use prometheus_client::metrics::histogram::Histogram; use prometheus_client::registry::Registry; use tokio::time::interval; use database::collab::CollabStorage; -#[derive(Clone, Default)] +#[derive(Clone)] pub struct CollabRealtimeMetrics { pub(crate) connected_users: Gauge, pub(crate) total_success_get_encode_collab_from_redis: Gauge, @@ -20,10 +21,14 @@ pub struct CollabRealtimeMetrics { pub(crate) apply_update_failed_count: Gauge, pub(crate) acquire_collab_lock_count: Gauge, pub(crate) acquire_collab_lock_fail_count: Gauge, + /// How long it takes to apply update in milliseconds. + pub(crate) apply_update_time: Histogram, + /// How big the update is in bytes. + pub(crate) apply_update_size: Histogram, } impl CollabRealtimeMetrics { - fn init() -> Self { + fn new() -> Self { Self { connected_users: Gauge::default(), total_success_get_encode_collab_from_redis: Gauge::default(), @@ -34,11 +39,26 @@ impl CollabRealtimeMetrics { apply_update_failed_count: Default::default(), acquire_collab_lock_count: Default::default(), acquire_collab_lock_fail_count: Default::default(), + + // when it comes to histograms we organize them by buckets or specific sizes - since our + // prometheus client doesn't support Summary type, we use Histogram type instead + + // time spent on apply_update in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s + apply_update_time: Histogram::new( + [1.0, 5.0, 15.0, 30.0, 100.0, 200.0, 500.0, 1000.0].into_iter(), + ), + // update size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB + apply_update_size: Histogram::new( + [ + 128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0, + ] + .into_iter(), + ), } } pub fn register(registry: &mut Registry) -> Self { - let metrics = Self::init(); + let metrics = Self::new(); let realtime_registry = registry.sub_registry_with_prefix("realtime"); realtime_registry.register( "connected_users", @@ -86,6 +106,16 @@ impl CollabRealtimeMetrics { "number of acquire collab lock failed", metrics.acquire_collab_lock_fail_count.clone(), ); + realtime_registry.register( + "apply_update_time", + "time spent on applying collab updates in milliseconds", + metrics.apply_update_time.clone(), + ); + realtime_registry.register( + "apply_update_size", + "size of updates applied to collab in bytes", + metrics.apply_update_size.clone(), + ); metrics }