chore: broadcast check (#485)
* chore: check broadcast continuous * chore: update log * chore: update log
This commit is contained in:
parent
0de64f4f71
commit
db23b64229
|
|
@ -542,7 +542,6 @@ impl TestClient {
|
|||
SinkConfig::default(),
|
||||
stream,
|
||||
Some(handler),
|
||||
!self.ws_client.is_connected(),
|
||||
ws_connect_state,
|
||||
);
|
||||
collab.lock().add_plugin(Box::new(sync_plugin));
|
||||
|
|
@ -613,7 +612,6 @@ impl TestClient {
|
|||
SinkConfig::default(),
|
||||
stream,
|
||||
Some(handler),
|
||||
!self.ws_client.is_connected(),
|
||||
ws_connect_state,
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -62,13 +62,12 @@ where
|
|||
notifier: watch::Sender<SinkSignal>,
|
||||
sync_state_tx: broadcast::Sender<CollabSyncState>,
|
||||
config: SinkConfig,
|
||||
pause: bool,
|
||||
) -> Self {
|
||||
let notifier = Arc::new(notifier);
|
||||
let sender = Arc::new(Mutex::new(sink));
|
||||
let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new()));
|
||||
let sending_messages = Arc::new(parking_lot::Mutex::new(HashSet::new()));
|
||||
let state = Arc::new(CollabSinkState::new(pause));
|
||||
let state = Arc::new(CollabSinkState::new());
|
||||
let mut interval = interval(SEND_INTERVAL);
|
||||
let weak_sending_messages = Arc::downgrade(&sending_messages);
|
||||
|
||||
|
|
@ -152,6 +151,7 @@ where
|
|||
/// message immediately.
|
||||
pub fn queue_init_sync(&self, f: impl FnOnce(MsgId) -> ClientCollabMessage) {
|
||||
let _ = self.sync_state_tx.send(CollabSyncState::Syncing);
|
||||
self.clear();
|
||||
|
||||
// When the client is connected, remove all pending messages and send the init message.
|
||||
let mut msg_queue = self.message_queue.lock();
|
||||
|
|
@ -200,8 +200,6 @@ where
|
|||
}
|
||||
|
||||
self.state.pause_ping.store(true, Ordering::SeqCst);
|
||||
self.state.pause.store(true, Ordering::SeqCst);
|
||||
let _ = self.sync_state_tx.send(CollabSyncState::Pause);
|
||||
}
|
||||
|
||||
pub fn resume(&self) {
|
||||
|
|
@ -210,7 +208,6 @@ where
|
|||
}
|
||||
|
||||
self.state.pause_ping.store(false, Ordering::SeqCst);
|
||||
self.state.pause.store(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// Notify the sink to process the next message and mark the current message as done.
|
||||
|
|
@ -225,7 +222,7 @@ where
|
|||
let income_message_id = msg_id;
|
||||
let mut sending_messages = self.sending_messages.lock();
|
||||
|
||||
// if the message id is not in the flying messages, it means the message is invalid.
|
||||
// if the message id is not in the sending messages, it means the message is invalid.
|
||||
if !sending_messages.contains(&income_message_id) {
|
||||
return Ok(false);
|
||||
}
|
||||
|
|
@ -287,12 +284,6 @@ where
|
|||
}
|
||||
|
||||
async fn process_next_msg(&self) {
|
||||
if self.state.pause.load(Ordering::SeqCst) {
|
||||
// If the sink is paused, sleep for a while and try later.
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let items = {
|
||||
let (mut msg_queue, mut sending_messages) = match (
|
||||
self.message_queue.try_lock(),
|
||||
|
|
@ -301,6 +292,12 @@ where
|
|||
(Some(msg_queue), Some(sending_messages)) => (msg_queue, sending_messages),
|
||||
_ => {
|
||||
// If acquire the lock failed, try later
|
||||
if cfg!(feature = "sync_verbose_log") {
|
||||
trace!(
|
||||
"{}: failed to acquire the lock of the sink, retry later",
|
||||
self.object.object_id
|
||||
);
|
||||
}
|
||||
retry_later(Arc::downgrade(&self.notifier));
|
||||
return;
|
||||
},
|
||||
|
|
@ -543,7 +540,6 @@ impl SyncTimestamp {
|
|||
}
|
||||
|
||||
pub(crate) struct CollabSinkState {
|
||||
pub(crate) pause: AtomicBool,
|
||||
pub(crate) latest_sync: SyncTimestamp,
|
||||
pub(crate) pause_ping: AtomicBool,
|
||||
pub(crate) id_counter: DefaultMsgIdCounter,
|
||||
|
|
@ -551,10 +547,9 @@ pub(crate) struct CollabSinkState {
|
|||
}
|
||||
|
||||
impl CollabSinkState {
|
||||
fn new(pause: bool) -> Self {
|
||||
fn new() -> Self {
|
||||
let msg_id_counter = DefaultMsgIdCounter::new();
|
||||
CollabSinkState {
|
||||
pause: AtomicBool::new(pause),
|
||||
latest_sync: SyncTimestamp::new(),
|
||||
pause_ping: AtomicBool::new(false),
|
||||
id_counter: msg_id_counter,
|
||||
|
|
@ -569,7 +564,6 @@ pub enum CollabSyncState {
|
|||
Syncing,
|
||||
/// All the messages are synced to the remote.
|
||||
Finished,
|
||||
Pause,
|
||||
}
|
||||
|
||||
impl CollabSyncState {
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ where
|
|||
match msg.msg_id() {
|
||||
None => {
|
||||
if let ServerCollabMessage::ServerBroadcast(ref data) = msg {
|
||||
seq_num_counter.check_broadcast_contiguous(&object.object_id, data.seq_num)?;
|
||||
seq_num_counter.store_broadcast_seq_num(data.seq_num);
|
||||
}
|
||||
Self::process_message_follow_protocol(&object.object_id, &msg, collab, sink).await?;
|
||||
|
|
@ -278,9 +279,16 @@ pub struct SeqNumCounter {
|
|||
|
||||
impl SeqNumCounter {
|
||||
pub fn store_ack_seq_num(&self, seq_num: u32) -> u32 {
|
||||
// If the broadcast sequence counter is 0, set it to the current sequence number.
|
||||
if self.broadcast_seq_counter.load(Ordering::SeqCst) == 0 {
|
||||
self.broadcast_seq_counter.store(seq_num, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
match self
|
||||
.ack_seq_counter
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
|
||||
// Check if the sequence number is less than the current one. A lower sequence number can indicate
|
||||
// that the server has been restarted, or the collaboration group has been reinitialized.
|
||||
if seq_num >= current {
|
||||
Some(seq_num)
|
||||
} else {
|
||||
|
|
@ -295,19 +303,52 @@ impl SeqNumCounter {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn store_broadcast_seq_num(&self, seq_num: u32) -> u32 {
|
||||
self
|
||||
pub fn store_broadcast_seq_num(&self, broadcast_seq_num: u32) -> u32 {
|
||||
match self
|
||||
.broadcast_seq_counter
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_current| Some(seq_num))
|
||||
.unwrap()
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
|
||||
// Check if the sequence number is less than the current one. A lower sequence number can indicate
|
||||
// that the server has been restarted, or the collaboration group has been reinitialized.
|
||||
if broadcast_seq_num >= current {
|
||||
Some(broadcast_seq_num)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
Ok(prev) => prev,
|
||||
Err(prev) => {
|
||||
self
|
||||
.broadcast_seq_counter
|
||||
.store(broadcast_seq_num, Ordering::SeqCst);
|
||||
prev
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_ack_seq_num(&self) -> u32 {
|
||||
self.ack_seq_counter.load(Ordering::SeqCst)
|
||||
}
|
||||
/// Checks if the given broadcast sequence number is contiguous with the current sequence.
|
||||
///
|
||||
/// Verifies that the broadcast sequence number provided (`broadcast_seq_num`) follows directly after
|
||||
/// the last known sequence number stored in the system (`current`).
|
||||
///
|
||||
/// If there is a gap between the `broadcast_seq_num` and `current`, it indicates that some
|
||||
/// messages may have been missed, and an error is returned.
|
||||
pub fn check_broadcast_contiguous(
|
||||
&self,
|
||||
object_id: &str,
|
||||
broadcast_seq_num: u32,
|
||||
) -> Result<(), SyncError> {
|
||||
let current = self.broadcast_seq_counter.load(Ordering::SeqCst);
|
||||
if current > 0 && broadcast_seq_num > current + 1 {
|
||||
return Err(SyncError::MissUpdates {
|
||||
state_vector_v1: None,
|
||||
reason: format!(
|
||||
"{} broadcast is not contiguous, current:{}, broadcast:{}",
|
||||
object_id, current, broadcast_seq_num,
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn get_broadcast_seq_num(&self) -> u32 {
|
||||
self.broadcast_seq_counter.load(Ordering::SeqCst)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn check_ack_broadcast_contiguous(&self, object_id: &str) -> Result<(), SyncError> {
|
||||
|
|
@ -322,6 +363,13 @@ impl SeqNumCounter {
|
|||
|
||||
if old + 1 >= 2 {
|
||||
self.miss_update_counter.store(0, Ordering::SeqCst);
|
||||
// Mark the broadcast sequence number as ack seq_num because a MissUpdates error triggers
|
||||
// an initialization synchronization. After this initial sync, the ack and broadcast sequence
|
||||
// numbers are expected to align, ensuring that all updates are synchronized.
|
||||
self
|
||||
.broadcast_seq_counter
|
||||
.store(ack_seq_num, Ordering::SeqCst);
|
||||
|
||||
return Err(SyncError::MissUpdates {
|
||||
state_vector_v1: None,
|
||||
reason: format!(
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ impl CollabStateCheckRunner {
|
|||
break;
|
||||
},
|
||||
Some(message_queue) => {
|
||||
if state.pause.load(Ordering::SeqCst) {
|
||||
if state.pause_ping.load(Ordering::SeqCst) {
|
||||
continue;
|
||||
} else {
|
||||
// Skip this iteration if a message was sent recently, within the specified duration.
|
||||
|
|
|
|||
|
|
@ -58,7 +58,6 @@ where
|
|||
sink_config: SinkConfig,
|
||||
stream: Stream,
|
||||
channel: Option<Arc<C>>,
|
||||
pause: bool,
|
||||
mut ws_connect_state: WSConnectStateReceiver,
|
||||
) -> Self {
|
||||
let sync_queue = SyncControl::new(
|
||||
|
|
@ -68,7 +67,6 @@ where
|
|||
sink_config,
|
||||
stream,
|
||||
collab.clone(),
|
||||
pause,
|
||||
);
|
||||
|
||||
if let Some(local_collab) = collab.upgrade() {
|
||||
|
|
@ -100,8 +98,8 @@ where
|
|||
if let (Some(local_collab), Some(sync_queue)) =
|
||||
(weak_local_collab.upgrade(), weak_sync_queue.upgrade())
|
||||
{
|
||||
sync_queue.resume();
|
||||
if let Some(local_collab) = local_collab.try_lock() {
|
||||
sync_queue.resume();
|
||||
let _ = sync_queue.init_sync(&local_collab, SyncReason::NetworkResume);
|
||||
}
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -59,7 +59,6 @@ where
|
|||
sink_config: SinkConfig,
|
||||
stream: Stream,
|
||||
collab: Weak<MutexCollab>,
|
||||
pause: bool,
|
||||
) -> Self {
|
||||
let protocol = ClientSyncProtocol;
|
||||
let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed);
|
||||
|
|
@ -74,7 +73,6 @@ where
|
|||
notifier,
|
||||
sync_state_tx.clone(),
|
||||
sink_config,
|
||||
pause,
|
||||
));
|
||||
af_spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx));
|
||||
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ async fn send_batch_message(
|
|||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Failed to RealtimeMessage: {}", err);
|
||||
error!("Failed to encode realtime message: {}", err);
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue