diff --git a/libs/client-api-test/src/test_client.rs b/libs/client-api-test/src/test_client.rs index 3e1637af..961c9a0b 100644 --- a/libs/client-api-test/src/test_client.rs +++ b/libs/client-api-test/src/test_client.rs @@ -600,6 +600,7 @@ impl TestClient { stream, Some(handler), ws_connect_state, + Some(Duration::from_secs(10)), ); let lock = collab.read().await; let collab = (*lock).borrow(); @@ -672,6 +673,7 @@ impl TestClient { stream, Some(handler), ws_connect_state, + Some(Duration::from_secs(10)), ); let lock = collab.read().await; diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index f7a1b0fa..fa4b65df 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -2,6 +2,7 @@ use std::borrow::BorrowMut; use std::marker::PhantomData; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Weak}; +use std::time::Duration; use arc_swap::ArcSwap; use collab::core::origin::CollabOrigin; @@ -13,6 +14,8 @@ use tokio_util::sync::CancellationToken; use tracing::{error, instrument, trace, warn}; use yrs::encoding::read::Cursor; use yrs::updates::decoder::DecoderV1; +use yrs::updates::encoder::Encode; +use yrs::ReadTxn; use client_api_entity::{validate_data_for_folder, CollabType}; use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync}; @@ -58,6 +61,7 @@ where stream: Stream, weak_collab: CollabRef, sink: Weak>, + periodic_sync_interval: Option, ) -> Self { let object_id = object.object_id.clone(); let cloned_weak_collab = weak_collab.clone() as CollabRef; @@ -65,6 +69,17 @@ where let cloned_seq_num_counter = seq_num_counter.clone(); let init_sync_cancel_token = ArcSwap::new(Arc::new(CancellationToken::new())); let arc_object = Arc::new(object); + + if let Some(interval) = periodic_sync_interval { + tracing::trace!("setting periodic sync step 1 for {}", object_id); + af_spawn(ObserveCollab::::periodic_sync_step_1( + origin.clone(), + sink.clone(), + cloned_weak_collab.clone(), + interval, + object_id.clone(), + )); + } af_spawn(ObserveCollab::::observer_collab_message( origin, arc_object, @@ -83,6 +98,43 @@ where } } + /// Periodically run sync step 1 to make sure that there are no missing updates from other clients. + async fn periodic_sync_step_1( + origin: CollabOrigin, + weak_sink: Weak>, + weak_collab: CollabRef, + interval: Duration, + object_id: String, + ) { + loop { + tokio::time::sleep(interval).await; + let sink = match weak_sink.upgrade() { + Some(sink) => sink, + None => break, + }; + + let collab = match weak_collab.upgrade() { + Some(collab) => collab, + None => break, + }; + + let sv = { + let lock = collab.read().await; + let sv = (*lock).borrow().transact().state_vector(); + sv + }; + let msg = Message::Sync(SyncMessage::SyncStep1(sv)).encode_v1(); + sink.queue_msg(|msg_id| { + ClientCollabMessage::new_update_sync(UpdateSync::new( + origin.clone(), + object_id.clone(), + msg, + msg_id, + )) + }); + } + } + // Spawn the stream that continuously reads the doc's updates from remote. async fn observer_collab_message( origin: CollabOrigin, diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 3ead55ad..e7beea32 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -64,6 +64,7 @@ where stream: Stream, channel: Option>, mut ws_connect_state: WSConnectStateReceiver, + periodic_sync: Option, ) -> Self { let sync_queue = SyncControl::new( object.clone(), @@ -72,6 +73,7 @@ where sink_config, stream, collab.clone(), + periodic_sync, ); let mut sync_state_stream = sync_queue.subscribe_sync_state(); diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 4d1b2b04..4efb328d 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -60,6 +60,7 @@ where sink_config: SinkConfig, stream: Stream, collab: CollabRef, + periodic_sync: Option, ) -> Self { let protocol = ClientSyncProtocol; let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed); @@ -86,6 +87,7 @@ where stream, collab.clone(), Arc::downgrade(&sink), + periodic_sync, ); Self { diff --git a/services/appflowy-collaborate/src/group/protocol.rs b/services/appflowy-collaborate/src/group/protocol.rs index 73fa4763..f4f7cc0a 100644 --- a/services/appflowy-collaborate/src/group/protocol.rs +++ b/services/appflowy-collaborate/src/group/protocol.rs @@ -1,13 +1,12 @@ use collab::core::awareness::Awareness; use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; +use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use yrs::{ReadTxn, StateVector, Transact, Update}; use collab_rt_protocol::CollabSyncProtocol; use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage}; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, StateVector, Transact, Update}; - #[derive(Clone)] pub struct ServerSyncProtocol; impl CollabSyncProtocol for ServerSyncProtocol { @@ -30,11 +29,12 @@ impl CollabSyncProtocol for ServerSyncProtocol { })?; // Retrieve the latest document state from the client after they return online from offline editing. - let server_step1_update = txn.state_vector(); - let mut encoder = EncoderV1::new(); Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder); - Message::Sync(SyncMessage::SyncStep1(server_step1_update)).encode(&mut encoder); + + //FIXME: this should never happen as response to sync step 1 from the client, but rather be + // send when a connection is established + Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder); Ok(Some(encoder.to_vec())) } diff --git a/tests/collab/missing_update_test.rs b/tests/collab/missing_update_test.rs index d7f30356..545c8ec9 100644 --- a/tests/collab/missing_update_test.rs +++ b/tests/collab/missing_update_test.rs @@ -9,7 +9,7 @@ use database_entity::dto::AFAccessLevel; #[tokio::test] async fn client_apply_update_find_missing_update_test() { - let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await; + let (_client_1, mut client_2, object_id, mut expected_json) = make_clients().await; // "title" => "hello world" is not delivered to client_2 and is considered a missing update client_2.ws_client.enable_receive_message(); { @@ -23,23 +23,8 @@ async fn client_apply_update_find_missing_update_test() { let collab = (*lock).borrow_mut(); collab.insert("content", "hello world"); } - { - // in order to detect missing update, we need to make another edit on client_1 - when this - // update is received by client_2 it will figure out that it was also missing - // "title" => "hello world" - let mut lock = client_1 - .collabs - .get_mut(&object_id) - .unwrap() - .collab - .write() - .await; - let collab = (*lock).borrow_mut(); - collab.insert("ping", "pong"); - } expected_json["content"] = Value::String("hello world".to_string()); - expected_json["ping"] = Value::String("pong".to_string()); // the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time assert_client_collab_include_value(&mut client_2, &object_id, expected_json) @@ -49,24 +34,9 @@ async fn client_apply_update_find_missing_update_test() { #[tokio::test] async fn client_ping_find_missing_update_test() { - let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await; + let (_client_1, mut client_2, object_id, expected_json) = make_clients().await; // "title" => "hello world" is not delivered to client_2 and is considered a missing update client_2.ws_client.enable_receive_message(); - { - // in order to detect missing update, we need to make another edit on client_1 - when this - // update is received by client_2 it will figure out that it was also missing - // "title" => "hello world" - let mut lock = client_1 - .collabs - .get_mut(&object_id) - .unwrap() - .collab - .write() - .await; - let collab = (*lock).borrow_mut(); - collab.insert("ping", "pong"); - } - expected_json["ping"] = Value::String("pong".to_string()); // the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time assert_client_collab_include_value(&mut client_2, &object_id, expected_json)