From 47e14d3b25d63425084ae908ce8f69ae0c9d6641 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 13 Mar 2024 09:53:25 +0800 Subject: [PATCH] fix: potential duplicate init sync (#381) * fix: potential duplicate init sync * chore: clippy --- libs/client-api/Cargo.toml | 1 + libs/client-api/src/collab_sync/sink.rs | 125 +++++++++--------- .../src/collab_sync/sync_control.rs | 2 +- libs/client-api/src/ws/client.rs | 2 +- libs/realtime/src/server/collaborate/group.rs | 5 +- .../realtime/src/server/collaborate/plugin.rs | 1 - libs/realtime/src/server/rt_server.rs | 24 +++- src/biz/casbin/enforcer_cache.rs | 4 +- 8 files changed, 90 insertions(+), 74 deletions(-) diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index dc2739d9..d28c802d 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -67,3 +67,4 @@ again = "0.1.2" collab-sync = ["collab", "yrs"] test_util = ["scraper"] template = ["workspace-template"] +ws_verbose_log = [] diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index dac614ec..090e8335 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -167,18 +167,7 @@ where trace!("🔥 queue {}", new_msg); msg_queue.push_msg(msg_id, new_msg); - if !requeue_items.is_empty() { - trace!( - "{} requeue items: {}", - self.object.object_id, - requeue_items - .iter() - .map(|item| { item.msg_id().to_string() }) - .collect::>() - .join(",") - ); - msg_queue.extend(requeue_items); - } + msg_queue.extend(requeue_items); drop(msg_queue); // Notify the sink to process the next message after 500ms. let _ = self @@ -201,7 +190,9 @@ where // When the client is connected, remove all pending messages and send the init message. let mut msg_queue = self.message_queue.lock(); let msg_id = self.msg_id_counter.next(); - msg_queue.push_msg(msg_id, f(msg_id)); + let init_sync = f(msg_id); + trace!("🔥queue {}", init_sync); + msg_queue.push_msg(msg_id, init_sync); let _ = self.notifier.send(SinkSignal::Proceed); } @@ -242,7 +233,7 @@ where /// Notify the sink to process the next message and mark the current message as done. /// Returns bool value to indicate whether the message is valid. - pub async fn validate_server_message(&self, server_message: &ServerCollabMessage) -> bool { + pub async fn validate_response(&self, server_message: &ServerCollabMessage) -> bool { if server_message.msg_id().is_none() { // msg_id will be None for [ServerBroadcast] or [ServerAwareness], automatically valid. return true; @@ -298,56 +289,30 @@ where if self.pause.load(Ordering::SeqCst) { return; } - self.send_msg_immediately().await; - } - async fn send_msg_immediately(&self) { let items = { - let mut msg_queue = match self.message_queue.try_lock() { - None => { + let (mut msg_queue, mut flying_messages) = match ( + self.message_queue.try_lock(), + self.flying_messages.try_lock(), + ) { + (Some(msg_queue), Some(flying_messages)) => (msg_queue, flying_messages), + _ => { // If acquire the lock failed, try later retry_later(Arc::downgrade(&self.notifier)); return; }, - Some(msg_queue) => msg_queue, }; - let mut flying_messages = self.flying_messages.lock(); - let mut items = vec![]; - - let mut count = 0; - let mut item_to_requeue = vec![]; - while let Some(item) = msg_queue.pop() { - if count > 20 { - item_to_requeue.push(item); - break; - } - - if flying_messages.contains(&item.msg_id()) { - item_to_requeue.push(item); - continue; - } - - let is_init_sync = item.message().is_client_init_sync(); - items.push(item.clone()); - count += 1; - item_to_requeue.push(item); - - if is_init_sync { - break; - } - } - msg_queue.extend(item_to_requeue); - - let message_ids = items.iter().map(|item| item.msg_id()).collect::>(); - flying_messages.extend(message_ids); - - items + get_next_batch_item(&self.object.object_id, &mut flying_messages, &mut msg_queue) }; if items.is_empty() { return; } + self.send_immediately(items).await; + } + + async fn send_immediately(&self, items: Vec>) { let message_ids = items.iter().map(|item| item.msg_id()).collect::>(); let messages = items .into_iter() @@ -364,18 +329,10 @@ where }, Err(err) => { error!("Failed to send error: {:?}", err.into()); - self - .flying_messages - .lock() - .retain(|id| !message_ids.contains(id)); }, }, Err(_) => { warn!("failed to acquire the lock of the sink, retry later"); - self - .flying_messages - .lock() - .retain(|id| !message_ids.contains(id)); retry_later(Arc::downgrade(&self.notifier)); }, } @@ -437,9 +394,57 @@ where } } +fn get_next_batch_item( + object_id: &str, + flying_messages: &mut HashSet, + msg_queue: &mut SinkQueue, +) -> Vec> +where + Msg: CollabSinkMessage, +{ + let mut items = vec![]; + let mut requeue_items = vec![]; + while let Some(item) = msg_queue.pop() { + if items.len() > 20 { + requeue_items.push(item); + break; + } + + if flying_messages.contains(&item.msg_id()) { + requeue_items.push(item); + continue; + } + + let is_init_sync = item.message().is_client_init_sync(); + items.push(item.clone()); + requeue_items.push(item); + + if is_init_sync { + break; + } + } + + if !requeue_items.is_empty() { + trace!( + "{} requeue items: {}", + object_id, + requeue_items + .iter() + .map(|item| { item.msg_id().to_string() }) + .collect::>() + .join(",") + ); + } + msg_queue.extend(requeue_items); + + let message_ids = items.iter().map(|item| item.msg_id()).collect::>(); + flying_messages.extend(message_ids); + items +} + fn retry_later(weak_notifier: Weak>) { af_spawn(async move { - interval(Duration::from_millis(300)).tick().await; + interval(Duration::from_millis(500)).tick().await; if let Some(notifier) = weak_notifier.upgrade() { let _ = notifier.send(SinkSignal::Proceed); } diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 0b2e168b..a7980b11 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -334,7 +334,7 @@ where } // Check if the message is acknowledged by the sink. If not, return. - let is_valid = sink.validate_server_message(&msg).await; + let is_valid = sink.validate_response(&msg).await; // If there's no payload or the payload is empty, return. if is_valid && !msg.payload().is_empty() { ObserveCollab::::process_payload( diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index 02d1bce5..02191f28 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -65,7 +65,7 @@ pub(crate) type CurrentConnInfo = parking_lot::Mutex>; /// The maximum size allowed for a WebSocket message is 65,536 bytes. If the message exceeds /// 50960 bytes (to avoid occupying the entire space), it should be sent over HTTP instead. const MAXIMUM_MESSAGE_SIZE: usize = 40960; -const MAXIMUM_BATCH_MESSAGE_SIZE: usize = 30960; +const MAXIMUM_BATCH_MESSAGE_SIZE: usize = 20480; pub struct WSClient { current_conn_info: Arc, config: WSClientConfig, diff --git a/libs/realtime/src/server/collaborate/group.rs b/libs/realtime/src/server/collaborate/group.rs index 812e2431..2258b85b 100644 --- a/libs/realtime/src/server/collaborate/group.rs +++ b/libs/realtime/src/server/collaborate/group.rs @@ -136,8 +136,9 @@ where >::Error: std::error::Error + Send + Sync, { trace!( - "[realtime]: new group subscriber: {}, connected members: {}", - subscriber_origin, + "[realtime]: {} new subscriber: {}, connected members: {}", + self.object_id, + user.uid(), self.subscribers.len(), ); diff --git a/libs/realtime/src/server/collaborate/plugin.rs b/libs/realtime/src/server/collaborate/plugin.rs index b84f5895..9f448f3e 100644 --- a/libs/realtime/src/server/collaborate/plugin.rs +++ b/libs/realtime/src/server/collaborate/plugin.rs @@ -212,7 +212,6 @@ where trace!("{} edit state:{}", object_id, self.edit_state); if self.edit_state.should_flush(100, 3 * 60) { self.edit_state.tick(); - let _object_id = object_id.to_string(); } } diff --git a/libs/realtime/src/server/rt_server.rs b/libs/realtime/src/server/rt_server.rs index 4a7d6d6f..4e1d0268 100644 --- a/libs/realtime/src/server/rt_server.rs +++ b/libs/realtime/src/server/rt_server.rs @@ -562,17 +562,24 @@ impl CollabClientStream { while let Some(Ok(realtime_msg)) = stream_rx.next().await { match realtime_msg.try_into_message_by_object_id() { Ok(messages_by_oid) => { - for (msg_oid, messages) in messages_by_oid { + for (msg_oid, original_messages) in messages_by_oid { if cloned_object_id != msg_oid { continue; } - let messages = Self::access_control(&uid, &msg_oid, &access_control, messages).await; - if messages.is_empty() { + let (valid_messages, invalid_message) = + Self::access_control(&uid, &msg_oid, &access_control, original_messages).await; + trace!( + "{} receive message: valid:{} invalid:{}", + msg_oid, + valid_messages.len(), + invalid_message.len() + ); + + if valid_messages.is_empty() { continue; } - - if tx.send([(msg_oid, messages)].into()).await.is_err() { + if tx.send([(msg_oid, valid_messages)].into()).await.is_err() { break; } } @@ -594,7 +601,7 @@ impl CollabClientStream { object_id: &str, access_control: &Arc, messages: Vec, - ) -> Vec + ) -> (Vec, Vec) where AC: RealtimeAccessControl, { @@ -604,6 +611,7 @@ impl CollabClientStream { .unwrap_or(false); let mut valid_messages = Vec::with_capacity(messages.len()); + let mut invalid_messages = Vec::with_capacity(messages.len()); for message in messages { if message.is_client_init_sync() && access_control @@ -617,9 +625,11 @@ impl CollabClientStream { if can_write { valid_messages.push(message); + } else { + invalid_messages.push(message); } } - valid_messages + (valid_messages, invalid_messages) } } diff --git a/src/biz/casbin/enforcer_cache.rs b/src/biz/casbin/enforcer_cache.rs index d146e9ee..ff5e5ded 100644 --- a/src/biz/casbin/enforcer_cache.rs +++ b/src/biz/casbin/enforcer_cache.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use tracing::error; /// Expire time for cache in seconds. When the cache is expired, the enforcer will re-evaluate the policy. -const EXPIRE_TIME: u64 = 60 * 60 * 24 * 3; +const EXPIRE_IN_ONE_DAY: u64 = 60 * 60 * 24; #[derive(Clone)] pub struct AFEnforcerCacheImpl { @@ -31,7 +31,7 @@ impl AFEnforcerCache for AFEnforcerCacheImpl { ) -> Result<(), AppError> { self .redis_client - .set_ex::<&str, bool, ()>(key, value, EXPIRE_TIME) + .set_ex::<&str, bool, ()>(key, value, EXPIRE_IN_ONE_DAY) .await .map_err(|e| AppError::Internal(anyhow!("Failed to set enforcer result in redis: {}", e))) }