chore: fix potential sync issue (#474)

* chore: retry for handle message

* chore: retry when tring to lock the collab
This commit is contained in:
Nathan.fooo 2024-04-16 20:07:47 +08:00 committed by GitHub
parent 0be4d2d5b5
commit 4d36f7e9e6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 163 additions and 84 deletions

1
Cargo.lock generated
View File

@ -1689,6 +1689,7 @@ dependencies = [
"app-error",
"async-stream",
"async-trait",
"bytes",
"chrono",
"collab",
"collab-document",

View File

@ -5,7 +5,9 @@ use bytes::Bytes;
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync};
use collab_rt_protocol::{handle_message, ClientSyncProtocol, Message, MessageReader, SyncMessage};
use collab_rt_protocol::{
handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage,
};
use futures_util::{SinkExt, StreamExt};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering};
@ -145,7 +147,7 @@ where
// update
if let ServerCollabMessage::ClientAck(ref ack) = msg {
if ack.get_code() == AckCode::CannotApplyUpdate {
return Err(SyncError::YrsApplyUpdate(object.object_id.clone()));
return Err(SyncError::RequireInitSync);
}
}
@ -233,7 +235,8 @@ where
for msg in reader {
let msg = msg?;
let is_server_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_)));
match handle_message(message_origin, &ClientSyncProtocol, &mut collab, msg)? {
match handle_message_follow_protocol(message_origin, &ClientSyncProtocol, &mut collab, msg)?
{
Some(payload) => {
let object_id = object_id.to_string();
sink.queue_msg(|msg_id| {

View File

@ -181,7 +181,7 @@ pub trait CollabSyncProtocol {
}
/// Handles incoming messages from the client/server
pub fn handle_message<P: CollabSyncProtocol>(
pub fn handle_message_follow_protocol<P: CollabSyncProtocol>(
message_origin: &CollabOrigin,
protocol: &P,
collab: &mut Collab,

View File

@ -20,6 +20,7 @@ serde.workspace = true
serde_json.workspace = true
thiserror = "1.0.56"
anyhow = "1"
bytes.workspace = true
collab = { version = "0.1.0" }
collab-entity = { version = "0.1.0" }

View File

@ -47,6 +47,9 @@ pub enum RealtimeError {
#[error("{0} send too many messages")]
TooManyMessage(String),
#[error("Acquire lock timeout")]
LockTimeout,
#[error("Internal failure: {0}")]
Internal(#[from] anyhow::Error),
}
@ -55,4 +58,8 @@ impl RealtimeError {
pub fn is_too_many_message(&self) -> bool {
matches!(self, RealtimeError::TooManyMessage(_))
}
pub fn is_lock_timeout(&self) -> bool {
matches!(self, RealtimeError::LockTimeout)
}
}

View File

@ -4,6 +4,7 @@ use crate::group::protocol::ServerSyncProtocol;
use crate::metrics::CollabMetricsCalculate;
use crate::rt_server::rt_spawn;
use anyhow::anyhow;
use bytes::Bytes;
use collab::core::awareness::{gen_awareness_update_message, AwarenessUpdateSubscription};
use collab::core::collab::{MutexCollab, WeakMutexCollab};
use collab::core::origin::CollabOrigin;
@ -13,15 +14,17 @@ use collab_rt_entity::{AckCode, MsgId};
use collab_rt_entity::{
AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage,
};
use collab_rt_protocol::{handle_message, RTProtocolError};
use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError};
use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use futures_util::{SinkExt, StreamExt};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::{channel, Sender};
use tokio::time::Instant;
use tokio::time::{sleep, Instant};
use tracing::{error, trace, warn};
use yrs::encoding::write::Write;
use yrs::updates::decoder::DecoderV1;
@ -337,33 +340,62 @@ async fn handle_one_client_message(
let message_origin = collab_msg.origin().clone();
let seq_num = edit_state.edit_count();
// If the payload is empty, we don't need to apply any updates to the document.
// If the payload is empty, we don't need to apply any updates .
// Currently, only the ping message should has an empty payload.
if collab_msg.payload().is_empty() {
if !matches!(collab_msg, ClientCollabMessage::ClientPingSync(_)) {
error!("receive unexpected empty payload message:{}", collab_msg);
}
let resp = CollabAck::new(message_origin, object_id.to_string(), msg_id, seq_num);
Ok(resp)
} else {
trace!(
"Applying client updates: {}, origin:{}",
collab_msg,
message_origin
);
let ack = handle_one_message_payload(
object_id,
return Ok(CollabAck::new(
message_origin,
object_id.to_string(),
msg_id,
seq_num,
));
}
trace!(
"Applying client updates: {}, origin:{}",
collab_msg,
message_origin
);
// Retry mechanism for lock timeout errors
let mut attempt = 0;
let max_attempts = 3;
// calling the handle_one_message_payload in a loop to handle the lock timeout error. It will retry
// 3 times before returning an error.
loop {
match handle_one_message_payload(
object_id,
message_origin.clone(),
msg_id,
collab_msg.payload(),
collab,
metrics_calculate,
seq_num,
)
.await?;
update_last_sync_at(collab);
Ok(ack)
.await
{
Ok(ack) => {
update_last_sync_at(collab);
return Ok(ack);
},
Err(err) if err.is_lock_timeout() => {
attempt += 1;
if attempt >= max_attempts {
return Err(RealtimeError::Internal(anyhow!(
"Lock timeout error after maximum retries"
)));
}
trace!("Lock timeout, retrying... attempt: {}", attempt);
sleep(Duration::from_millis(300)).await;
},
Err(err) => {
return Err(err);
},
}
}
}
@ -372,81 +404,116 @@ async fn handle_one_message_payload(
object_id: &str,
message_origin: CollabOrigin,
msg_id: MsgId,
payload: &[u8],
payload: &Bytes,
collab: &MutexCollab,
metrics_calculate: &CollabMetricsCalculate,
seq_num: u32,
) -> Result<CollabAck, RealtimeError> {
let mut decoder = DecoderV1::from(payload);
let reader = MessageReader::new(&mut decoder);
let mut ack_response = None;
let payload = payload.clone();
metrics_calculate
.acquire_collab_lock_count
.fetch_add(1, Ordering::Relaxed);
let mut collab_lock = match collab.try_lock() {
Some(collab) => collab,
None => {
metrics_calculate
.acquire_collab_lock_fail_count
.fetch_add(1, Ordering::Relaxed);
let cloned_object_id = object_id.to_string();
let mutex_collab = collab.clone();
let metrics_calculate = metrics_calculate.clone();
let cloned_collab_origin = message_origin.clone();
return Err(RealtimeError::Internal(anyhow!(
"fail to lock collab:{}",
object_id,
)));
},
};
// Spawn a blocking task to handle the message
let result = tokio::task::spawn_blocking(move || {
let mut collab_lock = match mutex_collab.try_lock_for(Duration::from_millis(300)) {
Some(collab) => collab,
None => {
metrics_calculate
.acquire_collab_lock_fail_count
.fetch_add(1, Ordering::Relaxed);
return Err(RealtimeError::LockTimeout);
},
};
let mut decoder = DecoderV1::from(payload.as_ref());
let reader = MessageReader::new(&mut decoder);
let mut ack_response = None;
for msg in reader {
match msg {
Ok(msg) => {
match handle_message_follow_protocol(
&message_origin,
&ServerSyncProtocol,
&mut collab_lock,
msg,
) {
Ok(payload) => {
metrics_calculate
.apply_update_count
.fetch_add(1, Ordering::Relaxed);
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
// send one ack back to the client.
if ack_response.is_none() {
ack_response = Some(
CollabAck::new(
message_origin.clone(),
cloned_object_id.to_string(),
msg_id,
seq_num,
)
.with_payload(payload.unwrap_or_default()),
);
}
},
Err(err) => {
metrics_calculate
.apply_update_failed_count
.fetch_add(1, Ordering::Relaxed);
for msg in reader {
match msg {
Ok(msg) => {
let result = handle_message(&message_origin, &ServerSyncProtocol, &mut collab_lock, msg);
match result {
Ok(payload) => {
metrics_calculate
.apply_update_count
.fetch_add(1, Ordering::Relaxed);
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
// send one ack back to the client.
if ack_response.is_none() {
let resp = CollabAck::new(
message_origin.clone(),
object_id.to_string(),
msg_id,
seq_num,
)
.with_payload(payload.unwrap_or_default());
ack_response = Some(resp);
}
},
Err(err) => {
metrics_calculate
.apply_update_failed_count
.fetch_add(1, Ordering::Relaxed);
if ack_response.is_none() {
let resp = CollabAck::new(
message_origin.clone(),
object_id.to_string(),
msg_id,
seq_num,
)
.with_code(ack_code_from_error(&err));
ack_response = Some(resp);
}
break;
},
}
},
Err(e) => {
error!("{} => parse sync message failed: {:?}", object_id, e);
break;
},
if !matches!(err, RTProtocolError::MissUpdates(_)) {
error!("{} => apply update failed: {:?}", cloned_object_id, err);
}
if ack_response.is_none() {
ack_response = Some(
CollabAck::new(
message_origin.clone(),
cloned_object_id.to_string(),
msg_id,
seq_num,
)
.with_code(ack_code_from_error(&err)),
);
}
break;
},
}
},
Err(e) => {
error!("{} => parse sync message failed: {:?}", cloned_object_id, e);
break;
},
}
}
Ok(ack_response)
})
.await;
match result {
Ok(inner_result) => match inner_result? {
Some(response) => Ok(response),
None => Err(RealtimeError::UnexpectedData("No ack response")),
},
Err(err) => {
// Currently, panic only happens when calling handle_message_follow_protocol.
if err.is_panic() {
Ok(
CollabAck::new(cloned_collab_origin, object_id.to_string(), msg_id, seq_num)
.with_code(AckCode::CannotApplyUpdate),
)
} else {
Err(RealtimeError::Internal(anyhow!(
"fail to handle message:{}",
err
)))
}
},
}
let response = ack_response.ok_or_else(|| RealtimeError::UnexpectedData("No ack response"))?;
Ok(response)
}
#[inline]