diff --git a/Cargo.lock b/Cargo.lock index 81fcbb23..7f6fd250 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1689,6 +1689,7 @@ dependencies = [ "app-error", "async-stream", "async-trait", + "bytes", "chrono", "collab", "collab-document", diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index ef34f346..9be5d739 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -5,7 +5,9 @@ use bytes::Bytes; use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync}; -use collab_rt_protocol::{handle_message, ClientSyncProtocol, Message, MessageReader, SyncMessage}; +use collab_rt_protocol::{ + handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage, +}; use futures_util::{SinkExt, StreamExt}; use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; @@ -145,7 +147,7 @@ where // update if let ServerCollabMessage::ClientAck(ref ack) = msg { if ack.get_code() == AckCode::CannotApplyUpdate { - return Err(SyncError::YrsApplyUpdate(object.object_id.clone())); + return Err(SyncError::RequireInitSync); } } @@ -233,7 +235,8 @@ where for msg in reader { let msg = msg?; let is_server_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_))); - match handle_message(message_origin, &ClientSyncProtocol, &mut collab, msg)? { + match handle_message_follow_protocol(message_origin, &ClientSyncProtocol, &mut collab, msg)? + { Some(payload) => { let object_id = object_id.to_string(); sink.queue_msg(|msg_id| { diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index 11c3b096..1c87704b 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -181,7 +181,7 @@ pub trait CollabSyncProtocol { } /// Handles incoming messages from the client/server -pub fn handle_message( +pub fn handle_message_follow_protocol( message_origin: &CollabOrigin, protocol: &P, collab: &mut Collab, diff --git a/libs/collab-rt/Cargo.toml b/libs/collab-rt/Cargo.toml index 9f1fa9c1..85a7c0f4 100644 --- a/libs/collab-rt/Cargo.toml +++ b/libs/collab-rt/Cargo.toml @@ -20,6 +20,7 @@ serde.workspace = true serde_json.workspace = true thiserror = "1.0.56" anyhow = "1" +bytes.workspace = true collab = { version = "0.1.0" } collab-entity = { version = "0.1.0" } diff --git a/libs/collab-rt/src/error.rs b/libs/collab-rt/src/error.rs index 7aa12a0d..93f48fe8 100644 --- a/libs/collab-rt/src/error.rs +++ b/libs/collab-rt/src/error.rs @@ -47,6 +47,9 @@ pub enum RealtimeError { #[error("{0} send too many messages")] TooManyMessage(String), + #[error("Acquire lock timeout")] + LockTimeout, + #[error("Internal failure: {0}")] Internal(#[from] anyhow::Error), } @@ -55,4 +58,8 @@ impl RealtimeError { pub fn is_too_many_message(&self) -> bool { matches!(self, RealtimeError::TooManyMessage(_)) } + + pub fn is_lock_timeout(&self) -> bool { + matches!(self, RealtimeError::LockTimeout) + } } diff --git a/libs/collab-rt/src/group/broadcast.rs b/libs/collab-rt/src/group/broadcast.rs index 38f0c992..2342d3fc 100644 --- a/libs/collab-rt/src/group/broadcast.rs +++ b/libs/collab-rt/src/group/broadcast.rs @@ -4,6 +4,7 @@ use crate::group::protocol::ServerSyncProtocol; use crate::metrics::CollabMetricsCalculate; use crate::rt_server::rt_spawn; use anyhow::anyhow; +use bytes::Bytes; use collab::core::awareness::{gen_awareness_update_message, AwarenessUpdateSubscription}; use collab::core::collab::{MutexCollab, WeakMutexCollab}; use collab::core::origin::CollabOrigin; @@ -13,15 +14,17 @@ use collab_rt_entity::{AckCode, MsgId}; use collab_rt_entity::{ AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage, }; -use collab_rt_protocol::{handle_message, RTProtocolError}; +use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError}; use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use futures_util::{SinkExt, StreamExt}; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; use tokio::select; use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::{channel, Sender}; -use tokio::time::Instant; + +use tokio::time::{sleep, Instant}; use tracing::{error, trace, warn}; use yrs::encoding::write::Write; use yrs::updates::decoder::DecoderV1; @@ -337,33 +340,62 @@ async fn handle_one_client_message( let message_origin = collab_msg.origin().clone(); let seq_num = edit_state.edit_count(); - // If the payload is empty, we don't need to apply any updates to the document. + // If the payload is empty, we don't need to apply any updates . // Currently, only the ping message should has an empty payload. if collab_msg.payload().is_empty() { if !matches!(collab_msg, ClientCollabMessage::ClientPingSync(_)) { error!("receive unexpected empty payload message:{}", collab_msg); } - let resp = CollabAck::new(message_origin, object_id.to_string(), msg_id, seq_num); - Ok(resp) - } else { - trace!( - "Applying client updates: {}, origin:{}", - collab_msg, - message_origin - ); - let ack = handle_one_message_payload( - object_id, + return Ok(CollabAck::new( message_origin, + object_id.to_string(), + msg_id, + seq_num, + )); + } + + trace!( + "Applying client updates: {}, origin:{}", + collab_msg, + message_origin + ); + + // Retry mechanism for lock timeout errors + let mut attempt = 0; + let max_attempts = 3; + + // calling the handle_one_message_payload in a loop to handle the lock timeout error. It will retry + // 3 times before returning an error. + loop { + match handle_one_message_payload( + object_id, + message_origin.clone(), msg_id, collab_msg.payload(), collab, metrics_calculate, seq_num, ) - .await?; - - update_last_sync_at(collab); - Ok(ack) + .await + { + Ok(ack) => { + update_last_sync_at(collab); + return Ok(ack); + }, + Err(err) if err.is_lock_timeout() => { + attempt += 1; + if attempt >= max_attempts { + return Err(RealtimeError::Internal(anyhow!( + "Lock timeout error after maximum retries" + ))); + } + trace!("Lock timeout, retrying... attempt: {}", attempt); + sleep(Duration::from_millis(300)).await; + }, + Err(err) => { + return Err(err); + }, + } } } @@ -372,81 +404,116 @@ async fn handle_one_message_payload( object_id: &str, message_origin: CollabOrigin, msg_id: MsgId, - payload: &[u8], + payload: &Bytes, collab: &MutexCollab, metrics_calculate: &CollabMetricsCalculate, seq_num: u32, ) -> Result { - let mut decoder = DecoderV1::from(payload); - let reader = MessageReader::new(&mut decoder); - let mut ack_response = None; - + let payload = payload.clone(); metrics_calculate .acquire_collab_lock_count .fetch_add(1, Ordering::Relaxed); - let mut collab_lock = match collab.try_lock() { - Some(collab) => collab, - None => { - metrics_calculate - .acquire_collab_lock_fail_count - .fetch_add(1, Ordering::Relaxed); + let cloned_object_id = object_id.to_string(); + let mutex_collab = collab.clone(); + let metrics_calculate = metrics_calculate.clone(); + let cloned_collab_origin = message_origin.clone(); - return Err(RealtimeError::Internal(anyhow!( - "fail to lock collab:{}", - object_id, - ))); - }, - }; + // Spawn a blocking task to handle the message + let result = tokio::task::spawn_blocking(move || { + let mut collab_lock = match mutex_collab.try_lock_for(Duration::from_millis(300)) { + Some(collab) => collab, + None => { + metrics_calculate + .acquire_collab_lock_fail_count + .fetch_add(1, Ordering::Relaxed); + return Err(RealtimeError::LockTimeout); + }, + }; + let mut decoder = DecoderV1::from(payload.as_ref()); + let reader = MessageReader::new(&mut decoder); + let mut ack_response = None; + for msg in reader { + match msg { + Ok(msg) => { + match handle_message_follow_protocol( + &message_origin, + &ServerSyncProtocol, + &mut collab_lock, + msg, + ) { + Ok(payload) => { + metrics_calculate + .apply_update_count + .fetch_add(1, Ordering::Relaxed); + // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to + // send one ack back to the client. + if ack_response.is_none() { + ack_response = Some( + CollabAck::new( + message_origin.clone(), + cloned_object_id.to_string(), + msg_id, + seq_num, + ) + .with_payload(payload.unwrap_or_default()), + ); + } + }, + Err(err) => { + metrics_calculate + .apply_update_failed_count + .fetch_add(1, Ordering::Relaxed); - for msg in reader { - match msg { - Ok(msg) => { - let result = handle_message(&message_origin, &ServerSyncProtocol, &mut collab_lock, msg); - match result { - Ok(payload) => { - metrics_calculate - .apply_update_count - .fetch_add(1, Ordering::Relaxed); - // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to - // send one ack back to the client. - if ack_response.is_none() { - let resp = CollabAck::new( - message_origin.clone(), - object_id.to_string(), - msg_id, - seq_num, - ) - .with_payload(payload.unwrap_or_default()); - ack_response = Some(resp); - } - }, - Err(err) => { - metrics_calculate - .apply_update_failed_count - .fetch_add(1, Ordering::Relaxed); - if ack_response.is_none() { - let resp = CollabAck::new( - message_origin.clone(), - object_id.to_string(), - msg_id, - seq_num, - ) - .with_code(ack_code_from_error(&err)); - ack_response = Some(resp); - } - break; - }, - } - }, - Err(e) => { - error!("{} => parse sync message failed: {:?}", object_id, e); - break; - }, + if !matches!(err, RTProtocolError::MissUpdates(_)) { + error!("{} => apply update failed: {:?}", cloned_object_id, err); + } + + if ack_response.is_none() { + ack_response = Some( + CollabAck::new( + message_origin.clone(), + cloned_object_id.to_string(), + msg_id, + seq_num, + ) + .with_code(ack_code_from_error(&err)), + ); + } + break; + }, + } + }, + Err(e) => { + error!("{} => parse sync message failed: {:?}", cloned_object_id, e); + break; + }, + } } + Ok(ack_response) + }) + .await; + + match result { + Ok(inner_result) => match inner_result? { + Some(response) => Ok(response), + None => Err(RealtimeError::UnexpectedData("No ack response")), + }, + Err(err) => { + // Currently, panic only happens when calling handle_message_follow_protocol. + if err.is_panic() { + Ok( + CollabAck::new(cloned_collab_origin, object_id.to_string(), msg_id, seq_num) + .with_code(AckCode::CannotApplyUpdate), + ) + } else { + Err(RealtimeError::Internal(anyhow!( + "fail to handle message:{}", + err + ))) + } + }, } - let response = ack_response.ok_or_else(|| RealtimeError::UnexpectedData("No ack response"))?; - Ok(response) } #[inline]