From e2fd0493339b3d769e3f827b3351f16dd0ad457f Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Fri, 26 Apr 2024 19:39:08 +0800 Subject: [PATCH] chore: awarenss with origin (#498) --- Cargo.lock | 8 +-- Cargo.toml | 8 +-- libs/collab-rt-entity/src/server_message.rs | 4 +- libs/collab-rt-protocol/src/protocol.rs | 5 +- .../src/group/broadcast.rs | 55 +++++++++---------- 5 files changed, 38 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a34fd59d..406d9cc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1648,7 +1648,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760" dependencies = [ "anyhow", "async-trait", @@ -1672,7 +1672,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760" dependencies = [ "anyhow", "collab", @@ -1691,7 +1691,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760" dependencies = [ "anyhow", "bytes", @@ -1706,7 +1706,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1fa1fa1a..8e91c52e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -209,10 +209,10 @@ debug = true # will be removed when using yrs 0.18.2 that expose pendings yrs = { git = "https://github.com/appflowy/y-crdt", rev = "3f25bb510ca5274e7657d3713fbed41fb46b4487" } -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" } [features] ai_enable = [] diff --git a/libs/collab-rt-entity/src/server_message.rs b/libs/collab-rt-entity/src/server_message.rs index 63b69686..5a658e08 100644 --- a/libs/collab-rt-entity/src/server_message.rs +++ b/libs/collab-rt-entity/src/server_message.rs @@ -308,11 +308,11 @@ pub struct AwarenessSync { } impl AwarenessSync { - pub fn new(object_id: String, payload: Vec) -> Self { + pub fn new(object_id: String, payload: Vec, origin: CollabOrigin) -> Self { Self { object_id, payload: Bytes::from(payload), - origin: CollabOrigin::Server, + origin, } } } diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index fbc5684a..d85a4468 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -172,10 +172,11 @@ pub trait CollabSyncProtocol { /// instance is being updated with incoming data. fn handle_awareness_update( &self, + message_origin: &CollabOrigin, awareness: &mut Awareness, update: AwarenessUpdate, ) -> Result>, RTProtocolError> { - awareness.apply_update(update)?; + awareness.apply_update(update, message_origin)?; Ok(None) } @@ -221,7 +222,7 @@ pub fn handle_message_follow_protocol( }, Message::Auth(reason) => protocol.handle_auth(collab.get_awareness(), reason), Message::Awareness(update) => { - protocol.handle_awareness_update(collab.get_mut_awareness(), update) + protocol.handle_awareness_update(message_origin, collab.get_mut_awareness(), update) }, Message::Custom(msg) => protocol.handle_custom_message(collab.get_mut_awareness(), msg), } diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index 54492031..094391d4 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -21,7 +21,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::select; -use tokio::sync::broadcast::error::SendError; + use tokio::sync::broadcast::{channel, Sender}; use tokio::time::{sleep, Instant}; @@ -37,7 +37,7 @@ use yrs::UpdateSubscription; /// pub struct CollabBroadcast { object_id: String, - sender: Sender, + broadcast_sender: Sender, awareness_sub: Option, /// Keep the lifetime of the document observer subscription. The subscription will be stopped /// when the broadcast is dropped. @@ -74,7 +74,7 @@ impl CollabBroadcast { let (sender, _) = channel(buffer_capacity); let mut this = CollabBroadcast { object_id, - sender, + broadcast_sender: sender, awareness_sub: Default::default(), doc_subscription: Default::default(), edit_state, @@ -88,7 +88,7 @@ impl CollabBroadcast { let (doc_sub, awareness_sub) = { // Observer the document's update and broadcast it to all subscribers. let cloned_oid = self.object_id.clone(); - let broadcast_sink = self.sender.clone(); + let broadcast_sink = self.broadcast_sender.clone(); let modified_at = self.modified_at.clone(); let edit_state = self.edit_state.clone(); @@ -101,13 +101,14 @@ impl CollabBroadcast { .get_doc() .observe_update_v1(move |txn, event| { let seq_num = edit_state.increment_edit_count() + 1; - let update_len = event.update.len(); let origin = CollabOrigin::from(txn); - + trace!( + "observe update with len:{}, origin: {}", + event.update.len(), + origin + ); let payload = gen_update_message(&event.update); let msg = BroadcastSync::new(origin, cloned_oid.clone(), payload, seq_num); - - trace!("collab update with len:{}", update_len); if let Err(err) = broadcast_sink.send(msg.into()) { trace!("fail to broadcast updates:{}", err); } @@ -115,21 +116,22 @@ impl CollabBroadcast { }) .unwrap(); - let broadcast_sink = self.sender.clone(); + let broadcast_sink = self.broadcast_sender.clone(); let cloned_oid = self.object_id.clone(); // Observer the awareness's update and broadcast it to all subscribers. - let awareness_sub = collab.lock().observe_awareness(move |awareness, event| { - if let Ok(awareness_update) = gen_awareness_update_message(awareness, event) { - trace!("awareness update:{}", awareness_update); - let payload = Message::Awareness(awareness_update).encode_v1(); - // TODO(nathan): replace the origin from awareness transaction - let msg = AwarenessSync::new(cloned_oid.clone(), payload); - if let Err(err) = broadcast_sink.send(msg.into()) { - trace!("fail to broadcast awareness:{}", err); + let awareness_sub = collab + .lock() + .observe_awareness(move |awareness, event, origin| { + if let Ok(awareness_update) = gen_awareness_update_message(awareness, event) { + trace!("awareness update:{}", origin); + let payload = Message::Awareness(awareness_update).encode_v1(); + let msg = AwarenessSync::new(cloned_oid.clone(), payload, origin.clone()); + if let Err(err) = broadcast_sink.send(msg.into()) { + trace!("fail to broadcast awareness:{}", err); + } } - } - }); + }); (doc_sub, awareness_sub) }; @@ -137,16 +139,7 @@ impl CollabBroadcast { self.awareness_sub = Some(awareness_sub); } - /// Broadcasts user message to all active subscribers. Returns error if message could not have - /// been broadcast. - #[allow(clippy::result_large_err)] - #[allow(dead_code)] - pub fn broadcast_awareness(&self, msg: AwarenessSync) -> Result<(), SendError> { - self.sender.send(msg.into())?; - Ok(()) - } - - /// Subscribes a new connection to a broadcast group, enabling real-time collaboration. + /// Subscribes a new connection to a broadcast group /// /// This function takes a `sink`/`stream` pair representing the connection to a subscriber. The `sink` /// is used to send messages to the subscriber, while the `stream` receives messages from the subscriber. @@ -195,7 +188,7 @@ impl CollabBroadcast { // the receiver will continue to receive updates from the document observer and forward the update to // connected subscriber using its Sink. The loop will break if the stop_rx receives a message. - let mut receiver = self.sender.subscribe(); + let mut receiver = self.broadcast_sender.subscribe(); let cloned_user = user.clone(); rt_spawn(async move { loop { @@ -204,6 +197,8 @@ impl CollabBroadcast { result = receiver.recv() => { match result { Ok(message) => { + + // No need to broadcast the message back to the originator if message.origin() == &subscriber_origin { continue; }