diff --git a/libs/client-api-test/src/test_client.rs b/libs/client-api-test/src/test_client.rs index 3e1637af..961c9a0b 100644 --- a/libs/client-api-test/src/test_client.rs +++ b/libs/client-api-test/src/test_client.rs @@ -600,6 +600,7 @@ impl TestClient { stream, Some(handler), ws_connect_state, + Some(Duration::from_secs(10)), ); let lock = collab.read().await; let collab = (*lock).borrow(); @@ -672,6 +673,7 @@ impl TestClient { stream, Some(handler), ws_connect_state, + Some(Duration::from_secs(10)), ); let lock = collab.read().await; diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index f7a1b0fa..fa4b65df 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -2,6 +2,7 @@ use std::borrow::BorrowMut; use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; +use std::time::Duration; use arc_swap::ArcSwap; use collab::core::origin::CollabOrigin; @@ -13,6 +14,8 @@ use tokio_util::sync::CancellationToken; use tracing::{error, instrument, trace, warn}; use yrs::encoding::read::Cursor; use yrs::updates::decoder::DecoderV1; +use yrs::updates::encoder::Encode; +use yrs::ReadTxn; use client_api_entity::{validate_data_for_folder, CollabType}; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync}; @@ -58,6 +61,7 @@ where stream: Stream, weak_collab: CollabRef, sink: Weak>, + periodic_sync_interval: Option, ) -> Self { let object_id = object.object_id.clone(); let cloned_weak_collab = weak_collab.clone() as CollabRef; @@ -65,6 +69,17 @@ where let cloned_seq_num_counter = seq_num_counter.clone(); let init_sync_cancel_token = ArcSwap::new(Arc::new(CancellationToken::new())); let arc_object = Arc::new(object); + + if let Some(interval) = periodic_sync_interval { + tracing::trace!("setting periodic sync step 1 for {}", object_id); + af_spawn(ObserveCollab::::periodic_sync_step_1( + origin.clone(), + sink.clone(), + cloned_weak_collab.clone(), + interval, + object_id.clone(), + )); + } af_spawn(ObserveCollab::::observer_collab_message( origin, arc_object, @@ -83,6 +98,43 @@ where } } + /// Periodically run sync step 1 to make sure that there are no missing updates from other clients. + async fn periodic_sync_step_1( + origin: CollabOrigin, + weak_sink: Weak>, + weak_collab: CollabRef, + interval: Duration, + object_id: String, + ) { + loop { + tokio::time::sleep(interval).await; + let sink = match weak_sink.upgrade() { + Some(sink) => sink, + None => break, + }; + + let collab = match weak_collab.upgrade() { + Some(collab) => collab, + None => break, + }; + + let sv = { + let lock = collab.read().await; + let sv = (*lock).borrow().transact().state_vector(); + sv + }; + let msg = Message::Sync(SyncMessage::SyncStep1(sv)).encode_v1(); + sink.queue_msg(|msg_id| { + ClientCollabMessage::new_update_sync(UpdateSync::new( + origin.clone(), + object_id.clone(), + msg, + msg_id, + )) + }); + } + } + // Spawn the stream that continuously reads the doc's updates from remote. async fn observer_collab_message( origin: CollabOrigin, diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 3ead55ad..e7beea32 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -64,6 +64,7 @@ where stream: Stream, channel: Option>, mut ws_connect_state: WSConnectStateReceiver, + periodic_sync: Option, ) -> Self { let sync_queue = SyncControl::new( object.clone(), @@ -72,6 +73,7 @@ where sink_config, stream, collab.clone(), + periodic_sync, ); let mut sync_state_stream = sync_queue.subscribe_sync_state(); diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 4d1b2b04..4efb328d 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -60,6 +60,7 @@ where sink_config: SinkConfig, stream: Stream, collab: CollabRef, + periodic_sync: Option, ) -> Self { let protocol = ClientSyncProtocol; let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed); @@ -86,6 +87,7 @@ where stream, collab.clone(), Arc::downgrade(&sink), + periodic_sync, ); Self { diff --git a/services/appflowy-collaborate/src/group/protocol.rs b/services/appflowy-collaborate/src/group/protocol.rs index 73fa4763..f4f7cc0a 100644 --- a/services/appflowy-collaborate/src/group/protocol.rs +++ b/services/appflowy-collaborate/src/group/protocol.rs @@ -1,13 +1,12 @@ use collab::core::awareness::Awareness; use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; +use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use yrs::{ReadTxn, StateVector, Transact, Update}; use collab_rt_protocol::CollabSyncProtocol; use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage}; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, StateVector, Transact, Update}; - #[derive(Clone)] pub struct ServerSyncProtocol; impl CollabSyncProtocol for ServerSyncProtocol { @@ -30,11 +29,12 @@ impl CollabSyncProtocol for ServerSyncProtocol { })?; // Retrieve the latest document state from the client after they return online from offline editing. - let server_step1_update = txn.state_vector(); - let mut encoder = EncoderV1::new(); Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder); - Message::Sync(SyncMessage::SyncStep1(server_step1_update)).encode(&mut encoder); + + //FIXME: this should never happen as response to sync step 1 from the client, but rather be + // send when a connection is established + Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder); Ok(Some(encoder.to_vec())) } diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 4ac4dddc..887026b3 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -135,16 +135,29 @@ impl GroupManagementState { object_id: object_id.to_string(), }; - self - .editing_by_user - .entry(user.clone()) - .or_default() - .insert(editing); + let entry = self.editing_by_user.entry(user.clone()); + match entry { + dashmap::mapref::entry::Entry::Occupied(_) => {}, + dashmap::mapref::entry::Entry::Vacant(_) => { + self + .metrics_calculate + .num_of_editing_users + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }, + } + + entry.or_default().insert(editing); Ok(()) } pub(crate) async fn remove_user(&self, user: &RealtimeUser) { let entry = self.editing_by_user.remove(user); + if entry.is_some() { + self + .metrics_calculate + .num_of_editing_users + .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + } if let Some(editing_objects) = entry.map(|(_, e)| e) { for editing in editing_objects { match self.group_by_object_id.try_get(&editing.object_id) { diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index 36cb3e46..a1b73c48 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -12,6 +12,7 @@ pub struct CollabRealtimeMetrics { total_success_get_encode_collab_from_redis: Gauge, total_attempt_get_encode_collab_from_redis: Gauge, opening_collab_count: Gauge, + num_of_editing_users: Gauge, /// The number of apply update apply_update_count: Gauge, /// The number of apply update failed @@ -27,6 +28,7 @@ impl CollabRealtimeMetrics { total_success_get_encode_collab_from_redis: Gauge::default(), total_attempt_get_encode_collab_from_redis: Gauge::default(), opening_collab_count: Gauge::default(), + num_of_editing_users: Gauge::default(), apply_update_count: Default::default(), apply_update_failed_count: Default::default(), acquire_collab_lock_count: Default::default(), @@ -57,6 +59,11 @@ impl CollabRealtimeMetrics { "number of opening collabs", metrics.opening_collab_count.clone(), ); + realtime_registry.register( + "editing_users_count", + "number of editing users", + metrics.num_of_editing_users.clone(), + ); realtime_registry.register( "apply_update_count", "number of apply update", @@ -91,6 +98,7 @@ pub(crate) struct CollabMetricsCalculate { pub(crate) apply_update_count: Arc, pub(crate) apply_update_failed_count: Arc, pub(crate) num_of_active_collab: Arc, + pub(crate) num_of_editing_users: Arc, } pub(crate) fn spawn_metrics( @@ -115,6 +123,13 @@ pub(crate) fn spawn_metrics( .load(std::sync::atomic::Ordering::Relaxed), ); + // editing users + metrics.num_of_editing_users.set( + metrics_calculation + .num_of_editing_users + .load(std::sync::atomic::Ordering::Relaxed), + ); + // connect user metrics.connected_users.set( metrics_calculation diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 06d81c69..56a4ff17 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -23,6 +23,7 @@ use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender}; use crate::group::manager::GroupManager; use crate::indexer::IndexerProvider; use crate::metrics::CollabMetricsCalculate; +use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME; use crate::state::RedisConnectionManager; use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink}; @@ -199,7 +200,7 @@ where let object_id = entry.key().clone(); let clone_notify = notify.clone(); - tokio::spawn(runner.run(object_id, clone_notify)); + COLLAB_RUNTIME.spawn(runner.run(object_id, clone_notify)); entry.insert(new_sender.clone()); // wait for the runner to be ready to handle the message. @@ -287,9 +288,7 @@ fn spawn_period_check_inactive_group( /// multiple threads, we've incorporated a multi-thread feature. /// /// When appflowy-collaborate is deployed as a standalone service, we can use tokio multi-thread. -#[cfg(feature = "collab-rt-multi-thread")] mod collaboration_runtime { - use std::future::Future; use std::io; use lazy_static::lazy_static; @@ -307,12 +306,4 @@ mod collaboration_runtime { .enable_time() .build() } - - pub(crate) fn spawn(future: T) -> tokio::task::JoinHandle - where - T: Future + Send + 'static, - T::Output: Send + 'static, - { - COLLAB_RUNTIME.spawn(future) - } } diff --git a/tests/collab/missing_update_test.rs b/tests/collab/missing_update_test.rs index d7f30356..545c8ec9 100644 --- a/tests/collab/missing_update_test.rs +++ b/tests/collab/missing_update_test.rs @@ -9,7 +9,7 @@ use database_entity::dto::AFAccessLevel; #[tokio::test] async fn client_apply_update_find_missing_update_test() { - let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await; + let (_client_1, mut client_2, object_id, mut expected_json) = make_clients().await; // "title" => "hello world" is not delivered to client_2 and is considered a missing update client_2.ws_client.enable_receive_message(); { @@ -23,23 +23,8 @@ async fn client_apply_update_find_missing_update_test() { let collab = (*lock).borrow_mut(); collab.insert("content", "hello world"); } - { - // in order to detect missing update, we need to make another edit on client_1 - when this - // update is received by client_2 it will figure out that it was also missing - // "title" => "hello world" - let mut lock = client_1 - .collabs - .get_mut(&object_id) - .unwrap() - .collab - .write() - .await; - let collab = (*lock).borrow_mut(); - collab.insert("ping", "pong"); - } expected_json["content"] = Value::String("hello world".to_string()); - expected_json["ping"] = Value::String("pong".to_string()); // the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time assert_client_collab_include_value(&mut client_2, &object_id, expected_json) @@ -49,24 +34,9 @@ async fn client_apply_update_find_missing_update_test() { #[tokio::test] async fn client_ping_find_missing_update_test() { - let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await; + let (_client_1, mut client_2, object_id, expected_json) = make_clients().await; // "title" => "hello world" is not delivered to client_2 and is considered a missing update client_2.ws_client.enable_receive_message(); - { - // in order to detect missing update, we need to make another edit on client_1 - when this - // update is received by client_2 it will figure out that it was also missing - // "title" => "hello world" - let mut lock = client_1 - .collabs - .get_mut(&object_id) - .unwrap() - .collab - .write() - .await; - let collab = (*lock).borrow_mut(); - collab.insert("ping", "pong"); - } - expected_json["ping"] = Value::String("pong".to_string()); // the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time assert_client_collab_include_value(&mut client_2, &object_id, expected_json)