chore: awarenss with origin (#498)

This commit is contained in:
Nathan.fooo 2024-04-26 19:39:08 +08:00 committed by GitHub
parent 32bdfb7037
commit e2fd049333
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 38 additions and 42 deletions

8
Cargo.lock generated
View File

@ -1648,7 +1648,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760"
dependencies = [
"anyhow",
"async-trait",
@ -1672,7 +1672,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760"
dependencies = [
"anyhow",
"collab",
@ -1691,7 +1691,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760"
dependencies = [
"anyhow",
"bytes",
@ -1706,7 +1706,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7dadc5ac7da58b862e856797583f5e40cb709bd1#7dadc5ac7da58b862e856797583f5e40cb709bd1"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=85580a5c0e95b5dae4787336faa751da44365760#85580a5c0e95b5dae4787336faa751da44365760"
dependencies = [
"anyhow",
"chrono",

View File

@ -209,10 +209,10 @@ debug = true
# will be removed when using yrs 0.18.2 that expose pendings
yrs = { git = "https://github.com/appflowy/y-crdt", rev = "3f25bb510ca5274e7657d3713fbed41fb46b4487" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7dadc5ac7da58b862e856797583f5e40cb709bd1" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "85580a5c0e95b5dae4787336faa751da44365760" }
[features]
ai_enable = []

View File

@ -308,11 +308,11 @@ pub struct AwarenessSync {
}
impl AwarenessSync {
pub fn new(object_id: String, payload: Vec<u8>) -> Self {
pub fn new(object_id: String, payload: Vec<u8>, origin: CollabOrigin) -> Self {
Self {
object_id,
payload: Bytes::from(payload),
origin: CollabOrigin::Server,
origin,
}
}
}

View File

@ -172,10 +172,11 @@ pub trait CollabSyncProtocol {
/// instance is being updated with incoming data.
fn handle_awareness_update(
&self,
message_origin: &CollabOrigin,
awareness: &mut Awareness,
update: AwarenessUpdate,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
awareness.apply_update(update)?;
awareness.apply_update(update, message_origin)?;
Ok(None)
}
@ -221,7 +222,7 @@ pub fn handle_message_follow_protocol<P: CollabSyncProtocol>(
},
Message::Auth(reason) => protocol.handle_auth(collab.get_awareness(), reason),
Message::Awareness(update) => {
protocol.handle_awareness_update(collab.get_mut_awareness(), update)
protocol.handle_awareness_update(message_origin, collab.get_mut_awareness(), update)
},
Message::Custom(msg) => protocol.handle_custom_message(collab.get_mut_awareness(), msg),
}

View File

@ -21,7 +21,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::select;
use tokio::sync::broadcast::error::SendError;
use tokio::sync::broadcast::{channel, Sender};
use tokio::time::{sleep, Instant};
@ -37,7 +37,7 @@ use yrs::UpdateSubscription;
///
pub struct CollabBroadcast {
object_id: String,
sender: Sender<CollabMessage>,
broadcast_sender: Sender<CollabMessage>,
awareness_sub: Option<AwarenessUpdateSubscription>,
/// Keep the lifetime of the document observer subscription. The subscription will be stopped
/// when the broadcast is dropped.
@ -74,7 +74,7 @@ impl CollabBroadcast {
let (sender, _) = channel(buffer_capacity);
let mut this = CollabBroadcast {
object_id,
sender,
broadcast_sender: sender,
awareness_sub: Default::default(),
doc_subscription: Default::default(),
edit_state,
@ -88,7 +88,7 @@ impl CollabBroadcast {
let (doc_sub, awareness_sub) = {
// Observer the document's update and broadcast it to all subscribers.
let cloned_oid = self.object_id.clone();
let broadcast_sink = self.sender.clone();
let broadcast_sink = self.broadcast_sender.clone();
let modified_at = self.modified_at.clone();
let edit_state = self.edit_state.clone();
@ -101,13 +101,14 @@ impl CollabBroadcast {
.get_doc()
.observe_update_v1(move |txn, event| {
let seq_num = edit_state.increment_edit_count() + 1;
let update_len = event.update.len();
let origin = CollabOrigin::from(txn);
trace!(
"observe update with len:{}, origin: {}",
event.update.len(),
origin
);
let payload = gen_update_message(&event.update);
let msg = BroadcastSync::new(origin, cloned_oid.clone(), payload, seq_num);
trace!("collab update with len:{}", update_len);
if let Err(err) = broadcast_sink.send(msg.into()) {
trace!("fail to broadcast updates:{}", err);
}
@ -115,21 +116,22 @@ impl CollabBroadcast {
})
.unwrap();
let broadcast_sink = self.sender.clone();
let broadcast_sink = self.broadcast_sender.clone();
let cloned_oid = self.object_id.clone();
// Observer the awareness's update and broadcast it to all subscribers.
let awareness_sub = collab.lock().observe_awareness(move |awareness, event| {
if let Ok(awareness_update) = gen_awareness_update_message(awareness, event) {
trace!("awareness update:{}", awareness_update);
let payload = Message::Awareness(awareness_update).encode_v1();
// TODO(nathan): replace the origin from awareness transaction
let msg = AwarenessSync::new(cloned_oid.clone(), payload);
if let Err(err) = broadcast_sink.send(msg.into()) {
trace!("fail to broadcast awareness:{}", err);
let awareness_sub = collab
.lock()
.observe_awareness(move |awareness, event, origin| {
if let Ok(awareness_update) = gen_awareness_update_message(awareness, event) {
trace!("awareness update:{}", origin);
let payload = Message::Awareness(awareness_update).encode_v1();
let msg = AwarenessSync::new(cloned_oid.clone(), payload, origin.clone());
if let Err(err) = broadcast_sink.send(msg.into()) {
trace!("fail to broadcast awareness:{}", err);
}
}
}
});
});
(doc_sub, awareness_sub)
};
@ -137,16 +139,7 @@ impl CollabBroadcast {
self.awareness_sub = Some(awareness_sub);
}
/// Broadcasts user message to all active subscribers. Returns error if message could not have
/// been broadcast.
#[allow(clippy::result_large_err)]
#[allow(dead_code)]
pub fn broadcast_awareness(&self, msg: AwarenessSync) -> Result<(), SendError<CollabMessage>> {
self.sender.send(msg.into())?;
Ok(())
}
/// Subscribes a new connection to a broadcast group, enabling real-time collaboration.
/// Subscribes a new connection to a broadcast group
///
/// This function takes a `sink`/`stream` pair representing the connection to a subscriber. The `sink`
/// is used to send messages to the subscriber, while the `stream` receives messages from the subscriber.
@ -195,7 +188,7 @@ impl CollabBroadcast {
// the receiver will continue to receive updates from the document observer and forward the update to
// connected subscriber using its Sink. The loop will break if the stop_rx receives a message.
let mut receiver = self.sender.subscribe();
let mut receiver = self.broadcast_sender.subscribe();
let cloned_user = user.clone();
rt_spawn(async move {
loop {
@ -204,6 +197,8 @@ impl CollabBroadcast {
result = receiver.recv() => {
match result {
Ok(message) => {
// No need to broadcast the message back to the originator
if message.origin() == &subscriber_origin {
continue;
}