From 1590e948c6e874a9ce6042075611a582326223b3 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:18:41 +0800 Subject: [PATCH] feat: Optimize sync (#341) * chore: optimize sync * chore: optimize sync * chore: optimize sync * chore: update collab rev --- Cargo.lock | 9 +- Cargo.toml | 8 +- build/client_api_deps_check.sh | 2 +- libs/client-api-test-util/src/test_client.rs | 15 +- libs/client-api/Cargo.toml | 1 + libs/client-api/src/collab_sync/mod.rs | 8 +- libs/client-api/src/collab_sync/plugin.rs | 23 +- libs/client-api/src/collab_sync/sink.rs | 104 ++------ .../client-api/src/collab_sync/sink_config.rs | 68 +++++ .../{pending_msg.rs => sink_pending_queue.rs} | 8 +- .../collab_sync/{sync.rs => sync_control.rs} | 235 +++++++++++------- libs/client-api/src/ws/client.rs | 36 ++- libs/realtime-entity/src/collab_msg.rs | 11 +- libs/realtime-protocol/src/protocol.rs | 6 +- libs/realtime/src/collaborate/broadcast.rs | 108 +++----- .../realtime/src/collaborate/group_control.rs | 13 +- libs/realtime/src/collaborate/plugin.rs | 6 +- tests/collab/edit_permission.rs | 18 +- tests/collab/edit_workspace.rs | 30 ++- tests/collab/multi_devices_edit.rs | 198 ++++++++++++++- tests/collab/single_device_edit.rs | 18 +- 21 files changed, 622 insertions(+), 303 deletions(-) create mode 100644 libs/client-api/src/collab_sync/sink_config.rs rename libs/client-api/src/collab_sync/{pending_msg.rs => sink_pending_queue.rs} (95%) rename libs/client-api/src/collab_sync/{sync.rs => sync_control.rs} (62%) diff --git a/Cargo.lock b/Cargo.lock index bae0f4ae..1618f806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1299,6 +1299,7 @@ dependencies = [ "gotrue", "gotrue-entity", "governor", + "log", "mime", "mime_guess", "parking_lot 0.12.1", @@ -1360,7 +1361,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5" dependencies = [ "anyhow", "async-trait", @@ -1382,7 +1383,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5" dependencies = [ "anyhow", "collab", @@ -1401,7 +1402,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5" dependencies = [ "anyhow", "bytes", @@ -1416,7 +1417,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 0abde9e8..5474327f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,10 +172,10 @@ inherits = "release" debug = true [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" } [features] custom_env= [] diff --git a/build/client_api_deps_check.sh b/build/client_api_deps_check.sh index 189986a0..37516f27 100644 --- a/build/client_api_deps_check.sh +++ b/build/client_api_deps_check.sh @@ -3,7 +3,7 @@ # Generate the current dependency list cargo tree > current_deps.txt -BASELINE_COUNT=1684 +BASELINE_COUNT=1685 CURRENT_COUNT=$(cat current_deps.txt | wc -l) echo "Expected dependency count (baseline): $BASELINE_COUNT" diff --git a/libs/client-api-test-util/src/test_client.rs b/libs/client-api-test-util/src/test_client.rs index 3c929231..9595a943 100644 --- a/libs/client-api-test-util/src/test_client.rs +++ b/libs/client-api-test-util/src/test_client.rs @@ -534,6 +534,16 @@ impl TestClient { .await .unwrap(); } + + pub async fn get_edit_collab_json(&self, object_id: &str) -> Value { + self + .collab_by_object_id + .get(object_id) + .unwrap() + .collab + .lock() + .to_json_value() + } } pub async fn assert_server_snapshot( @@ -638,12 +648,11 @@ pub async fn assert_server_collab( } } -pub async fn assert_client_collab( +pub async fn assert_client_collab_within_30_secs( client: &mut TestClient, object_id: &str, key: &str, expected: Value, - _retry_duration: u64, ) { let secs = 30; let object_id = object_id.to_string(); @@ -676,7 +685,7 @@ pub async fn assert_client_collab( } } -pub async fn assert_client_collab_include_value( +pub async fn assert_client_collab_include_value_within_30_secs( client: &mut TestClient, object_id: &str, expected: Value, diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 138c0040..60de9518 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -44,6 +44,7 @@ database-entity.workspace = true app-error = { workspace = true, features = ["tokio_error", "bincode_error"] } scraper = { version = "0.17.1", optional = true } governor = { version = "0.6.0" } +log = "0.4.20" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio-retry = "0.3" diff --git a/libs/client-api/src/collab_sync/mod.rs b/libs/client-api/src/collab_sync/mod.rs index f967b450..b3826832 100644 --- a/libs/client-api/src/collab_sync/mod.rs +++ b/libs/client-api/src/collab_sync/mod.rs @@ -1,14 +1,16 @@ mod channel; mod error; -mod pending_msg; mod plugin; mod sink; -mod sync; +mod sink_config; +mod sink_pending_queue; +mod sync_control; pub use channel::*; pub use error::*; pub use plugin::*; pub use sink::*; -pub use sync::*; +pub use sink_config::*; +pub use sync_control::*; pub use realtime_entity::collab_msg; diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index c0e28f60..96dbbcdf 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -1,27 +1,27 @@ use std::sync::{Arc, Weak}; -use collab::core::awareness::Awareness; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; -use collab::preclude::CollabPlugin; +use collab::preclude::{Collab, CollabPlugin}; use collab_entity::{CollabObject, CollabType}; use futures_util::SinkExt; use realtime_entity::collab_msg::{CollabMessage, UpdateSync}; use realtime_protocol::{Message, SyncMessage}; use tokio_stream::StreamExt; -use crate::collab_sync::{SinkConfig, SyncQueue}; +use crate::collab_sync::SyncControl; use tokio_stream::wrappers::WatchStream; use tracing::trace; +use crate::collab_sync::sink_config::SinkConfig; use crate::platform_spawn; use crate::ws::{ConnectState, WSConnectStateReceiver}; use yrs::updates::encoder::Encode; pub struct SyncPlugin { object: SyncObject, - sync_queue: Arc>, + sync_queue: Arc>, // Used to keep the lifetime of the channel #[allow(dead_code)] channel: Option>, @@ -53,13 +53,13 @@ where mut ws_connect_state: WSConnectStateReceiver, ) -> Self { let weak_local_collab = collab.clone(); - let sync_queue = SyncQueue::new( + let sync_queue = SyncControl::new( object.clone(), origin, sink, + sink_config, stream, collab.clone(), - sink_config, pause, ); @@ -86,16 +86,19 @@ where (weak_local_collab.upgrade(), weak_sync_queue.upgrade()) { if let Some(local_collab) = local_collab.try_lock() { - let last_sync_at = local_collab.get_last_sync_at(); sync_queue.resume(); - sync_queue.init_sync(local_collab.get_awareness(), last_sync_at); + sync_queue.init_sync(&local_collab); } + } else { + break; } }, ConnectState::Unauthorized | ConnectState::Closed => { if let Some(sync_queue) = weak_sync_queue.upgrade() { // Stop sync if the websocket is unauthorized or disconnected sync_queue.pause(); + } else { + break; } }, _ => {}, @@ -123,8 +126,8 @@ where Stream: StreamExt> + Send + Sync + Unpin + 'static, C: Send + Sync + 'static, { - fn did_init(&self, _awareness: &Awareness, _object_id: &str, last_sync_at: i64) { - self.sync_queue.init_sync(_awareness, last_sync_at); + fn did_init(&self, collab: &Collab, _object_id: &str, _last_sync_at: i64) { + self.sync_queue.init_sync(collab); } fn receive_local_update(&self, origin: &CollabOrigin, _object_id: &str, update: &[u8]) { diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index 8bc6b84b..85ec8542 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -3,10 +3,11 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::time::Duration; -use crate::collab_sync::pending_msg::{MessageState, PendingMsgQueue}; -use crate::collab_sync::{SyncError, SyncObject, DEFAULT_SYNC_TIMEOUT}; +use crate::collab_sync::sink_pending_queue::{MessageState, SinkPendingQueue}; +use crate::collab_sync::{SyncError, SyncObject}; use futures_util::SinkExt; +use crate::collab_sync::sink_config::{SinkConfig, SinkStrategy}; use crate::platform_spawn; use realtime_entity::collab_msg::{CollabMessage, CollabSinkMessage, MsgId}; use tokio::sync::{mpsc, oneshot, watch, Mutex}; @@ -35,21 +36,17 @@ pub struct CollabSink { /// The [Sink] is used to send the messages to the remote. It might be a websocket sink or /// other sink that implements the [SinkExt] trait. sender: Arc>, - - /// The [PendingMsgQueue] is used to queue the messages that are waiting to be sent to the + /// The [SinkPendingQueue] is used to queue the messages that are waiting to be sent to the /// remote. It will merge the messages if possible. - pending_msg_queue: Arc>>, + pending_msg_queue: Arc>>, msg_id_counter: Arc, - /// The [watch::Sender] is used to notify the [CollabSinkRunner] to process the pending messages. /// Sending `false` will stop the [CollabSinkRunner]. notifier: Arc>, config: SinkConfig, - /// Stop the [IntervalRunner] if the sink strategy is [SinkStrategy::FixInterval]. #[allow(dead_code)] interval_runner_stop_tx: Option>, - /// Used to calculate the time interval between two messages. Only used when the sink strategy /// is [SinkStrategy::FixInterval]. instant: Mutex, @@ -84,7 +81,7 @@ where let notifier = Arc::new(notifier); let state_notifier = Arc::new(sync_state_tx); let sender = Arc::new(Mutex::new(sink)); - let pending_msg_queue = PendingMsgQueue::new(uid); + let pending_msg_queue = SinkPendingQueue::new(uid); let pending_msg_queue = Arc::new(parking_lot::Mutex::new(pending_msg_queue)); let msg_id_counter = Arc::new(msg_id_counter); // @@ -112,7 +109,7 @@ where } /// Put the message into the queue and notify the sink to process the next message. - /// After the [Msg] was pushed into the [PendingMsgQueue]. The queue will pop the next msg base on + /// After the [Msg] was pushed into the [SinkPendingQueue]. The queue will pop the next msg base on /// its priority. And the message priority is determined by the [Msg] that implement the [Ord] and /// [PartialOrd] trait. Check out the [CollabMessage] for more details. /// @@ -134,8 +131,13 @@ where // When the client is connected, remove all pending messages and send the init message. { let mut pending_msg_queue = self.pending_msg_queue.lock(); + // if there is an init message in the queue, return; + if let Some(msg) = pending_msg_queue.peek() { + if msg.get_msg().is_init_msg() { + return; + } + } pending_msg_queue.clear(); - let msg_id = self.msg_id_counter.next(); let msg = f(msg_id); pending_msg_queue.push_msg(msg_id, msg); @@ -145,6 +147,16 @@ where self.notify(); } + pub fn can_queue_init_sync(&self) -> bool { + let pending_msg_queue = self.pending_msg_queue.lock(); + if let Some(msg) = pending_msg_queue.peek() { + if msg.get_msg().is_init_msg() { + return false; + } + } + true + } + pub fn clear(&self) { self.pending_msg_queue.lock().clear(); } @@ -341,7 +353,10 @@ where } }, }, - Err(err) => error!("Send message failed error: {}", err), + Err(err) => { + // the error might be caused by the sending message was removed from the queue. + trace!("pending message oneshot channel error: {}", err) + }, } self.notify() @@ -406,16 +421,6 @@ impl CollabSinkRunner { } } -pub struct SinkConfig { - /// `timeout` is the time to wait for the remote to ack the message. If the remote - /// does not ack the message in time, the message will be sent again. - pub send_timeout: Duration, - /// `maximum_payload_size` is the maximum size of the messages to be merged. - pub maximum_payload_size: usize, - /// `strategy` is the strategy to send the messages. - pub strategy: SinkStrategy, -} - fn calculate_timeout(payload_len: usize, default: Duration) -> Duration { match payload_len { 0..=40959 => default, @@ -426,61 +431,6 @@ fn calculate_timeout(payload_len: usize, default: Duration) -> Duration { } } -impl SinkConfig { - pub fn new() -> Self { - Self::default() - } - pub fn send_timeout(mut self, secs: u64) -> Self { - let timeout_duration = Duration::from_secs(secs); - if let SinkStrategy::FixInterval(duration) = self.strategy { - if timeout_duration < duration { - warn!("The timeout duration should greater than the fix interval duration"); - } - } - self.send_timeout = timeout_duration; - self - } - - /// `max_zip_size` is the maximum size of the messages to be merged. - pub fn with_max_payload_size(mut self, max_size: usize) -> Self { - self.maximum_payload_size = max_size; - self - } - - pub fn with_strategy(mut self, strategy: SinkStrategy) -> Self { - if let SinkStrategy::FixInterval(duration) = strategy { - if self.send_timeout < duration { - warn!("The timeout duration should greater than the fix interval duration"); - } - } - self.strategy = strategy; - self - } -} - -impl Default for SinkConfig { - fn default() -> Self { - Self { - send_timeout: Duration::from_secs(DEFAULT_SYNC_TIMEOUT), - maximum_payload_size: 1024 * 64, - strategy: SinkStrategy::ASAP, - } - } -} - -pub enum SinkStrategy { - /// Send the message as soon as possible. - ASAP, - /// Send the message in a fixed interval. - FixInterval(Duration), -} - -impl SinkStrategy { - pub fn is_fix_interval(&self) -> bool { - matches!(self, SinkStrategy::FixInterval(_)) - } -} - pub trait MsgIdCounter: Send + Sync + 'static { /// Get the next message id. The message id should be unique. fn next(&self) -> MsgId; diff --git a/libs/client-api/src/collab_sync/sink_config.rs b/libs/client-api/src/collab_sync/sink_config.rs new file mode 100644 index 00000000..32781c05 --- /dev/null +++ b/libs/client-api/src/collab_sync/sink_config.rs @@ -0,0 +1,68 @@ +use crate::collab_sync::DEFAULT_SYNC_TIMEOUT; +use std::time::Duration; +use tracing::warn; + +pub struct SinkConfig { + /// `timeout` is the time to wait for the remote to ack the message. If the remote + /// does not ack the message in time, the message will be sent again. + pub send_timeout: Duration, + /// `maximum_payload_size` is the maximum size of the messages to be merged. + pub maximum_payload_size: usize, + /// `strategy` is the strategy to send the messages. + pub strategy: SinkStrategy, +} + +impl SinkConfig { + pub fn new() -> Self { + Self::default() + } + pub fn send_timeout(mut self, secs: u64) -> Self { + let timeout_duration = Duration::from_secs(secs); + if let SinkStrategy::FixInterval(duration) = self.strategy { + if timeout_duration < duration { + warn!("The timeout duration should greater than the fix interval duration"); + } + } + self.send_timeout = timeout_duration; + self + } + + /// `max_zip_size` is the maximum size of the messages to be merged. + pub fn with_max_payload_size(mut self, max_size: usize) -> Self { + self.maximum_payload_size = max_size; + self + } + + pub fn with_strategy(mut self, strategy: SinkStrategy) -> Self { + if let SinkStrategy::FixInterval(duration) = strategy { + if self.send_timeout < duration { + warn!("The timeout duration should greater than the fix interval duration"); + } + } + self.strategy = strategy; + self + } +} + +impl Default for SinkConfig { + fn default() -> Self { + Self { + send_timeout: Duration::from_secs(DEFAULT_SYNC_TIMEOUT), + maximum_payload_size: 1024 * 64, + strategy: SinkStrategy::ASAP, + } + } +} + +pub enum SinkStrategy { + /// Send the message as soon as possible. + ASAP, + /// Send the message in a fixed interval. + FixInterval(Duration), +} + +impl SinkStrategy { + pub fn is_fix_interval(&self) -> bool { + matches!(self, SinkStrategy::FixInterval(_)) + } +} diff --git a/libs/client-api/src/collab_sync/pending_msg.rs b/libs/client-api/src/collab_sync/sink_pending_queue.rs similarity index 95% rename from libs/client-api/src/collab_sync/pending_msg.rs rename to libs/client-api/src/collab_sync/sink_pending_queue.rs index fa0f4f4a..445b26bc 100644 --- a/libs/client-api/src/collab_sync/pending_msg.rs +++ b/libs/client-api/src/collab_sync/sink_pending_queue.rs @@ -7,13 +7,13 @@ use realtime_entity::collab_msg::{CollabSinkMessage, MsgId}; use tokio::sync::oneshot; use tracing::{trace, warn}; -pub(crate) struct PendingMsgQueue { +pub(crate) struct SinkPendingQueue { #[allow(dead_code)] uid: i64, queue: BinaryHeap>, } -impl PendingMsgQueue +impl SinkPendingQueue where Msg: CollabSinkMessage, { @@ -29,7 +29,7 @@ where } } -impl Deref for PendingMsgQueue +impl Deref for SinkPendingQueue where Msg: CollabSinkMessage, { @@ -40,7 +40,7 @@ where } } -impl DerefMut for PendingMsgQueue +impl DerefMut for SinkPendingQueue where Msg: CollabSinkMessage, { diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync_control.rs similarity index 62% rename from libs/client-api/src/collab_sync/sync.rs rename to libs/client-api/src/collab_sync/sync_control.rs index e116727b..61d803e8 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -1,62 +1,68 @@ -use crate::collab_sync::{ - CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject, -}; +use crate::collab_sync::sink_config::SinkConfig; +use crate::collab_sync::{CollabSink, CollabSinkRunner, SinkState, SyncError, SyncObject}; use crate::platform_spawn; use bytes::Bytes; use collab::core::awareness::Awareness; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; +use collab::preclude::Collab; use futures_util::{SinkExt, StreamExt}; +use log::trace; use realtime_entity::collab_msg::{AckCode, CollabMessage, InitSync, ServerInit, UpdateSync}; use realtime_protocol::{handle_collab_message, ClientSyncProtocol, CollabSyncProtocol}; use realtime_protocol::{Message, MessageReader, SyncMessage}; use std::marker::PhantomData; use std::ops::Deref; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; -use tokio::sync::watch; +use std::time::{Duration, Instant}; +use tokio::sync::{watch, Mutex}; use tokio_stream::wrappers::WatchStream; -use tracing::{error, span, trace, warn, Level}; +use tracing::{error, info, warn}; use yrs::encoding::read::Cursor; use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encoder, EncoderV1}; pub const DEFAULT_SYNC_TIMEOUT: u64 = 4; +pub const NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC: u32 = 5; -pub struct SyncQueue { +const DEBOUNCE_DURATION: Duration = Duration::from_secs(10); + +pub struct SyncControl { object: SyncObject, origin: CollabOrigin, /// The [CollabSink] is used to send the updates to the remote. It will send the current /// update periodically if the timeout is reached or it will send the next update if /// it receive previous ack from the remote. sink: Arc>, - /// The [SyncStream] will be spawned in a separate task It continuously receive + /// The [ObserveCollab] will be spawned in a separate task It continuously receive /// the updates from the remote. #[allow(dead_code)] - stream: SyncStream, - protocol: ClientSyncProtocol, + observe_collab: ObserveCollab, sync_state: Arc>, } -impl Drop for SyncQueue { +impl Drop for SyncControl { fn drop(&mut self) { trace!("Drop SyncQueue {}", self.object.object_id); } } -impl SyncQueue +impl SyncControl where E: Into + Send + Sync + 'static, Sink: SinkExt + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, { + #[allow(clippy::too_many_arguments)] pub fn new( object: SyncObject, origin: CollabOrigin, sink: Sink, + sink_config: SinkConfig, stream: Stream, collab: Weak, - config: SinkConfig, pause: bool, ) -> Self { let protocol = ClientSyncProtocol; @@ -65,25 +71,26 @@ where let (sync_state_tx, sink_state_rx) = watch::channel(SinkState::Init); debug_assert!(origin.client_user_id().is_some()); + // Create the sink and start the sink runner. let sink = Arc::new(CollabSink::new( origin.client_user_id().unwrap_or(0), object.clone(), sink, notifier, sync_state_tx, - config, + sink_config, pause, )); - platform_spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx)); - let cloned_protocol = protocol.clone(); - let object_id = object.object_id.clone(); - let stream = SyncStream::new( + + // Create the observe collab stream. + let _cloned_protocol = protocol.clone(); + let _object_id = object.object_id.clone(); + let stream = ObserveCollab::new( origin.clone(), - object_id, + object.clone(), stream, - protocol, - collab, + collab.clone(), Arc::downgrade(&sink), ); @@ -113,8 +120,7 @@ where object, origin, sink, - stream, - protocol: cloned_protocol, + observe_collab: stream, sync_state, } } @@ -131,22 +137,8 @@ where self.sync_state.subscribe() } - pub fn init_sync(&self, awareness: &Awareness, _last_sync_at: i64) { - if let Some(payload) = doc_init_state(awareness, &self.protocol) { - self.sink.queue_init_sync(|msg_id| { - InitSync::new( - self.origin.clone(), - self.object.object_id.clone(), - self.object.collab_type.clone(), - self.object.workspace_id.clone(), - msg_id, - payload, - ) - .into() - }); - } else { - self.sink.notify(); - } + pub fn init_sync(&self, collab: &Collab) { + _init_sync(self.origin.clone(), &self.object, collab, &self.sink); } /// Remove all the messages in the sink queue @@ -168,7 +160,34 @@ fn doc_init_state(awareness: &Awareness, protocol: &P) -> } } -impl Deref for SyncQueue { +pub fn _init_sync( + origin: CollabOrigin, + sync_object: &SyncObject, + collab: &Collab, + sink: &Arc>, +) where + E: Into + Send + Sync + 'static, + Sink: SinkExt + Send + Sync + Unpin + 'static, +{ + let awareness = collab.get_awareness(); + if let Some(payload) = doc_init_state(awareness, &ClientSyncProtocol) { + sink.queue_init_sync(|msg_id| { + InitSync::new( + origin, + sync_object.object_id.clone(), + sync_object.collab_type.clone(), + sync_object.workspace_id.clone(), + msg_id, + payload, + ) + .into() + }) + } else { + sink.notify(); + } +} + +impl Deref for SyncControl { type Target = Arc>; fn deref(&self) -> &Self::Target { @@ -177,7 +196,7 @@ impl Deref for SyncQueue { } /// Use to continuously receive updates from remote. -struct SyncStream { +struct ObserveCollab { object_id: String, #[allow(dead_code)] weak_collab: Weak, @@ -185,37 +204,37 @@ struct SyncStream { phantom_stream: PhantomData, } -impl Drop for SyncStream { +impl Drop for ObserveCollab { fn drop(&mut self) { trace!("Drop SyncStream {}", self.object_id); } } -impl SyncStream +impl ObserveCollab where E: Into + Send + Sync + 'static, Sink: SinkExt + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, { - pub fn new

( + pub fn new( origin: CollabOrigin, - object_id: String, + object: SyncObject, stream: Stream, - protocol: P, weak_collab: Weak, sink: Weak>, - ) -> Self - where - P: CollabSyncProtocol + Send + Sync + 'static, - { + ) -> Self { + let seq_num = Arc::new(AtomicU32::new(0)); + let last_init_sync = LastSyncTime::new(); + let object_id = object.object_id.clone(); let cloned_weak_collab = weak_collab.clone(); - platform_spawn(SyncStream::::spawn_doc_stream::

( + platform_spawn(ObserveCollab::::observer_collab_message( origin, - object_id.clone(), + object, stream, cloned_weak_collab, sink, - protocol, + seq_num, + last_init_sync, )); Self { object_id, @@ -226,16 +245,15 @@ where } // Spawn the stream that continuously reads the doc's updates from remote. - async fn spawn_doc_stream

( + async fn observer_collab_message( origin: CollabOrigin, - object_id: String, + object: SyncObject, mut stream: Stream, weak_collab: Weak, weak_sink: Weak>, - protocol: P, - ) where - P: CollabSyncProtocol + Send + Sync + 'static, - { + seq_num: Arc, + last_init_sync: LastSyncTime, + ) { while let Some(collab_message_result) = stream.next().await { let collab = match weak_collab.upgrade() { Some(collab) => collab, @@ -258,10 +276,14 @@ where }, }; - let span = span!(Level::TRACE, "doc_stream", object_id = %msg.object_id()); - let _enter = span.enter(); - if let Err(error) = SyncStream::::process_message::

( - &origin, &object_id, &protocol, &collab, &sink, msg, + if let Err(error) = ObserveCollab::::process_message( + &origin, + &object, + &collab, + &sink, + msg, + &seq_num, + &last_init_sync, ) .await { @@ -269,7 +291,7 @@ where // TODO(nathan): ask the client to resolve the conflict. error!( "collab:{} can not be synced because of error: {}", - object_id, error + object.object_id, error ); break; } else { @@ -280,22 +302,38 @@ where } /// Continuously handle messages from the remote doc - async fn process_message

( + async fn process_message( origin: &CollabOrigin, - object_id: &str, - protocol: &P, + object: &SyncObject, collab: &Arc, sink: &Arc>, msg: CollabMessage, - ) -> Result<(), SyncError> - where - P: CollabSyncProtocol + Send + Sync + 'static, - { + broadcast_seq_num: &Arc, + last_sync_time: &LastSyncTime, + ) -> Result<(), SyncError> { // If server return the AckCode::ApplyInternalError, which means the server can not apply the // update - if let CollabMessage::ClientAck(ack) = &msg { - if ack.code == AckCode::CannotApplyUpdate { - return Err(SyncError::CannotApplyUpdate(object_id.to_string())); + if matches!(msg, CollabMessage::ClientAck(ref ack) if ack.code == AckCode::CannotApplyUpdate) { + return Err(SyncError::CannotApplyUpdate(object.object_id.clone())); + } + + if let Some(msg_seq_num) = msg.broadcase_seq_num() { + let prev_seq_num = broadcast_seq_num.load(Ordering::SeqCst); + broadcast_seq_num.store(msg_seq_num, Ordering::SeqCst); + + // Check if the received seq_num indicates missing updates. + if msg_seq_num > prev_seq_num + NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC + && sink.can_queue_init_sync() + && last_sync_time.should_sync(DEBOUNCE_DURATION).await + { + if let Some(lock_guard) = collab.try_lock() { + info!( + "collab:{} missing updates, start init sync", + object.object_id + ); + _init_sync(origin.clone(), object, &lock_guard, sink); + return Ok(()); + } } } @@ -310,31 +348,39 @@ where _ => return Ok(()), }; - trace!("start process message:{:?}", msg.msg_id()); - SyncStream::::process_payload(origin, payload, object_id, protocol, collab, sink) - .await?; + trace!( + "start process message:{:?}, len:{}", + msg.msg_id(), + msg.len() + ); + ObserveCollab::::process_payload( + origin, + payload, + &object.object_id, + collab, + sink, + broadcast_seq_num, + ) + .await?; trace!("end process message: {:?}", msg.msg_id()); Ok(()) } - async fn process_payload

( + async fn process_payload( origin: &CollabOrigin, payload: &Bytes, object_id: &str, - protocol: &P, collab: &Arc, sink: &Arc>, - ) -> Result<(), SyncError> - where - P: CollabSyncProtocol + Send + Sync + 'static, - { + _broadcast_seq_num: &Arc, + ) -> Result<(), SyncError> { let mut decoder = DecoderV1::new(Cursor::new(payload)); let reader = MessageReader::new(&mut decoder); for msg in reader { let msg = msg?; trace!(" {}", msg); let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_))); - if let Some(payload) = handle_collab_message(origin, protocol, collab, msg)? { + if let Some(payload) = handle_collab_message(origin, &ClientSyncProtocol, collab, msg)? { if is_sync_step_1 { // flush match collab.try_lock() { @@ -356,3 +402,26 @@ where Ok(()) } } + +struct LastSyncTime { + last_sync: Mutex, +} + +impl LastSyncTime { + fn new() -> Self { + LastSyncTime { + last_sync: Mutex::new(Instant::now() - Duration::from_secs(3600)), + } + } + + async fn should_sync(&self, debounce_duration: Duration) -> bool { + let now = Instant::now(); + let mut last_sync_locked = self.last_sync.lock().await; + if now.duration_since(*last_sync_locked) > debounce_duration { + *last_sync_locked = now; + true + } else { + false + } + } +} diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index ea734ff9..3deed3a2 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -51,6 +51,7 @@ pub trait WSClientHttpSender: Send + Sync { type WeakChannel = Weak>; type ChannelByObjectId = HashMap>; +type AFRateLimiter = RateLimiter; pub type WSConnectStateReceiver = Receiver; pub(crate) type StateNotify = parking_lot::Mutex; @@ -66,8 +67,10 @@ pub struct WSClient { collab_channels: Arc>, ping: Arc>>, stop_tx: Mutex>>, - rate_limiter: - Arc>>, + rate_limiter: Arc>, + + #[cfg(debug_assertions)] + skip_realtime_message: Arc, } impl WSClient { pub fn new(config: WSClientConfig, http_sender: H) -> Self @@ -92,6 +95,9 @@ impl WSClient { ping, stop_tx: Mutex::new(None), rate_limiter: Arc::new(tokio::sync::RwLock::new(rate_limiter)), + + #[cfg(debug_assertions)] + skip_realtime_message: Default::default(), } } @@ -156,11 +162,22 @@ impl WSClient { let user_message_tx = self.user_channel.as_ref().clone(); let rate_limiter = self.rate_limiter.clone(); + + #[cfg(debug_assertions)] + let cloned_skip_realtime_message = self.skip_realtime_message.clone(); + // Receive messages from the websocket, and send them to the channels. platform_spawn(async move { while let Some(Ok(ws_msg)) = stream.next().await { match ws_msg { Message::Binary(_) => { + #[cfg(debug_assertions)] + { + if cloned_skip_realtime_message.load(std::sync::atomic::Ordering::SeqCst) { + continue; + } + } + match RealtimeMessage::try_from(&ws_msg) { Ok(msg) => { match msg { @@ -341,3 +358,18 @@ fn gen_rate_limiter( let quota = Quota::per_second(NonZeroU32::new(times_per_sec).unwrap()); RateLimiter::direct(quota) } + +#[cfg(debug_assertions)] +impl WSClient { + pub fn disable_receive_message(&self) { + self + .skip_realtime_message + .store(true, std::sync::atomic::Ordering::SeqCst); + } + + pub fn enable_receive_message(&self) { + self + .skip_realtime_message + .store(false, std::sync::atomic::Ordering::SeqCst); + } +} diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 733c6c9a..db3136ee 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -106,6 +106,13 @@ impl CollabMessage { matches!(self, CollabMessage::ServerInitSync(_)) } + pub fn broadcase_seq_num(&self) -> Option { + match self { + CollabMessage::ServerBroadcast(data) => Some(data.seq_num), + _ => None, + } + } + pub fn type_str(&self) -> String { match self { CollabMessage::ClientInitSync(_) => "ClientInitSync".to_string(), @@ -468,14 +475,16 @@ pub struct CollabBroadcastData { /// "The payload is encoded using the `EncoderV1` with the `Message` struct. /// It can be parsed into: Message::Sync::(SyncMessage::Update(update)) payload: Bytes, + seq_num: u32, } impl CollabBroadcastData { - pub fn new(origin: CollabOrigin, object_id: String, payload: Vec) -> Self { + pub fn new(origin: CollabOrigin, object_id: String, payload: Vec, seq_num: u32) -> Self { Self { origin, object_id, payload: Bytes::from(payload), + seq_num, } } } diff --git a/libs/realtime-protocol/src/protocol.rs b/libs/realtime-protocol/src/protocol.rs index 0c3e153a..30981f33 100644 --- a/libs/realtime-protocol/src/protocol.rs +++ b/libs/realtime-protocol/src/protocol.rs @@ -47,7 +47,11 @@ pub trait CollabSyncProtocol { fn start(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error> { let (sv, update) = { - let sv = awareness.doc().transact().state_vector(); + let sv = awareness + .doc() + .try_transact() + .map_err(|e| Error::YrsTransaction(e.to_string()))? + .state_vector(); let update = awareness.update()?; (sv, update) }; diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 2357eed6..756ab1f5 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -1,25 +1,19 @@ -use anyhow::anyhow; -use collab::core::awareness; -use std::future::Future; -use std::iter::Take; -use std::pin::Pin; -use std::sync::Arc; -use std::time::Duration; - use crate::collaborate::sync_protocol::ServerSyncProtocol; +use collab::core::awareness; use collab::core::awareness::{Awareness, AwarenessUpdate}; use collab::core::collab::MutexCollab; use collab::core::origin::CollabOrigin; use futures_util::{SinkExt, StreamExt}; use realtime_protocol::{handle_collab_message, Error}; use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use tokio::select; use tokio::sync::broadcast::error::SendError; use tokio::sync::broadcast::{channel, Sender}; use tokio::sync::Mutex; use tokio::time::Instant; -use tokio_retry::strategy::FixedInterval; -use tokio_retry::{Action, Retry}; + use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::UpdateSubscription; @@ -34,12 +28,18 @@ use yrs::encoding::write::Write; /// A broadcast can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness] /// to subscribes. One broadcast can be used to propagate updates for a single document with /// object_id. +/// pub struct CollabBroadcast { object_id: String, collab: MutexCollab, sender: Sender, awareness_sub: Mutex>, + /// Keep the lifetime of the document observer subscription. The subscription will be stopped + /// when the broadcast is dropped. doc_subscription: Mutex>, + broadcast_seq_num_counter: Arc, + /// The last modified time of the document. + pub modified_at: Arc>, } impl Drop for CollabBroadcast { @@ -65,6 +65,8 @@ impl CollabBroadcast { sender, awareness_sub: Default::default(), doc_subscription: Default::default(), + broadcast_seq_num_counter: Arc::new(Default::default()), + modified_at: Arc::new(Mutex::new(Instant::now())), } } @@ -75,14 +77,23 @@ impl CollabBroadcast { // Observer the document's update and broadcast it to all subscribers. let cloned_oid = self.object_id.clone(); let broadcast_sink = self.sender.clone(); + let seq_num_counter = self.broadcast_seq_num_counter.clone(); + let modified_at = self.modified_at.clone(); + + // Observer the document's update and broadcast it to all subscribers. When one of the clients + // sends an update to the document that alters its state, the document observer will trigger + // an update event. This event is then broadcast to all connected clients. After broadcasting, all + // connected clients will receive the update and apply it to their local document state. let doc_sub = mutex_collab .get_mut_awareness() .doc_mut() .observe_update_v1(move |txn, event| { + let value = seq_num_counter.fetch_add(1, Ordering::SeqCst); + let update_len = event.update.len(); let origin = CollabOrigin::from(txn); let payload = gen_update_message(&event.update); - let msg = CollabBroadcastData::new(origin, cloned_oid.clone(), payload); + let msg = CollabBroadcastData::new(origin, cloned_oid.clone(), payload, value + 1); match broadcast_sink.send(msg.into()) { Ok(_) => trace!("observe doc update with len:{}", update_len), @@ -91,6 +102,10 @@ impl CollabBroadcast { update_len, e ), } + + if let Ok(mut modified_at) = modified_at.try_lock() { + *modified_at = Instant::now(); + } }) .unwrap(); @@ -138,7 +153,6 @@ impl CollabBroadcast { /// - `subscriber_origin`: Identifies the subscriber's origin to avoid echoing messages back. /// - `sink`: A `Sink` implementation for sending messages to the subscriber(Each connected client). /// - `stream`: A `Stream` implementation for receiving messages from the subscriber((Each connected client)). - /// - `modified_at`: A shared, mutable reference to track the last modification time of the document. /// /// # Behavior /// - [Sink] Forwards updates received from the document observer to all subscribers through 'sink', excluding the originator @@ -163,7 +177,6 @@ impl CollabBroadcast { subscriber_origin: CollabOrigin, mut sink: Sink, mut stream: Stream, - modified_at: Arc>, ) -> Subscription where Sink: SinkExt + Clone + Send + Sync + Unpin + 'static, @@ -173,11 +186,12 @@ impl CollabBroadcast { { let cloned_origin = subscriber_origin.clone(); trace!("[realtime]: new subscriber: {}", subscriber_origin); - // Receive a update from the document observer and forward the update to all - // connected subscribers using its Sink. let sink_stop_tx = { - let sink = sink.clone(); + let mut sink = sink.clone(); let (stop_tx, mut stop_rx) = tokio::sync::mpsc::channel::<()>(1); + + // the receiver will continue to receive updates from the document observer and forward the update to + // connected subscriber using its Sink. The loop will break if the stop_rx receives a message. let mut receiver = self.sender.subscribe(); tokio::spawn(async move { loop { @@ -190,13 +204,8 @@ impl CollabBroadcast { continue; } - trace!("[realtime]: broadcast collab message: {}", message); - let action = SinkCollabMessageAction { - sink: &sink, - message, - }; - - if let Err(err) = action.run().await { + trace!("[realtime]: broadcast message to client: {}", message); + if let Err(err) = sink.send(message).await { error!("fail to broadcast message:{}", err); } } @@ -209,16 +218,14 @@ impl CollabBroadcast { stop_tx }; - // Receive messages from clients and reply with the response. The message may alter the - // document that the current broadcast group is associated with. If the message alter - // the document state then the document observer will be triggered and the update will be - // broadcast to all connected subscribers. Check out the [observe_update_v1] and [sink_task] - // above. let stream_stop_tx = { let (stream_stop_tx, mut stop_rx) = tokio::sync::mpsc::channel::<()>(1); let collab = self.collab().clone(); let object_id = self.object_id.clone(); + // the stream will continue to receive messages from the client and it will stop if the stop_rx + // receives a message. If the client's message alter the document state, it will trigger the + // document observer and broadcast the update to all connected subscribers. Check out the [observe_update_v1] and [sink_task] above. tokio::spawn(async move { loop { select! { @@ -226,11 +233,9 @@ impl CollabBroadcast { result = stream.next() => { match result { Some(Ok(collab_msg)) => { + // The message is valid if it has a payload and the object_id matches the broadcast's object_id. if object_id == collab_msg.object_id() && collab_msg.payload().is_some() { handle_client_collab_message(&object_id, &mut sink, &collab_msg, &collab).await; - if let Ok(mut modified_at) = modified_at.try_lock() { - *modified_at = Instant::now(); - } } else { warn!("Invalid collab message: {:?}", collab_msg); } @@ -267,7 +272,6 @@ async fn handle_client_collab_message( match collab_msg.payload() { None => {}, Some(payload) => { - let object_id = object_id.to_string(); let mut decoder = DecoderV1::from(payload.as_ref()); let origin = collab_msg.origin().clone(); let reader = MessageReader::new(&mut decoder); @@ -279,7 +283,7 @@ async fn handle_client_collab_message( if let Some(msg_id) = collab_msg.msg_id() { match result { Ok(payload) => { - let resp = CollabAck::new(origin.clone(), object_id.clone(), msg_id) + let resp = CollabAck::new(origin.clone(), object_id.to_string(), msg_id) .with_payload(payload.unwrap_or_default()); trace!("Send response to client: {}", resp); @@ -289,7 +293,7 @@ async fn handle_client_collab_message( }, Err(err) => { error!("handle collab:{} message error:{}", object_id, err); - let resp = CollabAck::new(origin.clone(), object_id.clone(), msg_id) + let resp = CollabAck::new(origin.clone(), object_id.to_string(), msg_id) .with_code(ack_code_from_error(&err)); if let Err(err) = sink.send(resp.into()).await { @@ -382,39 +386,3 @@ fn gen_awareness_update_message( let update = awareness.update_with_clients(changed)?; Ok(update) } - -pub struct SinkCollabMessageAction<'a, Sink: Clone> { - pub sink: &'a Sink, - pub message: CollabMessage, -} - -impl<'a, Sink> SinkCollabMessageAction<'a, Sink> -where - Sink: SinkExt + Clone + Send + Sync + Unpin + 'a, -{ - pub fn run(self) -> Retry, SinkCollabMessageAction<'a, Sink>> { - let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(5); - Retry::spawn(retry_strategy, self) - } -} - -impl<'a, Sink> Action for SinkCollabMessageAction<'a, Sink> -where - Sink: SinkExt + Clone + Send + Sync + Unpin + 'a, -{ - type Future = Pin> + Send + Sync + 'a>>; - type Item = (); - type Error = RealtimeError; - - fn run(&mut self) -> Self::Future { - let mut sink = self.sink.clone(); - let message = self.message.clone(); - Box::pin(async move { - sink - .send(message) - .await - .map_err(|_err| RealtimeError::Internal(anyhow!("Sink message fail")))?; - Ok(()) - }) - } -} diff --git a/libs/realtime/src/collaborate/group_control.rs b/libs/realtime/src/collaborate/group_control.rs index ac29b998..59ecc7fe 100644 --- a/libs/realtime/src/collaborate/group_control.rs +++ b/libs/realtime/src/collaborate/group_control.rs @@ -11,9 +11,9 @@ use database::collab::CollabStorage; use futures_util::{SinkExt, StreamExt}; use realtime_entity::collab_msg::CollabMessage; use std::sync::Arc; -use tokio::sync::Mutex; + use tokio::task::spawn_blocking; -use tokio::time::Instant; + use tracing::{debug, error, event, instrument, trace}; pub struct CollabGroupControl { @@ -171,7 +171,6 @@ pub struct CollabGroup { /// broadcast. subscribers: DashMap, user_by_user_device: DashMap, - pub modified_at: Arc>, } impl Drop for CollabGroup { @@ -189,14 +188,12 @@ where collab: Arc, broadcast: CollabBroadcast, ) -> Self { - let modified_at = Arc::new(Mutex::new(Instant::now())); Self { collab_type, collab, broadcast, subscribers: Default::default(), user_by_user_device: Default::default(), - modified_at, } } @@ -294,9 +291,7 @@ where >::Error: std::error::Error + Send + Sync, E: Into + Send + Sync + 'static, { - let sub = self - .broadcast - .subscribe(subscriber_origin, sink, stream, self.modified_at.clone()); + let sub = self.broadcast.subscribe(subscriber_origin, sink, stream); // Remove the old user if it exists let user_device = user.user_device(); @@ -332,7 +327,7 @@ where /// Check if the group is active. A group is considered active if it has at least one /// subscriber or has been modified within the last 10 minutes. pub async fn is_inactive(&self) -> bool { - let modified_at = self.modified_at.lock().await; + let modified_at = self.broadcast.modified_at.lock().await; if cfg!(debug_assertions) { modified_at.elapsed().as_secs() > 60 } else { diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index d5abee81..e6ecb0bd 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -6,12 +6,12 @@ use std::fmt::Display; use crate::collaborate::CollabAccessControl; use anyhow::anyhow; -use collab::core::awareness::Awareness; + use collab::core::collab::TransactionMutExt; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::core::transaction::DocTransactionExtension; -use collab::preclude::{CollabPlugin, Doc, TransactionMut}; +use collab::preclude::{Collab, CollabPlugin, Doc, TransactionMut}; use collab_entity::CollabType; use database::collab::CollabStorage; use database_entity::dto::{ @@ -208,7 +208,7 @@ where }, } } - fn did_init(&self, _awareness: &Awareness, _object_id: &str, _last_sync_at: i64) { + fn did_init(&self, _collab: &Collab, _object_id: &str, _last_sync_at: i64) { self.edit_state.set_did_load() } diff --git a/tests/collab/edit_permission.rs b/tests/collab/edit_permission.rs index e853536d..d02da7f5 100644 --- a/tests/collab/edit_permission.rs +++ b/tests/collab/edit_permission.rs @@ -1,7 +1,8 @@ use crate::collab::util::generate_random_string; use assert_json_diff::{assert_json_eq, assert_json_include}; use client_api_test_util::{ - assert_client_collab, assert_client_collab_include_value, assert_server_collab, TestClient, + assert_client_collab_include_value_within_30_secs, assert_client_collab_within_30_secs, + assert_server_collab, TestClient, }; use collab_entity::CollabType; use database_entity::dto::{AFAccessLevel, AFRole}; @@ -37,7 +38,7 @@ async fn recv_updates_without_permission_test() { .lock() .insert("name", "AppFlowy"); client_1.wait_object_sync_complete(&object_id).await; - assert_client_collab(&mut client_2, &object_id, "name", json!({}), 3).await; + assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", json!({})).await; } #[tokio::test] @@ -78,7 +79,7 @@ async fn recv_remote_updates_with_readonly_permission_test() { let expected = json!({ "name": "AppFlowy" }); - assert_client_collab(&mut client_2, &object_id, "name", expected.clone(), 10).await; + assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected.clone()).await; assert_server_collab( &workspace_id, &mut client_1.api_client, @@ -136,7 +137,7 @@ async fn init_sync_with_readonly_permission_test() { client_2 .open_collab(&workspace_id, &object_id, collab_type.clone()) .await; - assert_client_collab_include_value(&mut client_2, &object_id, expected).await; + assert_client_collab_include_value_within_30_secs(&mut client_2, &object_id, expected).await; } #[tokio::test] @@ -173,7 +174,7 @@ async fn edit_collab_with_readonly_permission_test() { .collab .lock() .insert("name", "AppFlowy"); - assert_client_collab_include_value( + assert_client_collab_include_value_within_30_secs( &mut client_2, &object_id, json!({ @@ -230,7 +231,8 @@ async fn edit_collab_with_read_and_write_permission_test() { let expected = json!({ "name": "AppFlowy" }); - assert_client_collab_include_value(&mut client_2, &object_id, expected.clone()).await; + assert_client_collab_include_value_within_30_secs(&mut client_2, &object_id, expected.clone()) + .await; assert_server_collab( &workspace_id, @@ -280,7 +282,7 @@ async fn edit_collab_with_full_access_permission_test() { let expected = json!({ "name": "AppFlowy" }); - assert_client_collab(&mut client_2, &object_id, "name", expected.clone(), 5).await; + assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected.clone()).await; assert_server_collab( &workspace_id, @@ -349,7 +351,7 @@ async fn edit_collab_with_full_access_then_readonly_permission() { .insert("subtitle", "Writing Rust, fun"); } - assert_client_collab_include_value( + assert_client_collab_include_value_within_30_secs( &mut client_2, &object_id, json!({ diff --git a/tests/collab/edit_workspace.rs b/tests/collab/edit_workspace.rs index fbf0ee68..2d54017e 100644 --- a/tests/collab/edit_workspace.rs +++ b/tests/collab/edit_workspace.rs @@ -23,11 +23,15 @@ async fn edit_workspace_without_permission() { .insert("name", "AppFlowy"); client_1.wait_object_sync_complete(&workspace_id).await; - assert_client_collab_include_value(&mut client_1, &workspace_id, json!({"name": "AppFlowy"})) - .await; + assert_client_collab_include_value_within_30_secs( + &mut client_1, + &workspace_id, + json!({"name": "AppFlowy"}), + ) + .await; // client 2 has not permission to read/edit the workspace - assert_client_collab_include_value(&mut client_2, &workspace_id, json!({})).await; + assert_client_collab_include_value_within_30_secs(&mut client_2, &workspace_id, json!({})).await; } #[tokio::test] @@ -52,20 +56,18 @@ async fn init_sync_workspace_with_guest_permission() { .insert("name", "AppFlowy"); client_1.wait_object_sync_complete(&workspace_id).await; - assert_client_collab( + assert_client_collab_within_30_secs( &mut client_1, &workspace_id, "name", json!({"name": "AppFlowy"}), - 3, ) .await; - assert_client_collab( + assert_client_collab_within_30_secs( &mut client_2, &workspace_id, "name", json!({"name": "AppFlowy"}), - 3, ) .await; } @@ -104,8 +106,18 @@ async fn edit_workspace_with_guest_permission() { .lock() .insert("name", "nathan"); - assert_client_collab_include_value(&mut client_1, &workspace_id, json!({"name": "zack"})).await; - assert_client_collab_include_value(&mut client_2, &workspace_id, json!({"name": "nathan"})).await; + assert_client_collab_include_value_within_30_secs( + &mut client_1, + &workspace_id, + json!({"name": "zack"}), + ) + .await; + assert_client_collab_include_value_within_30_secs( + &mut client_2, + &workspace_id, + json!({"name": "nathan"}), + ) + .await; assert_server_collab( &workspace_id, diff --git a/tests/collab/multi_devices_edit.rs b/tests/collab/multi_devices_edit.rs index 7b1b67e8..1acc85a6 100644 --- a/tests/collab/multi_devices_edit.rs +++ b/tests/collab/multi_devices_edit.rs @@ -1,3 +1,5 @@ +use crate::collab::util::generate_random_string; +use client_api::collab_sync::NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC; use client_api_test_util::*; use collab_entity::CollabType; use database_entity::dto::{AFAccessLevel, QueryCollabParams}; @@ -8,7 +10,7 @@ use tokio::time::sleep; use tracing::trace; #[tokio::test] -async fn edit_collab_with_ws_reconnect_sync_test() { +async fn sync_collab_content_after_reconnect_test() { let object_id = uuid::Uuid::new_v4().to_string(); let collab_type = CollabType::Document; @@ -66,7 +68,7 @@ async fn edit_collab_with_ws_reconnect_sync_test() { } #[tokio::test] -async fn edit_collab_with_different_devices_test() { +async fn same_client_with_diff_devices_edit_same_collab_test() { let collab_type = CollabType::Document; let registered_user = generate_unique_registered_user().await; let mut client_1 = TestClient::user_with_new_device(registered_user.clone()).await; @@ -110,13 +112,80 @@ async fn edit_collab_with_different_devices_test() { let expected_json = json!({ "name": "workspace" }); - assert_client_collab(&mut client_1, &object_id, "name", expected_json.clone(), 10).await; - assert_client_collab(&mut client_2, &object_id, "name", expected_json.clone(), 10).await; + assert_client_collab_within_30_secs(&mut client_1, &object_id, "name", expected_json.clone()) + .await; + assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected_json.clone()) + .await; +} + +#[tokio::test] +async fn same_client_with_diff_devices_edit_diff_collab_test() { + let registered_user = generate_unique_registered_user().await; + let collab_type = CollabType::Document; + let mut device_1 = TestClient::user_with_new_device(registered_user.clone()).await; + let mut device_2 = TestClient::user_with_new_device(registered_user.clone()).await; + + let workspace_id = device_1.workspace_id().await; + + // different devices create different collabs. the collab will be synced between devices + let object_id_1 = device_1 + .create_and_edit_collab(&workspace_id, collab_type.clone()) + .await; + let object_id_2 = device_2 + .create_and_edit_collab(&workspace_id, collab_type.clone()) + .await; + + // client 1 edit the collab with object_id_1 + device_1 + .collab_by_object_id + .get_mut(&object_id_1) + .unwrap() + .collab + .lock() + .insert("name", "object 1"); + device_1.wait_object_sync_complete(&object_id_1).await; + + // client 2 edit the collab with object_id_2 + device_2 + .collab_by_object_id + .get_mut(&object_id_2) + .unwrap() + .collab + .lock() + .insert("name", "object 2"); + device_2.wait_object_sync_complete(&object_id_2).await; + + // client1 open the collab with object_id_2 + device_1 + .open_collab(&workspace_id, &object_id_2, collab_type.clone()) + .await; + assert_client_collab_within_30_secs( + &mut device_1, + &object_id_2, + "name", + json!({ + "name": "object 2" + }), + ) + .await; + + // client2 open the collab with object_id_1 + device_2 + .open_collab(&workspace_id, &object_id_1, collab_type.clone()) + .await; + assert_client_collab_within_30_secs( + &mut device_2, + &object_id_1, + "name", + json!({ + "name": "object 1" + }), + ) + .await; } #[tokio::test] async fn edit_document_with_both_clients_offline_then_online_sync_test() { - let _object_id = uuid::Uuid::new_v4().to_string(); let collab_type = CollabType::Document; let mut client_1 = TestClient::new_user().await; let mut client_2 = TestClient::new_user().await; @@ -180,6 +249,121 @@ async fn edit_document_with_both_clients_offline_then_online_sync_test() { "8": "Task 8", "9": "Task 9" }); - assert_client_collab_include_value(&mut client_1, &object_id, expected_json.clone()).await; - assert_client_collab_include_value(&mut client_2, &object_id, expected_json.clone()).await; + assert_client_collab_include_value_within_30_secs( + &mut client_1, + &object_id, + expected_json.clone(), + ) + .await; + assert_client_collab_include_value_within_30_secs( + &mut client_2, + &object_id, + expected_json.clone(), + ) + .await; +} + +#[tokio::test] +async fn init_sync_when_missing_updates_test() { + let text = generate_random_string(1024); + let collab_type = CollabType::Document; + let mut client_1 = TestClient::new_user().await; + let mut client_2 = TestClient::new_user().await; + + // Create a collaborative document with client_1 and invite client_2 to collaborate. + let workspace_id = client_1.workspace_id().await; + let object_id = client_1 + .create_and_edit_collab(&workspace_id, collab_type.clone()) + .await; + client_1 + .add_client_as_collab_member( + &workspace_id, + &object_id, + &client_2, + AFAccessLevel::ReadAndWrite, + ) + .await; + + // Client_1 makes the first edit by inserting "task 1". + client_1 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("1", "task 1"); + client_1.wait_object_sync_complete(&object_id).await; + + // Client_2 opens the collaboration, triggering an initial sync to receive "task 1". + client_2 + .open_collab(&workspace_id, &object_id, collab_type.clone()) + .await; + client_2.wait_object_sync_complete(&object_id).await; + + // Validate both clients have "task 1" after the initial sync. + assert_eq!( + client_1.get_edit_collab_json(&object_id).await, + json!({ "1": "task 1" }) + ); + assert_eq!( + client_2.get_edit_collab_json(&object_id).await, + json!({ "1": "task 1" }) + ); + + // Simulate client_2 missing updates by enabling skip_realtime_message. + client_2.ws_client.disable_receive_message(); + client_1.wait_object_sync_complete(&object_id).await; + + // Client_1 inserts "task 2", which client_2 misses due to skipping realtime messages. + for _ in 0..NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC { + client_1 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("2", text.clone()); + sleep(Duration::from_millis(300)).await; + } + client_1.wait_object_sync_complete(&object_id).await; + + client_2 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("3", "task 3"); + client_2.wait_object_sync_complete(&object_id).await; + + // Validate client_1's view includes "task 2", and "task 3", while client_2 missed key2 and key3. + assert_client_collab_include_value_within_30_secs( + &mut client_1, + &object_id, + json!({ "1": "task 1", "2": text.clone(), "3": "task 3" }), + ) + .await; + assert_eq!( + client_2.get_edit_collab_json(&object_id).await, + json!({ "1": "task 1", "3": "task 3" }) + ); + + // client_2 resumes receiving messages + // client_1 triggers a sync that will trigger broadcast message to client_2 + client_2.ws_client.enable_receive_message(); + client_1 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("4", "task 4"); + client_1.wait_object_sync_complete(&object_id).await; + + assert_client_collab_include_value_within_30_secs( + &mut client_2, + &object_id, + json!({ "1": "task 1", "2": text.clone(), "3": "task 3", "4": "task 4" }), + ) + .await; } diff --git a/tests/collab/single_device_edit.rs b/tests/collab/single_device_edit.rs index cfc35be4..e55cc118 100644 --- a/tests/collab/single_device_edit.rs +++ b/tests/collab/single_device_edit.rs @@ -258,7 +258,7 @@ async fn user_with_duplicate_devices_connect_edit_test() { new_client.wait_object_sync_complete(&object_id).await; // Old client shouldn't receive the new client's edit - assert_client_collab_include_value( + assert_client_collab_include_value_within_30_secs( &mut old_client, &object_id, json!({ @@ -268,7 +268,7 @@ async fn user_with_duplicate_devices_connect_edit_test() { ) .await; - assert_client_collab_include_value( + assert_client_collab_include_value_within_30_secs( &mut new_client, &object_id, json!({ @@ -342,8 +342,18 @@ async fn two_direction_peer_sync_test() { "name": "AppFlowy", "support platform": "macOS, Windows, Linux, iOS, Android" }); - assert_client_collab_include_value(&mut client_1, &object_id, expected_json.clone()).await; - assert_client_collab_include_value(&mut client_2, &object_id, expected_json.clone()).await; + assert_client_collab_include_value_within_30_secs( + &mut client_1, + &object_id, + expected_json.clone(), + ) + .await; + assert_client_collab_include_value_within_30_secs( + &mut client_2, + &object_id, + expected_json.clone(), + ) + .await; } #[tokio::test]