diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 647f47b9..1bdf6095 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -2,7 +2,7 @@ use client_websocket::Message; use collab_rt_entity::collab_msg::{ClientCollabMessage, MsgId}; use collab_rt_entity::message::RealtimeMessage; use std::collections::{BinaryHeap, HashMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::sync::{mpsc, Mutex}; use tokio::time::{sleep_until, Instant}; @@ -62,33 +62,18 @@ impl AggregateMessageQueue { _ = rx.recv() => break, _ = sleep_until(next_tick) => { if let Some(queue) = weak_queue.upgrade() { - let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, &queue).await; - if messages_map.is_empty() { - continue; + let (num_init_sync, num_messages) = handle_tick(&sender, &queue, maximum_payload_size, weak_seen_ids.clone()).await; + if num_messages == 0 { + next_tick = Instant::now() + Duration::from_secs(4); + } else { + // To determine the next interval dynamically, consider factors such as the number of messages sent, + // their total size, and the current network type. This approach allows for more nuanced interval + // adjustments, optimizing for efficiency and responsiveness under varying conditions. + next_tick = calculate_next_tick(num_init_sync, interval_duration); } - - if cfg!(debug_assertions) { - log_message_map(&messages_map); - } - - // Send messages to server - send_batch_message(&sender, messages_map).await; - - // after sending messages, remove seen_ids - let num_init_sync = did_sent_seen_ids.iter().filter(|id| id.is_init_sync).count(); - if let Some(seen_ids) = weak_seen_ids.upgrade() { - let mut seen_lock = seen_ids.lock().await; - seen_lock.retain(|id| !did_sent_seen_ids.contains(id)); - } - - // To determine the next interval dynamically, consider factors such as the number of messages sent, - // their total size, and the current network type. This approach allows for more nuanced interval - // adjustments, optimizing for efficiency and responsiveness under varying conditions. - next_tick = calculate_next_tick(num_init_sync, interval_duration); } else { break; } - } } } @@ -96,6 +81,38 @@ impl AggregateMessageQueue { } } +async fn handle_tick( + sender: &AggregateMessagesSender, + queue: &Arc>>, + maximum_payload_size: usize, + weak_seen_ids: Weak>>, +) -> (usize, usize) { + let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await; + if messages_map.is_empty() { + return (0, 0); + } + + if cfg!(debug_assertions) { + log_message_map(&messages_map); + } + + // Send messages to server + send_batch_message(sender, messages_map).await; + + // after sending messages, remove seen_ids + let num_init_sync = did_sent_seen_ids + .iter() + .filter(|id| id.is_init_sync) + .count(); + let num_messages = did_sent_seen_ids.len(); + + if let Some(seen_ids) = weak_seen_ids.upgrade() { + let mut seen_lock = seen_ids.lock().await; + seen_lock.retain(|id| !did_sent_seen_ids.contains(id)); + } + (num_init_sync, num_messages) +} + #[inline] async fn send_batch_message( sender: &AggregateMessagesSender,