From db23b64229ee9ba0178143dda382a30342c832f1 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 21 Apr 2024 19:31:24 +0800 Subject: [PATCH] chore: broadcast check (#485) * chore: check broadcast continuous * chore: update log * chore: update log --- libs/client-api-test-util/src/test_client.rs | 2 - .../client-api/src/collab_sync/collab_sink.rs | 26 +++----- .../src/collab_sync/collab_stream.rs | 66 ++++++++++++++++--- .../src/collab_sync/period_state_check.rs | 2 +- libs/client-api/src/collab_sync/plugin.rs | 4 +- .../src/collab_sync/sync_control.rs | 2 - libs/client-api/src/ws/msg_queue.rs | 2 +- 7 files changed, 70 insertions(+), 34 deletions(-) diff --git a/libs/client-api-test-util/src/test_client.rs b/libs/client-api-test-util/src/test_client.rs index 7df6c308..dcafe7d1 100644 --- a/libs/client-api-test-util/src/test_client.rs +++ b/libs/client-api-test-util/src/test_client.rs @@ -542,7 +542,6 @@ impl TestClient { SinkConfig::default(), stream, Some(handler), - !self.ws_client.is_connected(), ws_connect_state, ); collab.lock().add_plugin(Box::new(sync_plugin)); @@ -613,7 +612,6 @@ impl TestClient { SinkConfig::default(), stream, Some(handler), - !self.ws_client.is_connected(), ws_connect_state, ); diff --git a/libs/client-api/src/collab_sync/collab_sink.rs b/libs/client-api/src/collab_sync/collab_sink.rs index 698f023a..8983bd78 100644 --- a/libs/client-api/src/collab_sync/collab_sink.rs +++ b/libs/client-api/src/collab_sync/collab_sink.rs @@ -62,13 +62,12 @@ where notifier: watch::Sender, sync_state_tx: broadcast::Sender, config: SinkConfig, - pause: bool, ) -> Self { let notifier = Arc::new(notifier); let sender = Arc::new(Mutex::new(sink)); let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new())); let sending_messages = Arc::new(parking_lot::Mutex::new(HashSet::new())); - let state = Arc::new(CollabSinkState::new(pause)); + let state = Arc::new(CollabSinkState::new()); let mut interval = interval(SEND_INTERVAL); let weak_sending_messages = Arc::downgrade(&sending_messages); @@ -152,6 +151,7 @@ where /// message immediately. pub fn queue_init_sync(&self, f: impl FnOnce(MsgId) -> ClientCollabMessage) { let _ = self.sync_state_tx.send(CollabSyncState::Syncing); + self.clear(); // When the client is connected, remove all pending messages and send the init message. let mut msg_queue = self.message_queue.lock(); @@ -200,8 +200,6 @@ where } self.state.pause_ping.store(true, Ordering::SeqCst); - self.state.pause.store(true, Ordering::SeqCst); - let _ = self.sync_state_tx.send(CollabSyncState::Pause); } pub fn resume(&self) { @@ -210,7 +208,6 @@ where } self.state.pause_ping.store(false, Ordering::SeqCst); - self.state.pause.store(false, Ordering::SeqCst); } /// Notify the sink to process the next message and mark the current message as done. @@ -225,7 +222,7 @@ where let income_message_id = msg_id; let mut sending_messages = self.sending_messages.lock(); - // if the message id is not in the flying messages, it means the message is invalid. + // if the message id is not in the sending messages, it means the message is invalid. if !sending_messages.contains(&income_message_id) { return Ok(false); } @@ -287,12 +284,6 @@ where } async fn process_next_msg(&self) { - if self.state.pause.load(Ordering::SeqCst) { - // If the sink is paused, sleep for a while and try later. - sleep(Duration::from_secs(2)).await; - return; - } - let items = { let (mut msg_queue, mut sending_messages) = match ( self.message_queue.try_lock(), @@ -301,6 +292,12 @@ where (Some(msg_queue), Some(sending_messages)) => (msg_queue, sending_messages), _ => { // If acquire the lock failed, try later + if cfg!(feature = "sync_verbose_log") { + trace!( + "{}: failed to acquire the lock of the sink, retry later", + self.object.object_id + ); + } retry_later(Arc::downgrade(&self.notifier)); return; }, @@ -543,7 +540,6 @@ impl SyncTimestamp { } pub(crate) struct CollabSinkState { - pub(crate) pause: AtomicBool, pub(crate) latest_sync: SyncTimestamp, pub(crate) pause_ping: AtomicBool, pub(crate) id_counter: DefaultMsgIdCounter, @@ -551,10 +547,9 @@ pub(crate) struct CollabSinkState { } impl CollabSinkState { - fn new(pause: bool) -> Self { + fn new() -> Self { let msg_id_counter = DefaultMsgIdCounter::new(); CollabSinkState { - pause: AtomicBool::new(pause), latest_sync: SyncTimestamp::new(), pause_ping: AtomicBool::new(false), id_counter: msg_id_counter, @@ -569,7 +564,6 @@ pub enum CollabSyncState { Syncing, /// All the messages are synced to the remote. Finished, - Pause, } impl CollabSyncState { diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index 8353416c..8a3ef89e 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -162,6 +162,7 @@ where match msg.msg_id() { None => { if let ServerCollabMessage::ServerBroadcast(ref data) = msg { + seq_num_counter.check_broadcast_contiguous(&object.object_id, data.seq_num)?; seq_num_counter.store_broadcast_seq_num(data.seq_num); } Self::process_message_follow_protocol(&object.object_id, &msg, collab, sink).await?; @@ -278,9 +279,16 @@ pub struct SeqNumCounter { impl SeqNumCounter { pub fn store_ack_seq_num(&self, seq_num: u32) -> u32 { + // If the broadcast sequence counter is 0, set it to the current sequence number. + if self.broadcast_seq_counter.load(Ordering::SeqCst) == 0 { + self.broadcast_seq_counter.store(seq_num, Ordering::SeqCst); + } + match self .ack_seq_counter .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + // Check if the sequence number is less than the current one. A lower sequence number can indicate + // that the server has been restarted, or the collaboration group has been reinitialized. if seq_num >= current { Some(seq_num) } else { @@ -295,19 +303,52 @@ impl SeqNumCounter { } } - pub fn store_broadcast_seq_num(&self, seq_num: u32) -> u32 { - self + pub fn store_broadcast_seq_num(&self, broadcast_seq_num: u32) -> u32 { + match self .broadcast_seq_counter - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_current| Some(seq_num)) - .unwrap() + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { + // Check if the sequence number is less than the current one. A lower sequence number can indicate + // that the server has been restarted, or the collaboration group has been reinitialized. + if broadcast_seq_num >= current { + Some(broadcast_seq_num) + } else { + None + } + }) { + Ok(prev) => prev, + Err(prev) => { + self + .broadcast_seq_counter + .store(broadcast_seq_num, Ordering::SeqCst); + prev + }, + } } - pub fn get_ack_seq_num(&self) -> u32 { - self.ack_seq_counter.load(Ordering::SeqCst) - } + /// Checks if the given broadcast sequence number is contiguous with the current sequence. + /// + /// Verifies that the broadcast sequence number provided (`broadcast_seq_num`) follows directly after + /// the last known sequence number stored in the system (`current`). + /// + /// If there is a gap between the `broadcast_seq_num` and `current`, it indicates that some + /// messages may have been missed, and an error is returned. + pub fn check_broadcast_contiguous( + &self, + object_id: &str, + broadcast_seq_num: u32, + ) -> Result<(), SyncError> { + let current = self.broadcast_seq_counter.load(Ordering::SeqCst); + if current > 0 && broadcast_seq_num > current + 1 { + return Err(SyncError::MissUpdates { + state_vector_v1: None, + reason: format!( + "{} broadcast is not contiguous, current:{}, broadcast:{}", + object_id, current, broadcast_seq_num, + ), + }); + } - pub fn get_broadcast_seq_num(&self) -> u32 { - self.broadcast_seq_counter.load(Ordering::SeqCst) + Ok(()) } pub fn check_ack_broadcast_contiguous(&self, object_id: &str) -> Result<(), SyncError> { @@ -322,6 +363,13 @@ impl SeqNumCounter { if old + 1 >= 2 { self.miss_update_counter.store(0, Ordering::SeqCst); + // Mark the broadcast sequence number as ack seq_num because a MissUpdates error triggers + // an initialization synchronization. After this initial sync, the ack and broadcast sequence + // numbers are expected to align, ensuring that all updates are synchronized. + self + .broadcast_seq_counter + .store(ack_seq_num, Ordering::SeqCst); + return Err(SyncError::MissUpdates { state_vector_v1: None, reason: format!( diff --git a/libs/client-api/src/collab_sync/period_state_check.rs b/libs/client-api/src/collab_sync/period_state_check.rs index 884e2a11..11fa64e0 100644 --- a/libs/client-api/src/collab_sync/period_state_check.rs +++ b/libs/client-api/src/collab_sync/period_state_check.rs @@ -43,7 +43,7 @@ impl CollabStateCheckRunner { break; }, Some(message_queue) => { - if state.pause.load(Ordering::SeqCst) { + if state.pause_ping.load(Ordering::SeqCst) { continue; } else { // Skip this iteration if a message was sent recently, within the specified duration. diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index f05784dc..10898adb 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -58,7 +58,6 @@ where sink_config: SinkConfig, stream: Stream, channel: Option>, - pause: bool, mut ws_connect_state: WSConnectStateReceiver, ) -> Self { let sync_queue = SyncControl::new( @@ -68,7 +67,6 @@ where sink_config, stream, collab.clone(), - pause, ); if let Some(local_collab) = collab.upgrade() { @@ -100,8 +98,8 @@ where if let (Some(local_collab), Some(sync_queue)) = (weak_local_collab.upgrade(), weak_sync_queue.upgrade()) { + sync_queue.resume(); if let Some(local_collab) = local_collab.try_lock() { - sync_queue.resume(); let _ = sync_queue.init_sync(&local_collab, SyncReason::NetworkResume); } } else { diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 7fc20bf4..bdb6d092 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -59,7 +59,6 @@ where sink_config: SinkConfig, stream: Stream, collab: Weak, - pause: bool, ) -> Self { let protocol = ClientSyncProtocol; let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed); @@ -74,7 +73,6 @@ where notifier, sync_state_tx.clone(), sink_config, - pause, )); af_spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx)); diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 4a22cd7a..81cec527 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -119,7 +119,7 @@ async fn send_batch_message( } }, Err(err) => { - error!("Failed to RealtimeMessage: {}", err); + error!("Failed to encode realtime message: {}", err); }, } }