From d0bdbb795ca18a68144d0587496e6dce38692839 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 14 Mar 2024 11:27:18 +0800 Subject: [PATCH] chore: add constraits for init sync (#384) --- libs/client-api/src/ws/msg_queue.rs | 19 ++++++++++++++++++- libs/realtime-entity/src/collab_msg.rs | 4 ++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 5b4ae62a..cd4dbafd 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -54,7 +54,7 @@ impl AggregateMessageQueue { _ = rx.recv() => break, _ = interval.tick() => { if let Some(queue) = weak_queue.upgrade() { - let messages_map = next_batch_message(maximum_payload_size, &queue).await; + let messages_map = next_batch_message(10, maximum_payload_size, &queue).await; if messages_map.is_empty() { continue; } @@ -91,20 +91,37 @@ async fn send_batch_message( } } +/// Gathers a batch of messages up to certain limits. +/// +/// This function collects messages from a shared priority queue until reaching either the maximum number +/// of initial sync messages or the maximum payload size. It groups messages by their object ID. +/// +/// # Arguments +/// - `maximum_init_sync`: Max number of initial sync messages allowed in a batch. +/// - `maximum_payload_size`: Max total size of messages (in bytes) allowed in a batch. #[inline] async fn next_batch_message( + maximum_init_sync: usize, maximum_payload_size: usize, queue: &Arc>>, ) -> HashMap> { let mut messages_map = HashMap::new(); let mut size = 0; + let mut init_sync_count = 0; let mut lock_guard = queue.lock().await; while let Some(msg) = lock_guard.pop() { size += msg.size(); + if msg.is_init_sync() { + init_sync_count += 1; + } messages_map .entry(msg.object_id().to_string()) .or_insert(vec![]) .push(msg); + + if init_sync_count > maximum_init_sync { + break; + } if size > maximum_payload_size { break; } diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 0e973637..56618021 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -670,6 +670,10 @@ impl ClientCollabMessage { ClientCollabMessage::ClientAwarenessSync(data) => data.msg_id, } } + + pub fn is_init_sync(&self) -> bool { + matches!(self, ClientCollabMessage::ClientInitSync { .. }) + } } impl Display for ClientCollabMessage {