chore: send after prev finish (#385)

This commit is contained in:
Nathan.fooo 2024-03-14 15:13:21 +08:00 committed by GitHub
parent d0bdbb795c
commit e408073448
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 20 additions and 26 deletions

View File

@ -149,26 +149,9 @@ where
let mut msg_queue = self.message_queue.lock();
let msg_id = self.msg_id_counter.next();
let new_msg = f(msg_id);
let mut requeue_items = Vec::with_capacity(msg_queue.len());
if new_msg.is_server_init_sync() {
// When the message is a server initialization sync, indicating the absence of a client
// initialization sync in the queue, it means the server initialization sync contains the
// [Collab]'s latest updates. Therefore, we can clear the update sync messages in the queue.
while let Some(item) = msg_queue.pop() {
if !item.message().is_update_sync() {
let msg_id = self.msg_id_counter.next();
let mut msg = item.into_message();
msg.set_msg_id(msg_id);
requeue_items.push(QueueItem::new(msg, msg_id));
}
}
self.flying_messages.lock().clear();
}
trace!("🔥 queue {}", new_msg);
msg_queue.push_msg(msg_id, new_msg);
msg_queue.extend(requeue_items);
// msg_queue.extend(requeue_items);
drop(msg_queue);
// Notify the sink to process the next message after 500ms.
let _ = self
@ -268,7 +251,7 @@ where
}
trace!(
"{:?}: Pending messages:{} ids:{}",
"{:?}: pending count:{} ids:{}",
self.object.object_id,
message_queue.len(),
message_queue
@ -390,7 +373,12 @@ where
if cfg!(debug_assertions) {
for (msg_id, merged_ids) in merged_ids {
trace!("merged {:?} messages into: {:?}", merged_ids, msg_id);
trace!(
"{}: merged {:?} messages into: {:?}",
self.object.object_id,
merged_ids,
msg_id
);
}
}
msg_queue.extend(items);
@ -420,9 +408,16 @@ where
}
if flying_messages.contains(&item.msg_id()) {
trace!("{} skip sending message: {:?}", object_id, item.msg_id());
trace!(
"{} message:{} is syncing to server, stop sync more messages",
object_id,
item.msg_id()
);
// because the messages in msg_queue are ordered by priority, so if the message is in the
// flying messages, it means the message is sending to the remote. So don't send the following
// messages.
requeue_items.push(item);
continue;
break;
}
let is_init_sync = item.message().is_client_init_sync();
@ -436,7 +431,7 @@ where
if !requeue_items.is_empty() {
trace!(
"{} requeue items: ids=>{}",
"requeue {} messages: ids=>{}",
object_id,
requeue_items
.iter()
@ -446,7 +441,6 @@ where
);
}
msg_queue.extend(requeue_items);
let message_ids = items.iter().map(|item| item.msg_id()).collect::<Vec<_>>();
flying_messages.extend(message_ids);
items

View File

@ -812,8 +812,8 @@ impl Ord for ClientCollabMessage {
},
(ClientCollabMessage::ClientInitSync { .. }, _) => Ordering::Greater,
(_, ClientCollabMessage::ClientInitSync { .. }) => Ordering::Less,
(ClientCollabMessage::ServerInitSync(left), ClientCollabMessage::ServerInitSync(right)) => {
left.msg_id.cmp(&right.msg_id).reverse()
(ClientCollabMessage::ServerInitSync(_left), ClientCollabMessage::ServerInitSync(_right)) => {
Ordering::Equal
},
(ClientCollabMessage::ServerInitSync { .. }, _) => Ordering::Greater,
(_, ClientCollabMessage::ServerInitSync { .. }) => Ordering::Less,