From 9589054f3874c60063878f084af01905b182d537 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 11 Dec 2023 11:27:11 +0800 Subject: [PATCH] refactor: sync protocol (#203) * refactor: sync protocol * chore: update collab rev --- Cargo.lock | 30 ++- Cargo.toml | 13 +- libs/client-api/Cargo.toml | 2 + libs/client-api/src/collab_sync/error.rs | 4 +- libs/client-api/src/collab_sync/plugin.rs | 4 +- libs/client-api/src/collab_sync/sync.rs | 6 +- libs/database-entity/src/dto.rs | 3 +- libs/encrypt/Cargo.toml | 2 +- libs/realtime-entity/Cargo.toml | 5 +- libs/realtime-entity/src/collab_msg.rs | 2 +- libs/realtime-entity/src/lib.rs | 1 + libs/realtime-protocol/Cargo.toml | 14 ++ libs/realtime-protocol/src/lib.rs | 5 + libs/realtime-protocol/src/protocol.rs | 205 ++++++++++++++++++ libs/realtime/Cargo.toml | 1 + libs/realtime/src/collaborate/broadcast.rs | 7 +- libs/realtime/src/collaborate/group.rs | 5 + libs/realtime/src/collaborate/plugin.rs | 53 ++--- libs/realtime/src/collaborate/server.rs | 6 +- .../realtime/src/collaborate/sync_protocol.rs | 6 +- libs/realtime/src/error.rs | 4 +- 21 files changed, 313 insertions(+), 65 deletions(-) create mode 100644 libs/realtime-protocol/Cargo.toml create mode 100644 libs/realtime-protocol/src/lib.rs create mode 100644 libs/realtime-protocol/src/protocol.rs diff --git a/Cargo.lock b/Cargo.lock index 98ae6137..fbdd7662 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1243,6 +1243,7 @@ dependencies = [ "anyhow", "app-error", "async-trait", + "bincode", "bytes", "collab", "collab-entity", @@ -1256,6 +1257,7 @@ dependencies = [ "parking_lot", "prost", "realtime-entity", + "realtime-protocol", "reqwest", "scraper", "serde", @@ -1277,7 +1279,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "anyhow", "async-trait", @@ -1296,7 +1298,7 @@ dependencies = [ [[package]] name = "collab-derive" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "proc-macro2", "quote", @@ -1308,7 +1310,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "anyhow", "collab", @@ -1327,7 +1329,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "anyhow", "bytes", @@ -1341,7 +1343,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "anyhow", "chrono", @@ -1361,7 +1363,7 @@ dependencies = [ [[package]] name = "collab-persistence" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3" dependencies = [ "anyhow", "async-trait", @@ -3927,6 +3929,7 @@ dependencies = [ "once_cell", "parking_lot", "realtime-entity", + "realtime-protocol", "reqwest", "serde", "serde-aux", @@ -3960,9 +3963,24 @@ dependencies = [ "prost", "prost-build", "protoc-bin-vendored", + "realtime-protocol", "serde", "serde_json", + "thiserror", "tokio-tungstenite", + "yrs", +] + +[[package]] +name = "realtime-protocol" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "collab", + "serde", + "thiserror", + "yrs", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 038eee0c..fd350b68 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,11 +123,13 @@ members = [ "admin_frontend", "libs/app_error", "libs/workspace-template", - "libs/encrypt" + "libs/encrypt", + "libs/realtime-protocol" ] [workspace.dependencies] realtime-entity = { path = "libs/realtime-entity" } +realtime-protocol = { path = "libs/realtime-protocol" } database-entity = { path = "libs/database-entity" } app-error = { path = "libs/app_error" } serde_json = "1.0.108" @@ -139,6 +141,7 @@ uuid = { version = "1.4.1", features = ["v4"] } anyhow = "1.0.75" tokio = { version = "1.34", features = ["sync"] } yrs = "0.17.1" +bincode = "1.3.3" [profile.release] lto = true @@ -155,10 +158,10 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 7552b757..06faa00d 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -38,10 +38,12 @@ collab = { version = "0.1.0", optional = true } collab-entity = { version = "0.1.0" } yrs = { workspace = true, optional = true } realtime-entity = { workspace = true, features = ["tungstenite"] } +realtime-protocol = { workspace = true } workspace-template = { workspace = true, optional = true } mime_guess = "2.0.4" async-trait = { version = "0.1.73" } prost = "0.12.1" +bincode = "1.3.3" [features] diff --git a/libs/client-api/src/collab_sync/error.rs b/libs/client-api/src/collab_sync/error.rs index f03ff11a..4dfe5b67 100644 --- a/libs/client-api/src/collab_sync/error.rs +++ b/libs/client-api/src/collab_sync/error.rs @@ -1,10 +1,10 @@ #[derive(Debug, thiserror::Error)] pub enum SyncError { #[error(transparent)] - YSync(#[from] collab::sync_protocol::message::Error), + YSync(#[from] realtime_protocol::Error), #[error(transparent)] - YAwareness(#[from] collab::sync_protocol::awareness::Error), + YAwareness(#[from] collab::core::awareness::Error), #[error("failed to deserialize message: {0}")] DecodingError(#[from] yrs::encoding::read::Error), diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 774b300a..84c68bc6 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -1,14 +1,14 @@ use std::sync::{Arc, Weak}; +use collab::core::awareness::Awareness; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; use collab::preclude::CollabPlugin; -use collab::sync_protocol::awareness::Awareness; -use collab::sync_protocol::message::{Message, SyncMessage}; use collab_entity::{CollabObject, CollabType}; use futures_util::SinkExt; use realtime_entity::collab_msg::{CollabMessage, UpdateSync}; +use realtime_protocol::{Message, SyncMessage}; use tokio_stream::StreamExt; use crate::collab_sync::{SinkConfig, SyncQueue}; diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index c558d362..ddfd483b 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -2,14 +2,14 @@ use crate::collab_sync::{ CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject, }; use bytes::Bytes; +use collab::core::awareness::Awareness; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; -use collab::sync_protocol::awareness::Awareness; -use collab::sync_protocol::message::{Message, MessageReader, SyncMessage}; -use collab::sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; use futures_util::{SinkExt, StreamExt}; use realtime_entity::collab_msg::{CollabMessage, InitSync, ServerInit, UpdateSync}; +use realtime_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; +use realtime_protocol::{Message, MessageReader, SyncMessage}; use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index c2ef52b4..ad6c7518 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -38,12 +38,11 @@ impl InsertCollabParams { } } pub fn from_raw_data( - object_id: &str, + object_id: String, collab_type: CollabType, encoded_collab_v1: Vec, workspace_id: &str, ) -> Self { - let object_id = object_id.to_string(); let workspace_id = workspace_id.to_string(); Self { object_id, diff --git a/libs/encrypt/Cargo.toml b/libs/encrypt/Cargo.toml index 09f0e6f0..d4eaefb4 100644 --- a/libs/encrypt/Cargo.toml +++ b/libs/encrypt/Cargo.toml @@ -15,7 +15,7 @@ base64 = "0.21.5" hkdf = { version = "0.12.3" } sha2 = "0.10.8" serde = { version = "1.0.188", features = ["derive"] } -bincode = "1.3.3" +bincode.workspace = true bytes.workspace = true [dev-dependencies] diff --git a/libs/realtime-entity/Cargo.toml b/libs/realtime-entity/Cargo.toml index 7e874718..28315501 100644 --- a/libs/realtime-entity/Cargo.toml +++ b/libs/realtime-entity/Cargo.toml @@ -13,10 +13,13 @@ serde_json.workspace = true bytes = { version = "1.0", features = ["serde"] } anyhow = "1.0.75" actix = { version = "0.13", optional = true } -bincode = "1.3.3" +bincode.workspace = true tokio-tungstenite = { version = "0.20.1", optional = true } prost = "0.12.1" database-entity.workspace = true +yrs.workspace = true +thiserror = "1.0.48" +realtime-protocol.workspace = true [features] actix_message = ["actix"] diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index fed93a04..9038ae0a 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -8,8 +8,8 @@ use collab::core::origin::CollabOrigin; use collab::preclude::merge_updates_v1; use collab::preclude::updates::decoder::DecoderV1; use collab::preclude::updates::encoder::{Encode, Encoder, EncoderV1}; -use collab::sync_protocol::message::{Message, MessageReader, SyncMessage}; use collab_entity::CollabType; +use realtime_protocol::{Message, MessageReader, SyncMessage}; use serde::{Deserialize, Serialize}; pub trait CollabSinkMessage: Clone + Send + Sync + 'static + Ord + Display { diff --git a/libs/realtime-entity/src/lib.rs b/libs/realtime-entity/src/lib.rs index f789aaa3..ddc8784d 100644 --- a/libs/realtime-entity/src/lib.rs +++ b/libs/realtime-entity/src/lib.rs @@ -1,4 +1,5 @@ pub mod collab_msg; + pub mod message; pub mod user; diff --git a/libs/realtime-protocol/Cargo.toml b/libs/realtime-protocol/Cargo.toml new file mode 100644 index 00000000..6b0e6f9c --- /dev/null +++ b/libs/realtime-protocol/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "realtime-protocol" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +yrs.workspace = true +thiserror = "1.0.48" +serde.workspace = true +collab = { version = "0.1.0" } +bincode.workspace = true +anyhow.workspace = true diff --git a/libs/realtime-protocol/src/lib.rs b/libs/realtime-protocol/src/lib.rs new file mode 100644 index 00000000..40e6fcf7 --- /dev/null +++ b/libs/realtime-protocol/src/lib.rs @@ -0,0 +1,5 @@ +mod message; +mod protocol; + +pub use message::*; +pub use protocol::*; diff --git a/libs/realtime-protocol/src/protocol.rs b/libs/realtime-protocol/src/protocol.rs new file mode 100644 index 00000000..ff99a821 --- /dev/null +++ b/libs/realtime-protocol/src/protocol.rs @@ -0,0 +1,205 @@ +use std::time::Duration; + +use anyhow::anyhow; +use collab::core::awareness::{Awareness, AwarenessUpdate}; +use collab::core::collab::{MutexCollab, TransactionMutExt}; +use collab::core::origin::CollabOrigin; +use yrs::updates::decoder::Decode; +use yrs::updates::encoder::{Encode, Encoder}; +use yrs::{ReadTxn, StateVector, Transact, Update}; + +use crate::message::{CustomMessage, Error, Message, SyncMessage, SyncMeta}; + +// *************************** +// Client A Client B Server +// | | | +// |---(1)--Sync Step1----->| +// | | | +// |<--(2)--Sync Step2------| +// |<-------Sync Step1------| +// | | | +// |---(3)--Sync Step2----->| +// | | | +// ************************** +// |---(1)-- Update-------->| +// | | | +// | | (2) Apply->| +// | | | +// | |<-(3) Broadcast +// | | | +// | |< (4) Apply | +/// A implementation of [CollabSyncProtocol]. +#[derive(Clone)] +pub struct ClientSyncProtocol; +impl CollabSyncProtocol for ClientSyncProtocol { + fn check(&self, encoder: &mut E, last_sync_at: i64) -> Result<(), Error> { + let meta = SyncMeta { last_sync_at }; + Message::Custom(CustomMessage::SyncCheck(meta)).encode(encoder); + Ok(()) + } +} + +pub trait CollabSyncProtocol { + fn check(&self, _encoder: &mut E, _last_sync_at: i64) -> Result<(), Error> { + Ok(()) + } + + fn start(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error> { + let (sv, update) = { + let sv = awareness.doc().transact().state_vector(); + let update = awareness.update()?; + (sv, update) + }; + + Message::Sync(SyncMessage::SyncStep1(sv)).encode(encoder); + Message::Awareness(update).encode(encoder); + Ok(()) + } + + /// Given a [StateVector] of a remote side, calculate missing + /// updates. Returns a sync-step-2 message containing a calculated update. + fn handle_sync_step1( + &self, + awareness: &Awareness, + sv: StateVector, + ) -> Result>, Error> { + let update = awareness + .doc() + .try_transact() + .map_err(|err| Error::YrsTransaction(format!("fail to handle sync step1. error: {}", err)))? + .encode_state_as_update_v1(&sv); + Ok(Some( + Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(), + )) + } + + /// Handle reply for a sync-step-1 send from this replica previously. By default just apply + /// an update to current `awareness` document instance. + fn handle_sync_step2( + &self, + origin: &Option<&CollabOrigin>, + awareness: &mut Awareness, + update: Update, + ) -> Result>, Error> { + let mut txn = match origin { + Some(origin) => awareness.doc().try_transact_mut_with((*origin).clone()), + None => awareness.doc().try_transact_mut(), + } + .map_err(|err| Error::YrsTransaction(format!("fail to handle sync step2. error: {}", err)))?; + txn.try_apply_update(update).map_err(|err| { + Error::YrsTransaction(format!("fail to apply sync step2 update. error: {}", err)) + })?; + Ok(None) + } + + /// Handle continuous update send from the client. By default just apply an update to a current + /// `awareness` document instance. + fn handle_update( + &self, + origin: &Option<&CollabOrigin>, + awareness: &mut Awareness, + update: Update, + ) -> Result>, Error> { + self.handle_sync_step2(origin, awareness, update) + } + + fn handle_auth( + &self, + _awareness: &Awareness, + deny_reason: Option, + ) -> Result>, Error> { + if let Some(reason) = deny_reason { + Err(Error::PermissionDenied { reason }) + } else { + Ok(None) + } + } + + /// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness` + /// instance is being updated with incoming data. + fn handle_awareness_update( + &self, + awareness: &mut Awareness, + update: AwarenessUpdate, + ) -> Result>, Error> { + awareness.apply_update(update)?; + Ok(None) + } + + fn handle_custom_message( + &self, + _awareness: &mut Awareness, + _msg: CustomMessage, + ) -> Result>, Error> { + Ok(None) + } +} + +/// Handles incoming messages from the client/server +pub fn handle_msg( + origin: &Option<&CollabOrigin>, + protocol: &P, + collab: &MutexCollab, + msg: Message, +) -> Result>, Error> { + match msg { + Message::Sync(msg) => match msg { + SyncMessage::SyncStep1(sv) => { + let collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_sync_step1(collab.get_awareness(), sv) + }, + SyncMessage::SyncStep2(update) => { + let mut collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_sync_step2( + origin, + collab.get_mut_awareness(), + Update::decode_v1(&update)?, + ) + }, + SyncMessage::Update(update) => { + let mut collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_update( + origin, + collab.get_mut_awareness(), + Update::decode_v1(&update)?, + ) + }, + }, + Message::Auth(reason) => { + let collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_auth(collab.get_awareness(), reason) + }, + Message::Awareness(update) => { + let mut collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_awareness_update(collab.get_mut_awareness(), update) + }, + Message::Custom(msg) => { + let mut collab = collab + .try_lock_for(Duration::from_millis(400)) + .ok_or(Error::Internal(anyhow!( + "Timeout while trying to acquire lock" + )))?; + protocol.handle_custom_message(collab.get_mut_awareness(), msg) + }, + } +} diff --git a/libs/realtime/Cargo.toml b/libs/realtime/Cargo.toml index c345f6ec..04f604a0 100644 --- a/libs/realtime/Cargo.toml +++ b/libs/realtime/Cargo.toml @@ -32,6 +32,7 @@ database-entity.workspace = true yrs.workspace = true chrono = "0.4.30" realtime-entity = { workspace = true, features = ["actix_message"] } +realtime-protocol.workspace = true uuid = { version = "1", features = ["v4"] } [dev-dependencies] diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 938b94b6..560b8efa 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -1,12 +1,13 @@ +use collab::core::awareness; use std::sync::Arc; use crate::collaborate::sync_protocol::ServerSyncProtocol; +use collab::core::awareness::{Awareness, AwarenessUpdate}; use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; -use collab::sync_protocol::awareness::{Awareness, AwarenessUpdate}; -use collab::sync_protocol::message::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; -use collab::sync_protocol::{awareness, handle_msg}; use futures_util::{SinkExt, StreamExt}; +use realtime_protocol::handle_msg; +use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use tokio::select; use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::{channel, Sender}; diff --git a/libs/realtime/src/collaborate/group.rs b/libs/realtime/src/collaborate/group.rs index a212cc70..93aeacbc 100644 --- a/libs/realtime/src/collaborate/group.rs +++ b/libs/realtime/src/collaborate/group.rs @@ -8,6 +8,7 @@ use collab_entity::CollabType; use database::collab::CollabStorage; use std::collections::HashMap; +use collab::core::collab_plugin::EncodedCollabV1; use std::sync::Arc; use tokio::sync::RwLock; use tokio::task::spawn_blocking; @@ -177,6 +178,10 @@ where f(&collab); } + pub fn encode_v1(&self) -> EncodedCollabV1 { + self.collab.lock().encode_collab_v1() + } + pub async fn is_empty(&self) -> bool { self.subscribers.read().await.is_empty() } diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 28cd4ec6..2fb4f24f 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -5,11 +5,11 @@ use app_error::AppError; use async_trait::async_trait; use crate::collaborate::{CollabAccessControl, CollabUserId}; +use collab::core::awareness::Awareness; use collab::core::collab::TransactionMutExt; use collab::core::collab_plugin::EncodedCollabV1; use collab::core::origin::CollabOrigin; use collab::preclude::{CollabPlugin, Doc, TransactionMut}; -use collab::sync_protocol::awareness::Awareness; use collab_entity::CollabType; use database::collab::CollabStorage; use database_entity::dto::{AFAccessLevel, InsertCollabParams, QueryCollabParams}; @@ -160,7 +160,7 @@ where match result { Ok(encoded_collab_v1) => { let params = InsertCollabParams::from_raw_data( - object_id, + object_id.to_string(), self.collab_type.clone(), encoded_collab_v1, &self.workspace_id, @@ -199,36 +199,27 @@ where } } - fn flush(&self, object_id: &str, data: &EncodedCollabV1) { + fn flush(&self, object_id: &str, doc: &Doc) { let storage = self.storage.clone(); - match data.encode_to_bytes() { - Ok(encoded_collab_v1) => { - let params = InsertCollabParams::from_raw_data( - object_id, - self.collab_type.clone(), - encoded_collab_v1, - &self.workspace_id, - ); - - info!( - "[realtime] start flushing {}:{} with len: {}", - object_id, - params.collab_type, - params.encoded_collab_v1.len() - ); - - let uid = self.uid; - tokio::spawn(async move { - let object_id = params.object_id.clone(); - match storage.insert_collab(&uid, params).await { - Ok(_) => info!("[realtime] end flushing collab: {}", object_id), - Err(err) => error!("save collab failed: {:?}", err), - } - }); - }, - Err(err) => { - error!("fail to encode EncodedDocV1 to bytes: {:?}", err); - }, + let uid = self.uid; + let workspace_id = self.workspace_id.clone(); + let collab_type = self.collab_type.clone(); + let object_id = object_id.to_string(); + if let Ok(encoded_collab_v1) = { + let txn = doc.transact(); + let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); + let state_vector = txn.state_vector().encode_v1(); + EncodedCollabV1::new(doc_state, state_vector).encode_to_bytes() + } { + let params = + InsertCollabParams::from_raw_data(object_id, collab_type, encoded_collab_v1, &workspace_id); + let object_id = params.object_id.clone(); + tokio::spawn(async move { + match storage.insert_collab(&uid, params).await { + Ok(_) => info!("[realtime] flushing collab: {}", object_id), + Err(err) => error!("save collab failed: {:?}", err), + } + }); } } } diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index 5baeb84b..87cd61aa 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -238,7 +238,7 @@ where .run() .await?; - broadcast_message(&user, &collab_message, &client_stream_by_user).await; + broadcast_message(&user, collab_message, &client_stream_by_user).await; Ok(()) }) }, @@ -253,7 +253,7 @@ where #[inline] async fn broadcast_message( user: &U, - collab_message: &CollabMessage, + collab_message: CollabMessage, client_streams: &Arc>>, ) where U: RealtimeUser, @@ -263,7 +263,7 @@ async fn broadcast_message( trace!("[realtime]: receives collab message: {}", collab_message); match client_stream .stream_tx - .send(Ok(RealtimeMessage::Collab(collab_message.clone()))) + .send(Ok(RealtimeMessage::Collab(collab_message))) { Ok(_) => {}, Err(e) => error!("send error: {}", e), diff --git a/libs/realtime/src/collaborate/sync_protocol.rs b/libs/realtime/src/collaborate/sync_protocol.rs index 97607128..755f1f74 100644 --- a/libs/realtime/src/collaborate/sync_protocol.rs +++ b/libs/realtime/src/collaborate/sync_protocol.rs @@ -1,6 +1,6 @@ -use collab::sync_protocol::awareness::Awareness; -use collab::sync_protocol::message::{CustomMessage, Error, Message, SyncMessage}; -use collab::sync_protocol::CollabSyncProtocol; +use collab::core::awareness::Awareness; +use realtime_protocol::CollabSyncProtocol; +use realtime_protocol::{CustomMessage, Error, Message, SyncMessage}; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::{ReadTxn, StateVector, Transact}; diff --git a/libs/realtime/src/error.rs b/libs/realtime/src/error.rs index fa7d36b6..e0b89c43 100644 --- a/libs/realtime/src/error.rs +++ b/libs/realtime/src/error.rs @@ -3,10 +3,10 @@ use collab::error::CollabError; #[derive(Debug, thiserror::Error)] pub enum RealtimeError { #[error(transparent)] - YSync(#[from] collab::sync_protocol::message::Error), + YSync(#[from] realtime_protocol::Error), #[error(transparent)] - YAwareness(#[from] collab::sync_protocol::awareness::Error), + YAwareness(#[from] collab::core::awareness::Error), #[error("failed to deserialize message: {0}")] DecodingError(#[from] yrs::encoding::read::Error),