diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index 4279aa8e..fd0efa8a 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -212,7 +212,6 @@ where pub fn resume(&self) { self.pause.store(false, Ordering::SeqCst); - self.notify(); } /// Notify the sink to process the next message and mark the current message as done. diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 87cec227..647f47b9 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -1,11 +1,11 @@ use client_websocket::Message; -use collab_rt_entity::collab_msg::ClientCollabMessage; +use collab_rt_entity::collab_msg::{ClientCollabMessage, MsgId}; use collab_rt_entity::message::RealtimeMessage; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, Mutex}; -use tokio::time::interval; +use tokio::time::{sleep_until, Instant}; use tracing::{debug, error, trace}; pub type AggregateMessagesSender = mpsc::Sender; @@ -15,6 +15,7 @@ pub struct AggregateMessageQueue { maximum_payload_size: usize, queue: Arc>>, stop_tx: Mutex>>, + seen_ids: Arc>>, } impl AggregateMessageQueue { @@ -23,18 +24,24 @@ impl AggregateMessageQueue { maximum_payload_size, queue: Default::default(), stop_tx: Default::default(), + seen_ids: Arc::new(Default::default()), } } pub async fn push(&self, msg: Vec) { - let mut queue_lock_guard = self.queue.lock().await; - for msg in msg { - queue_lock_guard.push(msg); + let mut queue_guard = self.queue.lock().await; + let mut seen_ids_guard = self.seen_ids.lock().await; + + for msg in msg.into_iter() { + if seen_ids_guard.insert(SeenId::from(&msg)) { + queue_guard.push(msg); + } } } pub async fn clear(&self) { self.queue.lock().await.clear(); + self.seen_ids.lock().await.clear(); } pub async fn set_sender(&self, sender: AggregateMessagesSender) { @@ -46,15 +53,16 @@ impl AggregateMessageQueue { let maximum_payload_size = self.maximum_payload_size; let weak_queue = Arc::downgrade(&self.queue); - let mut interval = interval(Duration::from_secs(1)); - + let weak_seen_ids = Arc::downgrade(&self.seen_ids); + let interval_duration = Duration::from_secs(1); + let mut next_tick = Instant::now() + interval_duration; tokio::spawn(async move { loop { tokio::select! { _ = rx.recv() => break, - _ = interval.tick() => { + _ = sleep_until(next_tick) => { if let Some(queue) = weak_queue.upgrade() { - let messages_map = next_batch_message(10, maximum_payload_size, &queue).await; + let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, &queue).await; if messages_map.is_empty() { continue; } @@ -63,10 +71,24 @@ impl AggregateMessageQueue { log_message_map(&messages_map); } + // Send messages to server send_batch_message(&sender, messages_map).await; + + // after sending messages, remove seen_ids + let num_init_sync = did_sent_seen_ids.iter().filter(|id| id.is_init_sync).count(); + if let Some(seen_ids) = weak_seen_ids.upgrade() { + let mut seen_lock = seen_ids.lock().await; + seen_lock.retain(|id| !did_sent_seen_ids.contains(id)); + } + + // To determine the next interval dynamically, consider factors such as the number of messages sent, + // their total size, and the current network type. This approach allows for more nuanced interval + // adjustments, optimizing for efficiency and responsiveness under varying conditions. + next_tick = calculate_next_tick(num_init_sync, interval_duration); } else { break; } + } } } @@ -104,16 +126,19 @@ async fn next_batch_message( maximum_init_sync: usize, maximum_payload_size: usize, queue: &Arc>>, -) -> HashMap> { +) -> (HashSet, HashMap>) { let mut messages_map = HashMap::new(); let mut size = 0; let mut init_sync_count = 0; let mut lock_guard = queue.lock().await; + let mut seen_ids = HashSet::new(); while let Some(msg) = lock_guard.pop() { size += msg.size(); if msg.is_init_sync() { init_sync_count += 1; } + + seen_ids.insert(SeenId::from(&msg)); messages_map .entry(msg.object_id().to_string()) .or_insert(vec![]) @@ -127,7 +152,7 @@ async fn next_batch_message( } } - messages_map + (seen_ids, messages_map) } #[inline] @@ -153,3 +178,29 @@ fn log_message_map(messages_map: &HashMap>) { debug!("Aggregate message list:\n{}", log_msg); } + +#[derive(Eq, PartialEq, Hash)] +struct SeenId { + object_id: String, + msg_id: MsgId, + is_init_sync: bool, +} + +impl From<&ClientCollabMessage> for SeenId { + fn from(msg: &ClientCollabMessage) -> Self { + Self { + object_id: msg.object_id().to_string(), + msg_id: msg.msg_id(), + is_init_sync: msg.is_init_sync(), + } + } +} + +fn calculate_next_tick(num_init_sync: usize, default_interval: Duration) -> Instant { + match num_init_sync { + 0 => Instant::now() + default_interval, + 1..=3 => Instant::now() + Duration::from_secs(2), + 4..=7 => Instant::now() + Duration::from_secs(4), + _ => Instant::now() + Duration::from_secs(6), + } +} diff --git a/libs/collab-rt/src/collaborate/group_cmd.rs b/libs/collab-rt/src/collaborate/group_cmd.rs index 0a52d208..6b2abfea 100644 --- a/libs/collab-rt/src/collaborate/group_cmd.rs +++ b/libs/collab-rt/src/collaborate/group_cmd.rs @@ -225,8 +225,9 @@ pub async fn forward_message_to_group( ) { if let Some(client_stream) = client_streams.get(user) { trace!( - "[realtime]: receive: uid:{} oid:{} msg ids: {:?}", + "[realtime]: receive client:{} device:{} oid:{} msg ids: {:?}", user.uid, + user.device_id, object_id, collab_messages .iter() diff --git a/libs/collab-rt/src/rt_server.rs b/libs/collab-rt/src/rt_server.rs index a07cdcf6..3b3459d4 100644 --- a/libs/collab-rt/src/rt_server.rs +++ b/libs/collab-rt/src/rt_server.rs @@ -355,9 +355,10 @@ impl CollabClientStream { ) .await; trace!( - "{} receive {} client message: valid:{} invalid:{}", + "{} receive client:{}, device:{}, message: valid:{} invalid:{}", msg_oid, user.uid, + user.device_id, valid_messages.len(), invalid_message.len() ); diff --git a/src/biz/casbin/metrics.rs b/src/biz/casbin/metrics.rs index 3013471b..f50b2526 100644 --- a/src/biz/casbin/metrics.rs +++ b/src/biz/casbin/metrics.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use crate::biz::casbin::enforcer::ENFORCER_METRICS_TICK_INTERVAL; use prometheus_client::registry::Registry; use tokio::time::interval; -use tracing::trace; #[derive(Clone)] pub struct AccessControlMetrics { @@ -52,11 +51,6 @@ impl AccessControlMetrics { } pub fn record_enforce_count(&self, total: i64, from_cache: i64) { - trace!( - "enforce_count: total: {}, from_cache: {}", - total, - from_cache - ); self.total_read_enforce_count.set(total); self.read_enforce_from_cache_count.set(from_cache); } diff --git a/src/biz/collab/cache.rs b/src/biz/collab/cache.rs index f7908b68..2be21091 100644 --- a/src/biz/collab/cache.rs +++ b/src/biz/collab/cache.rs @@ -42,11 +42,7 @@ impl CollabCache { ) -> Result { self.total_attempts.fetch_add(1, Ordering::Relaxed); // Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary. - if let Some(encoded_collab) = self - .mem_cache - .get_encode_collab(¶ms.inner.object_id) - .await - { + if let Some(encoded_collab) = self.mem_cache.get_encode_collab(¶ms.object_id).await { event!( Level::DEBUG, "Get encoded collab:{} from cache", diff --git a/src/config/config.rs b/src/config/config.rs index 2824e243..d011f559 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -145,7 +145,7 @@ pub fn get_configuration() -> Result { Ok(config) } -fn get_env_var(key: &str, default: &str) -> String { +pub fn get_env_var(key: &str, default: &str) -> String { std::env::var(key).unwrap_or_else(|e| { tracing::warn!( "failed to read environment variable: {}, using default value: {}", diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index c9b85086..5b8986c1 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -6,6 +6,7 @@ use collab_entity::CollabType; use database_entity::dto::{ CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, QueryCollabResult, }; + use sqlx::types::Uuid; use std::collections::HashMap; @@ -225,3 +226,26 @@ async fn fail_insert_collab_with_invalid_workspace_id_test() { assert_eq!(error.code, ErrorCode::NotEnoughPermissions); } + +// #[tokio::test] +// async fn collab_mem_cache_read_write_test() { +// let redis_client = redis_client().await; +// let conn = redis_client.get_connection_manager().await.unwrap(); +// +// let mem_cache = CollabMemCache::new(conn); +// let encode_collab = EncodedCollab::new_v1(vec![1, 2, 3], vec![4, 5, 6]); +// +// let object_id = uuid::Uuid::new_v4().to_string(); +// mem_cache +// .insert_encode_collab_bytes(object_id.clone(), encode_collab.encode_to_bytes().unwrap()) +// .await; +// let encode_collab_from_cache = mem_cache.get_encode_collab(&object_id).await.unwrap(); +// assert_eq!(encode_collab_from_cache.doc_state, encode_collab.doc_state); +// assert_eq!( +// encode_collab_from_cache.state_vector, +// encode_collab.state_vector +// ); +// +// assert_eq!(encode_collab.state_vector, vec![1, 2, 3]); +// assert_eq!(encode_collab.doc_state, vec![4, 5, 6]); +// } diff --git a/tests/collab/util.rs b/tests/collab/util.rs index 90171b58..930b6e68 100644 --- a/tests/collab/util.rs +++ b/tests/collab/util.rs @@ -1,3 +1,5 @@ +use anyhow::Context; +use appflowy_cloud::config::config::get_env_var; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::preclude::Collab; @@ -35,3 +37,11 @@ pub fn test_encode_collab_v1(object_id: &str, key: &str, value: &str) -> Encoded collab.insert(key, value); collab.encode_collab_v1() } + +#[allow(dead_code)] +pub async fn redis_client() -> redis::Client { + let redis_uri = get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379"); + redis::Client::open(redis_uri) + .context("failed to connect to redis") + .unwrap() +}