From 2d8579caaba4a843c461aa36ad3681883e46b8ee Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 5 Feb 2024 01:37:28 +0800 Subject: [PATCH] Logs for apply update (#293) * chore: add logs * chore: add logs * chore: clippy --- libs/client-api/src/collab_sync/sync.rs | 4 +- libs/database/src/collab/collab_storage.rs | 2 - libs/realtime-entity/src/collab_msg.rs | 2 +- libs/realtime-protocol/src/protocol.rs | 11 ++- libs/realtime/src/collaborate/broadcast.rs | 18 ++--- libs/realtime/src/collaborate/plugin.rs | 90 ++++++++-------------- libs/realtime/src/collaborate/server.rs | 1 + 7 files changed, 51 insertions(+), 77 deletions(-) diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index 9c47eb44..8ae3b09f 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -9,7 +9,7 @@ use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; use futures_util::{SinkExt, StreamExt}; use realtime_entity::collab_msg::{CollabMessage, InitSync, ServerInit, UpdateSync}; -use realtime_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; +use realtime_protocol::{handle_collab_message, ClientSyncProtocol, CollabSyncProtocol}; use realtime_protocol::{Message, MessageReader, SyncMessage}; use std::marker::PhantomData; use std::ops::Deref; @@ -321,7 +321,7 @@ where let msg = msg?; trace!(" {}", msg); let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_))); - if let Some(payload) = handle_msg(&cloned_origin, protocol, collab, msg)? { + if let Some(payload) = handle_collab_message(&cloned_origin, protocol, collab, msg)? { if is_sync_step_1 { // flush match collab.try_lock() { diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index e3980156..095e9059 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -194,14 +194,12 @@ where #[derive(Debug, Clone)] pub struct WriteConfig { pub flush_per_update: u32, - pub flush_per_seconds: u32, } impl Default for WriteConfig { fn default() -> Self { Self { flush_per_update: 100, - flush_per_seconds: 3 * 60, } } } diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 9038ae0a..03670dff 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -346,9 +346,9 @@ impl UpdateSync { Some(Message::Sync(SyncMessage::Update(right))), ) = (self.as_update(), other.as_update()) { - let mut encoder = EncoderV1::new(); let update = merge_updates_v1(&[&left, &right])?; let msg = Message::Sync(SyncMessage::Update(update)); + let mut encoder = EncoderV1::new(); msg.encode(&mut encoder); self.payload = Bytes::from(encoder.to_vec()); Ok(true) diff --git a/libs/realtime-protocol/src/protocol.rs b/libs/realtime-protocol/src/protocol.rs index 3915b021..3c6108c0 100644 --- a/libs/realtime-protocol/src/protocol.rs +++ b/libs/realtime-protocol/src/protocol.rs @@ -88,11 +88,10 @@ pub trait CollabSyncProtocol { } else { retry_txn.try_get_write_txn() } - .map_err(|err| Error::YrsTransaction(format!("fail to handle sync step2. error: {}", err)))?; - - txn.try_apply_update(update).map_err(|err| { - Error::YrsTransaction(format!("fail to apply sync step2 update. error: {}", err)) - })?; + .map_err(|err| Error::YrsTransaction(format!("sync step2 transaction acquire: {}", err)))?; + txn + .try_apply_update(update) + .map_err(|err| Error::YrsTransaction(format!("sync step2 apply update: {}", err)))?; Ok(None) } @@ -140,7 +139,7 @@ pub trait CollabSyncProtocol { } /// Handles incoming messages from the client/server -pub fn handle_msg( +pub fn handle_collab_message( origin: &Option, protocol: &P, collab: &MutexCollab, diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 66d2208d..25ebe289 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -6,7 +6,7 @@ use collab::core::awareness::{Awareness, AwarenessUpdate}; use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; use futures_util::{SinkExt, StreamExt}; -use realtime_protocol::handle_msg; +use realtime_protocol::handle_collab_message; use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use tokio::select; use tokio::sync::broadcast::error::SendError; @@ -194,9 +194,7 @@ impl CollabBroadcast { continue; } - // TODO(nathan): the handle_user_ws_msg should run very fast, otherwise it will block - // the current loop. So create a task for it. - handle_user_ws_msg(&object_id, &sink, &collab_msg, &collab).await; + handle_user_collab_message(&object_id, &sink, &collab_msg, &collab).await; } } } @@ -212,7 +210,7 @@ impl CollabBroadcast { } } -async fn handle_user_ws_msg( +async fn handle_user_collab_message( object_id: &str, sink: &Arc>, collab_msg: &CollabMessage, @@ -221,7 +219,6 @@ async fn handle_user_ws_msg( Sink: SinkExt + Send + Sync + Unpin + 'static, >::Error: std::error::Error + Send + Sync, { - // safety: payload is not none match collab_msg.payload() { None => {}, Some(payload) => { @@ -235,7 +232,7 @@ async fn handle_user_ws_msg( let cloned_collab = collab.clone(); let cloned_origin = origin.clone(); let result = tokio::task::spawn_blocking(move || { - handle_msg(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg) + handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg) }) .await; @@ -265,7 +262,7 @@ async fn handle_user_ws_msg( }, }, Ok(Err(err)) => { - error!("handle user ws message fail: {}", err); + error!("object id:{} =>{}", object_id, err); }, Err(err) => { error!("internal error when handle user ws message: {}", err); @@ -273,7 +270,10 @@ async fn handle_user_ws_msg( } }, Err(e) => { - error!("Parser sync message failed: {:?}", e); + error!( + "object id:{} => parser sync message failed: {:?}", + object_id, e + ); break; }, } diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 0f24cae7..c9255296 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -19,8 +19,7 @@ use database_entity::dto::{ }; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; -use std::time::Duration; -use tokio::time::interval; + use tracing::{error, event, info, instrument, trace}; use yrs::updates::decoder::Decode; @@ -53,7 +52,7 @@ where let storage = Arc::new(storage); let workspace_id = workspace_id.to_string(); let edit_state = Arc::new(CollabEditState::new()); - let plugin = Self { + Self { uid, workspace_id, storage, @@ -61,9 +60,7 @@ where group, collab_type, access_control, - }; - spawn_period_check(&plugin); - plugin + } } #[instrument(level = "info", skip(self,doc), err, fields(object_id = %object_id))] @@ -100,43 +97,7 @@ where } } -/// Spawns an asynchronous task to periodically check and perform flush operations for collaboration groups. -/// -/// This function sets up a loop that, at regular intervals, checks if a flush operation is needed -/// based on edit count and time interval. If so, it triggers the flush operation. -/// -fn spawn_period_check(plugin: &CollabStoragePlugin) -where - S: CollabStorage, - U: RealtimeUser, -{ - let weak_edit_state = Arc::downgrade(&plugin.edit_state); - let weak_storage = Arc::downgrade(&plugin.storage); - let weak_group = plugin.group.clone(); - tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(30)); - loop { - interval.tick().await; - match ( - weak_group.upgrade(), - weak_storage.upgrade(), - weak_edit_state.upgrade(), - ) { - (Some(group), Some(storage), Some(edit_state)) => { - let max_edit_count = storage.config().flush_per_update; - let max_interval = storage.config().flush_per_seconds as i64; - if edit_state.should_flush(max_edit_count, max_interval) { - edit_state.flush_edit(); - group.flush_collab(); - } - }, - _ => break, - } - } - }); -} - -async fn init_collab_with_raw_data( +async fn init_collab( oid: &str, encoded_collab: &EncodedCollab, doc: &Doc, @@ -144,16 +105,22 @@ async fn init_collab_with_raw_data( if encoded_collab.doc_state.is_empty() { return Err(RealtimeError::UnexpectedData("doc state is empty")); } + + // Can turn off INFO level into DEBUG. For now, just to see the log event!( - tracing::Level::DEBUG, - "start decoding {}: doc state with len: {}", + tracing::Level::INFO, + "start decoding:{} state len: {}, sv len: {}, v: {:?}", oid, - encoded_collab.doc_state.len() + encoded_collab.doc_state.len(), + encoded_collab.state_vector.len(), + encoded_collab.version ); - let mut txn = doc.transact_mut(); let update = Update::decode_v1(&encoded_collab.doc_state)?; + let mut txn = doc.transact_mut(); txn.try_apply_update(update)?; - event!(tracing::Level::DEBUG, "finish decoding {}: doc state", oid,); + drop(txn); + + event!(tracing::Level::INFO, "finish decoding:{}: doc state", oid,); Ok(()) } @@ -171,9 +138,7 @@ where .get_collab_encoded(&self.uid, params, true) .await { - Ok(encoded_collab_v1) => match init_collab_with_raw_data(object_id, &encoded_collab_v1, doc) - .await - { + Ok(encoded_collab_v1) => match init_collab(object_id, &encoded_collab_v1, doc).await { Ok(_) => { // Attempt to create a snapshot for the collaboration object. When creating this snapshot, it is // assumed that the 'encoded_collab_v1' is already in a valid format. Therefore, there is no need @@ -207,13 +172,23 @@ where Err(err) => { // When initializing a collaboration object, if the 'init_collab_with_raw_data' operation fails, attempt to // restore the collaboration object from the latest snapshot. - if let Some(encoded_collab_v1) = get_latest_snapshot(object_id, &self.storage).await { - if let Err(err) = init_collab_with_raw_data(object_id, &encoded_collab_v1, doc).await { - error!("restore collab with snapshot failed: {:?}", err); - return; - } + error!( + "init collab:{} error: {:?}, try to restore from snapshot", + object_id, err + ); + + match get_latest_snapshot(object_id, &self.storage).await { + None => error!("No snapshot found for collab: {}", object_id), + Some(encoded_collab) => match init_collab(object_id, &encoded_collab, doc).await { + Ok(_) => info!("restore collab:{} with snapshot success", object_id), + Err(err) => { + error!( + "restore collab:{} with snapshot failed: {:?}", + object_id, err + ); + }, + }, } - error!("init collab failed: {:?}", err) }, }, Err(err) => match &err { @@ -343,6 +318,7 @@ impl CollabEditState { /// # Arguments /// * `max_edit_count` - The maximum number of edits allowed before a flush is triggered. /// * `max_interval` - The maximum time interval (in seconds) allowed before a flush is triggered. + #[allow(dead_code)] fn should_flush(&self, max_edit_count: u32, max_interval: i64) -> bool { // compare current time with last flush time let current = chrono::Utc::now().timestamp(); diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index dc6456ad..ced0d85f 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -365,6 +365,7 @@ async fn remove_user_from_group( let should_remove = group.is_empty().await; if should_remove { group.flush_collab(); + event!(tracing::Level::INFO, "Remove group: {}", editing.object_id); groups.remove_group(&editing.object_id).await; }