feat: sync deduplicate (#412)

* chore: update logs

* chore: update logs

* chore: deduplicate messages

* chore: optimize sync interval

* chore: fmt

* ci: test

* chore: clippy

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-03-24 07:30:05 +08:00 committed by GitHub
parent 47649efe18
commit acc13414cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 103 additions and 27 deletions

View File

@ -212,7 +212,6 @@ where
pub fn resume(&self) {
self.pause.store(false, Ordering::SeqCst);
self.notify();
}
/// Notify the sink to process the next message and mark the current message as done.

View File

@ -1,11 +1,11 @@
use client_websocket::Message;
use collab_rt_entity::collab_msg::ClientCollabMessage;
use collab_rt_entity::collab_msg::{ClientCollabMessage, MsgId};
use collab_rt_entity::message::RealtimeMessage;
use std::collections::{BinaryHeap, HashMap};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::time::interval;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, error, trace};
pub type AggregateMessagesSender = mpsc::Sender<Message>;
@ -15,6 +15,7 @@ pub struct AggregateMessageQueue {
maximum_payload_size: usize,
queue: Arc<Mutex<BinaryHeap<ClientCollabMessage>>>,
stop_tx: Mutex<Option<mpsc::Sender<()>>>,
seen_ids: Arc<Mutex<HashSet<SeenId>>>,
}
impl AggregateMessageQueue {
@ -23,18 +24,24 @@ impl AggregateMessageQueue {
maximum_payload_size,
queue: Default::default(),
stop_tx: Default::default(),
seen_ids: Arc::new(Default::default()),
}
}
pub async fn push(&self, msg: Vec<ClientCollabMessage>) {
let mut queue_lock_guard = self.queue.lock().await;
for msg in msg {
queue_lock_guard.push(msg);
let mut queue_guard = self.queue.lock().await;
let mut seen_ids_guard = self.seen_ids.lock().await;
for msg in msg.into_iter() {
if seen_ids_guard.insert(SeenId::from(&msg)) {
queue_guard.push(msg);
}
}
}
pub async fn clear(&self) {
self.queue.lock().await.clear();
self.seen_ids.lock().await.clear();
}
pub async fn set_sender(&self, sender: AggregateMessagesSender) {
@ -46,15 +53,16 @@ impl AggregateMessageQueue {
let maximum_payload_size = self.maximum_payload_size;
let weak_queue = Arc::downgrade(&self.queue);
let mut interval = interval(Duration::from_secs(1));
let weak_seen_ids = Arc::downgrade(&self.seen_ids);
let interval_duration = Duration::from_secs(1);
let mut next_tick = Instant::now() + interval_duration;
tokio::spawn(async move {
loop {
tokio::select! {
_ = rx.recv() => break,
_ = interval.tick() => {
_ = sleep_until(next_tick) => {
if let Some(queue) = weak_queue.upgrade() {
let messages_map = next_batch_message(10, maximum_payload_size, &queue).await;
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, &queue).await;
if messages_map.is_empty() {
continue;
}
@ -63,10 +71,24 @@ impl AggregateMessageQueue {
log_message_map(&messages_map);
}
// Send messages to server
send_batch_message(&sender, messages_map).await;
// after sending messages, remove seen_ids
let num_init_sync = did_sent_seen_ids.iter().filter(|id| id.is_init_sync).count();
if let Some(seen_ids) = weak_seen_ids.upgrade() {
let mut seen_lock = seen_ids.lock().await;
seen_lock.retain(|id| !did_sent_seen_ids.contains(id));
}
// To determine the next interval dynamically, consider factors such as the number of messages sent,
// their total size, and the current network type. This approach allows for more nuanced interval
// adjustments, optimizing for efficiency and responsiveness under varying conditions.
next_tick = calculate_next_tick(num_init_sync, interval_duration);
} else {
break;
}
}
}
}
@ -104,16 +126,19 @@ async fn next_batch_message(
maximum_init_sync: usize,
maximum_payload_size: usize,
queue: &Arc<Mutex<BinaryHeap<ClientCollabMessage>>>,
) -> HashMap<String, Vec<ClientCollabMessage>> {
) -> (HashSet<SeenId>, HashMap<String, Vec<ClientCollabMessage>>) {
let mut messages_map = HashMap::new();
let mut size = 0;
let mut init_sync_count = 0;
let mut lock_guard = queue.lock().await;
let mut seen_ids = HashSet::new();
while let Some(msg) = lock_guard.pop() {
size += msg.size();
if msg.is_init_sync() {
init_sync_count += 1;
}
seen_ids.insert(SeenId::from(&msg));
messages_map
.entry(msg.object_id().to_string())
.or_insert(vec![])
@ -127,7 +152,7 @@ async fn next_batch_message(
}
}
messages_map
(seen_ids, messages_map)
}
#[inline]
@ -153,3 +178,29 @@ fn log_message_map(messages_map: &HashMap<String, Vec<ClientCollabMessage>>) {
debug!("Aggregate message list:\n{}", log_msg);
}
#[derive(Eq, PartialEq, Hash)]
struct SeenId {
object_id: String,
msg_id: MsgId,
is_init_sync: bool,
}
impl From<&ClientCollabMessage> for SeenId {
fn from(msg: &ClientCollabMessage) -> Self {
Self {
object_id: msg.object_id().to_string(),
msg_id: msg.msg_id(),
is_init_sync: msg.is_init_sync(),
}
}
}
fn calculate_next_tick(num_init_sync: usize, default_interval: Duration) -> Instant {
match num_init_sync {
0 => Instant::now() + default_interval,
1..=3 => Instant::now() + Duration::from_secs(2),
4..=7 => Instant::now() + Duration::from_secs(4),
_ => Instant::now() + Duration::from_secs(6),
}
}

View File

@ -225,8 +225,9 @@ pub async fn forward_message_to_group(
) {
if let Some(client_stream) = client_streams.get(user) {
trace!(
"[realtime]: receive: uid:{} oid:{} msg ids: {:?}",
"[realtime]: receive client:{} device:{} oid:{} msg ids: {:?}",
user.uid,
user.device_id,
object_id,
collab_messages
.iter()

View File

@ -355,9 +355,10 @@ impl CollabClientStream {
)
.await;
trace!(
"{} receive {} client message: valid:{} invalid:{}",
"{} receive client:{}, device:{}, message: valid:{} invalid:{}",
msg_oid,
user.uid,
user.device_id,
valid_messages.len(),
invalid_message.len()
);

View File

@ -5,7 +5,6 @@ use std::sync::Arc;
use crate::biz::casbin::enforcer::ENFORCER_METRICS_TICK_INTERVAL;
use prometheus_client::registry::Registry;
use tokio::time::interval;
use tracing::trace;
#[derive(Clone)]
pub struct AccessControlMetrics {
@ -52,11 +51,6 @@ impl AccessControlMetrics {
}
pub fn record_enforce_count(&self, total: i64, from_cache: i64) {
trace!(
"enforce_count: total: {}, from_cache: {}",
total,
from_cache
);
self.total_read_enforce_count.set(total);
self.read_enforce_from_cache_count.set(from_cache);
}

View File

@ -42,11 +42,7 @@ impl CollabCache {
) -> Result<EncodedCollab, AppError> {
self.total_attempts.fetch_add(1, Ordering::Relaxed);
// Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary.
if let Some(encoded_collab) = self
.mem_cache
.get_encode_collab(&params.inner.object_id)
.await
{
if let Some(encoded_collab) = self.mem_cache.get_encode_collab(&params.object_id).await {
event!(
Level::DEBUG,
"Get encoded collab:{} from cache",

View File

@ -145,7 +145,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
Ok(config)
}
fn get_env_var(key: &str, default: &str) -> String {
pub fn get_env_var(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|e| {
tracing::warn!(
"failed to read environment variable: {}, using default value: {}",

View File

@ -6,6 +6,7 @@ use collab_entity::CollabType;
use database_entity::dto::{
CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, QueryCollabResult,
};
use sqlx::types::Uuid;
use std::collections::HashMap;
@ -225,3 +226,26 @@ async fn fail_insert_collab_with_invalid_workspace_id_test() {
assert_eq!(error.code, ErrorCode::NotEnoughPermissions);
}
// #[tokio::test]
// async fn collab_mem_cache_read_write_test() {
// let redis_client = redis_client().await;
// let conn = redis_client.get_connection_manager().await.unwrap();
//
// let mem_cache = CollabMemCache::new(conn);
// let encode_collab = EncodedCollab::new_v1(vec![1, 2, 3], vec![4, 5, 6]);
//
// let object_id = uuid::Uuid::new_v4().to_string();
// mem_cache
// .insert_encode_collab_bytes(object_id.clone(), encode_collab.encode_to_bytes().unwrap())
// .await;
// let encode_collab_from_cache = mem_cache.get_encode_collab(&object_id).await.unwrap();
// assert_eq!(encode_collab_from_cache.doc_state, encode_collab.doc_state);
// assert_eq!(
// encode_collab_from_cache.state_vector,
// encode_collab.state_vector
// );
//
// assert_eq!(encode_collab.state_vector, vec![1, 2, 3]);
// assert_eq!(encode_collab.doc_state, vec![4, 5, 6]);
// }

View File

@ -1,3 +1,5 @@
use anyhow::Context;
use appflowy_cloud::config::config::get_env_var;
use collab::core::collab_plugin::EncodedCollab;
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
@ -35,3 +37,11 @@ pub fn test_encode_collab_v1(object_id: &str, key: &str, value: &str) -> Encoded
collab.insert(key, value);
collab.encode_collab_v1()
}
#[allow(dead_code)]
pub async fn redis_client() -> redis::Client {
let redis_uri = get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379");
redis::Client::open(redis_uri)
.context("failed to connect to redis")
.unwrap()
}