From e408073448ee08816f6e62f86a7bedb04539aec6 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 14 Mar 2024 15:13:21 +0800 Subject: [PATCH] chore: send after prev finish (#385) --- libs/client-api/src/collab_sync/sink.rs | 42 +++++++++++-------------- libs/realtime-entity/src/collab_msg.rs | 4 +-- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index 19927556..61662c3a 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -149,26 +149,9 @@ where let mut msg_queue = self.message_queue.lock(); let msg_id = self.msg_id_counter.next(); let new_msg = f(msg_id); - - let mut requeue_items = Vec::with_capacity(msg_queue.len()); - if new_msg.is_server_init_sync() { - // When the message is a server initialization sync, indicating the absence of a client - // initialization sync in the queue, it means the server initialization sync contains the - // [Collab]'s latest updates. Therefore, we can clear the update sync messages in the queue. - while let Some(item) = msg_queue.pop() { - if !item.message().is_update_sync() { - let msg_id = self.msg_id_counter.next(); - let mut msg = item.into_message(); - msg.set_msg_id(msg_id); - requeue_items.push(QueueItem::new(msg, msg_id)); - } - } - self.flying_messages.lock().clear(); - } - trace!("🔥 queue {}", new_msg); msg_queue.push_msg(msg_id, new_msg); - msg_queue.extend(requeue_items); + // msg_queue.extend(requeue_items); drop(msg_queue); // Notify the sink to process the next message after 500ms. let _ = self @@ -268,7 +251,7 @@ where } trace!( - "{:?}: Pending messages:{} ids:{}", + "{:?}: pending count:{} ids:{}", self.object.object_id, message_queue.len(), message_queue @@ -390,7 +373,12 @@ where if cfg!(debug_assertions) { for (msg_id, merged_ids) in merged_ids { - trace!("merged {:?} messages into: {:?}", merged_ids, msg_id); + trace!( + "{}: merged {:?} messages into: {:?}", + self.object.object_id, + merged_ids, + msg_id + ); } } msg_queue.extend(items); @@ -420,9 +408,16 @@ where } if flying_messages.contains(&item.msg_id()) { - trace!("{} skip sending message: {:?}", object_id, item.msg_id()); + trace!( + "{} message:{} is syncing to server, stop sync more messages", + object_id, + item.msg_id() + ); + // because the messages in msg_queue are ordered by priority, so if the message is in the + // flying messages, it means the message is sending to the remote. So don't send the following + // messages. requeue_items.push(item); - continue; + break; } let is_init_sync = item.message().is_client_init_sync(); @@ -436,7 +431,7 @@ where if !requeue_items.is_empty() { trace!( - "{} requeue items: ids=>{}", + "requeue {} messages: ids=>{}", object_id, requeue_items .iter() @@ -446,7 +441,6 @@ where ); } msg_queue.extend(requeue_items); - let message_ids = items.iter().map(|item| item.msg_id()).collect::>(); flying_messages.extend(message_ids); items diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 56618021..7ecfe264 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -812,8 +812,8 @@ impl Ord for ClientCollabMessage { }, (ClientCollabMessage::ClientInitSync { .. }, _) => Ordering::Greater, (_, ClientCollabMessage::ClientInitSync { .. }) => Ordering::Less, - (ClientCollabMessage::ServerInitSync(left), ClientCollabMessage::ServerInitSync(right)) => { - left.msg_id.cmp(&right.msg_id).reverse() + (ClientCollabMessage::ServerInitSync(_left), ClientCollabMessage::ServerInitSync(_right)) => { + Ordering::Equal }, (ClientCollabMessage::ServerInitSync { .. }, _) => Ordering::Greater, (_, ClientCollabMessage::ServerInitSync { .. }) => Ordering::Less,