Logs for apply update (#293)

* chore: add logs

* chore: add logs

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-02-05 01:37:28 +08:00 committed by GitHub
parent d23ad1c4de
commit 2d8579caab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 51 additions and 77 deletions

View File

@ -9,7 +9,7 @@ use collab::core::collab_state::SyncState;
use collab::core::origin::CollabOrigin;
use futures_util::{SinkExt, StreamExt};
use realtime_entity::collab_msg::{CollabMessage, InitSync, ServerInit, UpdateSync};
use realtime_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol};
use realtime_protocol::{handle_collab_message, ClientSyncProtocol, CollabSyncProtocol};
use realtime_protocol::{Message, MessageReader, SyncMessage};
use std::marker::PhantomData;
use std::ops::Deref;
@ -321,7 +321,7 @@ where
let msg = msg?;
trace!(" {}", msg);
let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_)));
if let Some(payload) = handle_msg(&cloned_origin, protocol, collab, msg)? {
if let Some(payload) = handle_collab_message(&cloned_origin, protocol, collab, msg)? {
if is_sync_step_1 {
// flush
match collab.try_lock() {

View File

@ -194,14 +194,12 @@ where
#[derive(Debug, Clone)]
pub struct WriteConfig {
pub flush_per_update: u32,
pub flush_per_seconds: u32,
}
impl Default for WriteConfig {
fn default() -> Self {
Self {
flush_per_update: 100,
flush_per_seconds: 3 * 60,
}
}
}

View File

@ -346,9 +346,9 @@ impl UpdateSync {
Some(Message::Sync(SyncMessage::Update(right))),
) = (self.as_update(), other.as_update())
{
let mut encoder = EncoderV1::new();
let update = merge_updates_v1(&[&left, &right])?;
let msg = Message::Sync(SyncMessage::Update(update));
let mut encoder = EncoderV1::new();
msg.encode(&mut encoder);
self.payload = Bytes::from(encoder.to_vec());
Ok(true)

View File

@ -88,11 +88,10 @@ pub trait CollabSyncProtocol {
} else {
retry_txn.try_get_write_txn()
}
.map_err(|err| Error::YrsTransaction(format!("fail to handle sync step2. error: {}", err)))?;
txn.try_apply_update(update).map_err(|err| {
Error::YrsTransaction(format!("fail to apply sync step2 update. error: {}", err))
})?;
.map_err(|err| Error::YrsTransaction(format!("sync step2 transaction acquire: {}", err)))?;
txn
.try_apply_update(update)
.map_err(|err| Error::YrsTransaction(format!("sync step2 apply update: {}", err)))?;
Ok(None)
}
@ -140,7 +139,7 @@ pub trait CollabSyncProtocol {
}
/// Handles incoming messages from the client/server
pub fn handle_msg<P: CollabSyncProtocol>(
pub fn handle_collab_message<P: CollabSyncProtocol>(
origin: &Option<CollabOrigin>,
protocol: &P,
collab: &MutexCollab,

View File

@ -6,7 +6,7 @@ use collab::core::awareness::{Awareness, AwarenessUpdate};
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use futures_util::{SinkExt, StreamExt};
use realtime_protocol::handle_msg;
use realtime_protocol::handle_collab_message;
use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use tokio::select;
use tokio::sync::broadcast::error::SendError;
@ -194,9 +194,7 @@ impl CollabBroadcast {
continue;
}
// TODO(nathan): the handle_user_ws_msg should run very fast, otherwise it will block
// the current loop. So create a task for it.
handle_user_ws_msg(&object_id, &sink, &collab_msg, &collab).await;
handle_user_collab_message(&object_id, &sink, &collab_msg, &collab).await;
}
}
}
@ -212,7 +210,7 @@ impl CollabBroadcast {
}
}
async fn handle_user_ws_msg<Sink>(
async fn handle_user_collab_message<Sink>(
object_id: &str,
sink: &Arc<Mutex<Sink>>,
collab_msg: &CollabMessage,
@ -221,7 +219,6 @@ async fn handle_user_ws_msg<Sink>(
Sink: SinkExt<CollabMessage> + Send + Sync + Unpin + 'static,
<Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
{
// safety: payload is not none
match collab_msg.payload() {
None => {},
Some(payload) => {
@ -235,7 +232,7 @@ async fn handle_user_ws_msg<Sink>(
let cloned_collab = collab.clone();
let cloned_origin = origin.clone();
let result = tokio::task::spawn_blocking(move || {
handle_msg(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg)
handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg)
})
.await;
@ -265,7 +262,7 @@ async fn handle_user_ws_msg<Sink>(
},
},
Ok(Err(err)) => {
error!("handle user ws message fail: {}", err);
error!("object id:{} =>{}", object_id, err);
},
Err(err) => {
error!("internal error when handle user ws message: {}", err);
@ -273,7 +270,10 @@ async fn handle_user_ws_msg<Sink>(
}
},
Err(e) => {
error!("Parser sync message failed: {:?}", e);
error!(
"object id:{} => parser sync message failed: {:?}",
object_id, e
);
break;
},
}

View File

@ -19,8 +19,7 @@ use database_entity::dto::{
};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::time::interval;
use tracing::{error, event, info, instrument, trace};
use yrs::updates::decoder::Decode;
@ -53,7 +52,7 @@ where
let storage = Arc::new(storage);
let workspace_id = workspace_id.to_string();
let edit_state = Arc::new(CollabEditState::new());
let plugin = Self {
Self {
uid,
workspace_id,
storage,
@ -61,9 +60,7 @@ where
group,
collab_type,
access_control,
};
spawn_period_check(&plugin);
plugin
}
}
#[instrument(level = "info", skip(self,doc), err, fields(object_id = %object_id))]
@ -100,43 +97,7 @@ where
}
}
/// Spawns an asynchronous task to periodically check and perform flush operations for collaboration groups.
///
/// This function sets up a loop that, at regular intervals, checks if a flush operation is needed
/// based on edit count and time interval. If so, it triggers the flush operation.
///
fn spawn_period_check<S, U, AC>(plugin: &CollabStoragePlugin<S, U, AC>)
where
S: CollabStorage,
U: RealtimeUser,
{
let weak_edit_state = Arc::downgrade(&plugin.edit_state);
let weak_storage = Arc::downgrade(&plugin.storage);
let weak_group = plugin.group.clone();
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
match (
weak_group.upgrade(),
weak_storage.upgrade(),
weak_edit_state.upgrade(),
) {
(Some(group), Some(storage), Some(edit_state)) => {
let max_edit_count = storage.config().flush_per_update;
let max_interval = storage.config().flush_per_seconds as i64;
if edit_state.should_flush(max_edit_count, max_interval) {
edit_state.flush_edit();
group.flush_collab();
}
},
_ => break,
}
}
});
}
async fn init_collab_with_raw_data(
async fn init_collab(
oid: &str,
encoded_collab: &EncodedCollab,
doc: &Doc,
@ -144,16 +105,22 @@ async fn init_collab_with_raw_data(
if encoded_collab.doc_state.is_empty() {
return Err(RealtimeError::UnexpectedData("doc state is empty"));
}
// Can turn off INFO level into DEBUG. For now, just to see the log
event!(
tracing::Level::DEBUG,
"start decoding {}: doc state with len: {}",
tracing::Level::INFO,
"start decoding:{} state len: {}, sv len: {}, v: {:?}",
oid,
encoded_collab.doc_state.len()
encoded_collab.doc_state.len(),
encoded_collab.state_vector.len(),
encoded_collab.version
);
let mut txn = doc.transact_mut();
let update = Update::decode_v1(&encoded_collab.doc_state)?;
let mut txn = doc.transact_mut();
txn.try_apply_update(update)?;
event!(tracing::Level::DEBUG, "finish decoding {}: doc state", oid,);
drop(txn);
event!(tracing::Level::INFO, "finish decoding:{}: doc state", oid,);
Ok(())
}
@ -171,9 +138,7 @@ where
.get_collab_encoded(&self.uid, params, true)
.await
{
Ok(encoded_collab_v1) => match init_collab_with_raw_data(object_id, &encoded_collab_v1, doc)
.await
{
Ok(encoded_collab_v1) => match init_collab(object_id, &encoded_collab_v1, doc).await {
Ok(_) => {
// Attempt to create a snapshot for the collaboration object. When creating this snapshot, it is
// assumed that the 'encoded_collab_v1' is already in a valid format. Therefore, there is no need
@ -207,13 +172,23 @@ where
Err(err) => {
// When initializing a collaboration object, if the 'init_collab_with_raw_data' operation fails, attempt to
// restore the collaboration object from the latest snapshot.
if let Some(encoded_collab_v1) = get_latest_snapshot(object_id, &self.storage).await {
if let Err(err) = init_collab_with_raw_data(object_id, &encoded_collab_v1, doc).await {
error!("restore collab with snapshot failed: {:?}", err);
return;
}
error!(
"init collab:{} error: {:?}, try to restore from snapshot",
object_id, err
);
match get_latest_snapshot(object_id, &self.storage).await {
None => error!("No snapshot found for collab: {}", object_id),
Some(encoded_collab) => match init_collab(object_id, &encoded_collab, doc).await {
Ok(_) => info!("restore collab:{} with snapshot success", object_id),
Err(err) => {
error!(
"restore collab:{} with snapshot failed: {:?}",
object_id, err
);
},
},
}
error!("init collab failed: {:?}", err)
},
},
Err(err) => match &err {
@ -343,6 +318,7 @@ impl CollabEditState {
/// # Arguments
/// * `max_edit_count` - The maximum number of edits allowed before a flush is triggered.
/// * `max_interval` - The maximum time interval (in seconds) allowed before a flush is triggered.
#[allow(dead_code)]
fn should_flush(&self, max_edit_count: u32, max_interval: i64) -> bool {
// compare current time with last flush time
let current = chrono::Utc::now().timestamp();

View File

@ -365,6 +365,7 @@ async fn remove_user_from_group<S, U, AC>(
let should_remove = group.is_empty().await;
if should_remove {
group.flush_collab();
event!(tracing::Level::INFO, "Remove group: {}", editing.object_id);
groups.remove_group(&editing.object_id).await;
}