chore: lower tick interval when message is empty (#416)

* chore: lower tick interval when message is empty

* chore: update logs
This commit is contained in:
Nathan.fooo 2024-03-24 21:34:19 +08:00 committed by GitHub
parent d0c0d7832c
commit 51ecdd664e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 41 additions and 24 deletions

View File

@ -2,7 +2,7 @@ use client_websocket::Message;
use collab_rt_entity::collab_msg::{ClientCollabMessage, MsgId};
use collab_rt_entity::message::RealtimeMessage;
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::time::{sleep_until, Instant};
@ -62,33 +62,18 @@ impl AggregateMessageQueue {
_ = rx.recv() => break,
_ = sleep_until(next_tick) => {
if let Some(queue) = weak_queue.upgrade() {
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, &queue).await;
if messages_map.is_empty() {
continue;
let (num_init_sync, num_messages) = handle_tick(&sender, &queue, maximum_payload_size, weak_seen_ids.clone()).await;
if num_messages == 0 {
next_tick = Instant::now() + Duration::from_secs(4);
} else {
// To determine the next interval dynamically, consider factors such as the number of messages sent,
// their total size, and the current network type. This approach allows for more nuanced interval
// adjustments, optimizing for efficiency and responsiveness under varying conditions.
next_tick = calculate_next_tick(num_init_sync, interval_duration);
}
if cfg!(debug_assertions) {
log_message_map(&messages_map);
}
// Send messages to server
send_batch_message(&sender, messages_map).await;
// after sending messages, remove seen_ids
let num_init_sync = did_sent_seen_ids.iter().filter(|id| id.is_init_sync).count();
if let Some(seen_ids) = weak_seen_ids.upgrade() {
let mut seen_lock = seen_ids.lock().await;
seen_lock.retain(|id| !did_sent_seen_ids.contains(id));
}
// To determine the next interval dynamically, consider factors such as the number of messages sent,
// their total size, and the current network type. This approach allows for more nuanced interval
// adjustments, optimizing for efficiency and responsiveness under varying conditions.
next_tick = calculate_next_tick(num_init_sync, interval_duration);
} else {
break;
}
}
}
}
@ -96,6 +81,38 @@ impl AggregateMessageQueue {
}
}
async fn handle_tick(
sender: &AggregateMessagesSender,
queue: &Arc<Mutex<BinaryHeap<ClientCollabMessage>>>,
maximum_payload_size: usize,
weak_seen_ids: Weak<Mutex<HashSet<SeenId>>>,
) -> (usize, usize) {
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await;
if messages_map.is_empty() {
return (0, 0);
}
if cfg!(debug_assertions) {
log_message_map(&messages_map);
}
// Send messages to server
send_batch_message(sender, messages_map).await;
// after sending messages, remove seen_ids
let num_init_sync = did_sent_seen_ids
.iter()
.filter(|id| id.is_init_sync)
.count();
let num_messages = did_sent_seen_ids.len();
if let Some(seen_ids) = weak_seen_ids.upgrade() {
let mut seen_lock = seen_ids.lock().await;
seen_lock.retain(|id| !did_sent_seen_ids.contains(id));
}
(num_init_sync, num_messages)
}
#[inline]
async fn send_batch_message(
sender: &AggregateMessagesSender,