diff --git a/Cargo.lock b/Cargo.lock index a2de269f..a6012695 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1169,7 +1169,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4d348f11#4d348f11ab6c9922017b45376cd58a217a671cd7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=bab20052#bab200529ef0306194fa8618cc8708878b01ce04" dependencies = [ "anyhow", "async-trait", @@ -1188,7 +1188,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4d348f11#4d348f11ab6c9922017b45376cd58a217a671cd7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=bab20052#bab200529ef0306194fa8618cc8708878b01ce04" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 96749096..07197676 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,8 +138,8 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4d348f11" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4d348f11" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "bab20052" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "bab20052" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/libs/app_error/src/lib.rs b/libs/app_error/src/lib.rs index 467bee22..b194a099 100644 --- a/libs/app_error/src/lib.rs +++ b/libs/app_error/src/lib.rs @@ -28,7 +28,7 @@ pub enum AppError { #[error("Invalid password:{0}")] InvalidPassword(String), - #[error("OAuth error:{0}")] + #[error("{0}")] OAuthError(String), #[error("Missing Payload:{0}")] diff --git a/libs/client-api/src/collab_sync/pending_msg.rs b/libs/client-api/src/collab_sync/pending_msg.rs index 2e2a495f..d3b49587 100644 --- a/libs/client-api/src/collab_sync/pending_msg.rs +++ b/libs/client-api/src/collab_sync/pending_msg.rs @@ -1,7 +1,6 @@ use anyhow::Error; use std::cmp::Ordering; use std::collections::BinaryHeap; -use std::fmt::Display; use std::ops::{Deref, DerefMut}; use realtime_entity::collab_msg::{CollabSinkMessage, MsgId}; @@ -15,7 +14,7 @@ pub(crate) struct PendingMsgQueue { impl PendingMsgQueue where - Msg: Ord + Clone + Display, + Msg: CollabSinkMessage, { pub(crate) fn new(uid: i64) -> Self { Self { @@ -32,7 +31,7 @@ where impl Deref for PendingMsgQueue where - Msg: Ord, + Msg: CollabSinkMessage, { type Target = BinaryHeap>; @@ -43,7 +42,7 @@ where impl DerefMut for PendingMsgQueue where - Msg: Ord, + Msg: CollabSinkMessage, { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.queue @@ -60,7 +59,7 @@ pub(crate) struct PendingMessage { impl PendingMessage where - Msg: Clone + Display, + Msg: CollabSinkMessage, { pub fn new(msg: Msg, msg_id: MsgId) -> Self { Self { @@ -71,6 +70,10 @@ where } } + pub fn object_id(&self) -> &str { + self.msg.collab_object_id() + } + pub fn get_msg(&self) -> &Msg { &self.msg } @@ -82,27 +85,29 @@ where pub fn set_state(&mut self, uid: i64, new_state: MessageState) -> bool { self.state = new_state; trace!( - "[Client {}] : msg_id: {}, state: {:?}", + "[Client {}] : oid:{}|msg_id:{},state:{:?}", uid, + self.msg.collab_object_id(), self.msg_id, self.state ); - if !self.state.is_done() { - return false; - } - match self.tx.take() { - None => false, - Some(tx) => { - // Notify that the message with given id was received - match tx.send(self.msg_id) { - Ok(_) => true, - Err(err) => { - warn!("Failed to send msg_id: {}, err: {}", self.msg_id, err); - false - }, - } - }, + if self.state.is_done() { + match self.tx.take() { + None => false, + Some(tx) => { + // Notify that the message with given id was received + match tx.send(self.msg_id) { + Ok(_) => true, + Err(err) => { + warn!("Failed to send msg_id: {}, err: {}", self.msg_id, err); + false + }, + } + }, + } + } else { + false } } diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index fb8bd5bb..2e2ca490 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -324,8 +324,12 @@ where Ok(_) => match self.pending_msg_queue.try_lock() { None => warn!("Failed to acquire the lock of the pending_msg_queue"), Some(mut pending_msg_queue) => { - let _ = pending_msg_queue.pop(); - trace!("Pending messages: {}", pending_msg_queue.len()); + let msg = pending_msg_queue.pop(); + trace!( + "{:?}: Pending messages: {}", + msg.map(|msg| msg.object_id().to_owned()), + pending_msg_queue.len() + ); if pending_msg_queue.is_empty() { if let Err(e) = self.state_notifier.send(SinkState::Finished) { error!("send sink state failed: {}", e); diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index fbb3a634..6929d52c 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -266,11 +266,12 @@ where P: CollabSyncProtocol + Send + Sync + 'static, { if match msg.msg_id() { + // The msg_id is None if the message is [ServerBroadcast] or [ServerAwareness] None => true, Some(msg_id) => sink.ack_msg(msg.origin(), msg.object_id(), msg_id).await, } && !msg.payload().is_empty() { - trace!("💬process messages"); + trace!("start process message: {:?}", msg.msg_id()); SyncStream::::process_payload( origin, msg.payload(), @@ -280,7 +281,7 @@ where sink, ) .await?; - trace!("💬"); + trace!("end process message: {:?}", msg.msg_id()); } Ok(()) } diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index b06841e4..d155d529 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -11,6 +11,7 @@ use collab_entity::CollabType; use serde::{Deserialize, Serialize}; pub trait CollabSinkMessage: Clone + Send + Sync + 'static + Ord + Display { + fn collab_object_id(&self) -> &str; /// Returns the length of the message in bytes. fn length(&self) -> usize; @@ -38,6 +39,10 @@ pub enum CollabMessage { } impl CollabSinkMessage for CollabMessage { + fn collab_object_id(&self) -> &str { + self.object_id() + } + fn length(&self) -> usize { self.payload().len() } diff --git a/tests/collab/workspace_collab.rs b/tests/collab/workspace_collab.rs index 8c2658d0..2180c1c2 100644 --- a/tests/collab/workspace_collab.rs +++ b/tests/collab/workspace_collab.rs @@ -10,8 +10,8 @@ async fn edit_workspace_without_permission() { let mut client_2 = TestClient::new_user().await; let workspace_id = client_1.workspace_id().await; - client_1.edit_workspace_collab(&workspace_id).await; - client_2.edit_workspace_collab(&workspace_id).await; + client_1.open_workspace_collab(&workspace_id).await; + client_2.open_workspace_collab(&workspace_id).await; client_1 .collab_by_object_id @@ -31,13 +31,13 @@ async fn init_sync_workspace_with_guest_permission() { let mut client_1 = TestClient::new_user().await; let mut client_2 = TestClient::new_user().await; let workspace_id = client_1.workspace_id().await; - client_1.edit_workspace_collab(&workspace_id).await; + client_1.open_workspace_collab(&workspace_id).await; // add client 2 as the member of the workspace then the client 2 will receive the update. client_1 .add_workspace_member(&workspace_id, &client_2, AFRole::Guest) .await; - client_2.edit_workspace_collab(&workspace_id).await; + client_2.open_workspace_collab(&workspace_id).await; client_1 .collab_by_object_id @@ -57,7 +57,7 @@ async fn edit_workspace_with_guest_permission() { let mut client_1 = TestClient::new_user().await; let mut client_2 = TestClient::new_user().await; let workspace_id = client_1.workspace_id().await; - client_1.edit_workspace_collab(&workspace_id).await; + client_1.open_workspace_collab(&workspace_id).await; // add client 2 as the member of the workspace then the client 2 will receive the update. client_1 @@ -73,7 +73,7 @@ async fn edit_workspace_with_guest_permission() { .insert("name", "zack"); client_1.wait_object_sync_complete(&workspace_id).await; - client_2.edit_workspace_collab(&workspace_id).await; + client_2.open_workspace_collab(&workspace_id).await; // make sure the client 2 has received the remote updates before the client 2 edits the collab client_2 .wait_object_sync_complete_with_secs(&workspace_id, 10) diff --git a/tests/util/test_client.rs b/tests/util/test_client.rs index dcf32b21..0d3029c7 100644 --- a/tests/util/test_client.rs +++ b/tests/util/test_client.rs @@ -325,7 +325,7 @@ impl TestClient { object_id } - pub(crate) async fn edit_workspace_collab(&mut self, workspace_id: &str) { + pub(crate) async fn open_workspace_collab(&mut self, workspace_id: &str) { self .open_collab(workspace_id, workspace_id, CollabType::Folder) .await;