chore: enable setting up periodic sync step 1 sends from sync plugin (#738)

* chore: enable setting up periodic sync step 1 sends from sync plugin

* chore: rollback to server sending sync step 1 to response to the client

* chore: increase sync step 1 interval in tests
This commit is contained in:
Bartosz Sypytkowski 2024-08-27 05:55:27 +02:00 committed by GitHub
parent cbb9adc29d
commit 15b2e81579
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 66 additions and 38 deletions

View File

@ -600,6 +600,7 @@ impl TestClient {
stream,
Some(handler),
ws_connect_state,
Some(Duration::from_secs(10)),
);
let lock = collab.read().await;
let collab = (*lock).borrow();
@ -672,6 +673,7 @@ impl TestClient {
stream,
Some(handler),
ws_connect_state,
Some(Duration::from_secs(10)),
);
let lock = collab.read().await;

View File

@ -2,6 +2,7 @@ use std::borrow::BorrowMut;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use arc_swap::ArcSwap;
use collab::core::origin::CollabOrigin;
@ -13,6 +14,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, trace, warn};
use yrs::encoding::read::Cursor;
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::Encode;
use yrs::ReadTxn;
use client_api_entity::{validate_data_for_folder, CollabType};
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync};
@ -58,6 +61,7 @@ where
stream: Stream,
weak_collab: CollabRef,
sink: Weak<CollabSink<Sink>>,
periodic_sync_interval: Option<Duration>,
) -> Self {
let object_id = object.object_id.clone();
let cloned_weak_collab = weak_collab.clone() as CollabRef;
@ -65,6 +69,17 @@ where
let cloned_seq_num_counter = seq_num_counter.clone();
let init_sync_cancel_token = ArcSwap::new(Arc::new(CancellationToken::new()));
let arc_object = Arc::new(object);
if let Some(interval) = periodic_sync_interval {
tracing::trace!("setting periodic sync step 1 for {}", object_id);
af_spawn(ObserveCollab::<Sink, Stream>::periodic_sync_step_1(
origin.clone(),
sink.clone(),
cloned_weak_collab.clone(),
interval,
object_id.clone(),
));
}
af_spawn(ObserveCollab::<Sink, Stream>::observer_collab_message(
origin,
arc_object,
@ -83,6 +98,43 @@ where
}
}
/// Periodically run sync step 1 to make sure that there are no missing updates from other clients.
async fn periodic_sync_step_1(
origin: CollabOrigin,
weak_sink: Weak<CollabSink<Sink>>,
weak_collab: CollabRef,
interval: Duration,
object_id: String,
) {
loop {
tokio::time::sleep(interval).await;
let sink = match weak_sink.upgrade() {
Some(sink) => sink,
None => break,
};
let collab = match weak_collab.upgrade() {
Some(collab) => collab,
None => break,
};
let sv = {
let lock = collab.read().await;
let sv = (*lock).borrow().transact().state_vector();
sv
};
let msg = Message::Sync(SyncMessage::SyncStep1(sv)).encode_v1();
sink.queue_msg(|msg_id| {
ClientCollabMessage::new_update_sync(UpdateSync::new(
origin.clone(),
object_id.clone(),
msg,
msg_id,
))
});
}
}
// Spawn the stream that continuously reads the doc's updates from remote.
async fn observer_collab_message(
origin: CollabOrigin,

View File

@ -64,6 +64,7 @@ where
stream: Stream,
channel: Option<Arc<Channel>>,
mut ws_connect_state: WSConnectStateReceiver,
periodic_sync: Option<Duration>,
) -> Self {
let sync_queue = SyncControl::new(
object.clone(),
@ -72,6 +73,7 @@ where
sink_config,
stream,
collab.clone(),
periodic_sync,
);
let mut sync_state_stream = sync_queue.subscribe_sync_state();

View File

@ -60,6 +60,7 @@ where
sink_config: SinkConfig,
stream: Stream,
collab: CollabRef,
periodic_sync: Option<Duration>,
) -> Self {
let protocol = ClientSyncProtocol;
let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed);
@ -86,6 +87,7 @@ where
stream,
collab.clone(),
Arc::downgrade(&sink),
periodic_sync,
);
Self {

View File

@ -1,13 +1,12 @@
use collab::core::awareness::Awareness;
use collab::core::collab::{TransactionExt, TransactionMutExt};
use collab::core::origin::CollabOrigin;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::{ReadTxn, StateVector, Transact, Update};
use collab_rt_protocol::CollabSyncProtocol;
use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage};
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::{ReadTxn, StateVector, Transact, Update};
#[derive(Clone)]
pub struct ServerSyncProtocol;
impl CollabSyncProtocol for ServerSyncProtocol {
@ -30,11 +29,12 @@ impl CollabSyncProtocol for ServerSyncProtocol {
})?;
// Retrieve the latest document state from the client after they return online from offline editing.
let server_step1_update = txn.state_vector();
let mut encoder = EncoderV1::new();
Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder);
Message::Sync(SyncMessage::SyncStep1(server_step1_update)).encode(&mut encoder);
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
// send when a connection is established
Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder);
Ok(Some(encoder.to_vec()))
}

View File

@ -9,7 +9,7 @@ use database_entity::dto::AFAccessLevel;
#[tokio::test]
async fn client_apply_update_find_missing_update_test() {
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
let (_client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
client_2.ws_client.enable_receive_message();
{
@ -23,23 +23,8 @@ async fn client_apply_update_find_missing_update_test() {
let collab = (*lock).borrow_mut();
collab.insert("content", "hello world");
}
{
// in order to detect missing update, we need to make another edit on client_1 - when this
// update is received by client_2 it will figure out that it was also missing
// "title" => "hello world"
let mut lock = client_1
.collabs
.get_mut(&object_id)
.unwrap()
.collab
.write()
.await;
let collab = (*lock).borrow_mut();
collab.insert("ping", "pong");
}
expected_json["content"] = Value::String("hello world".to_string());
expected_json["ping"] = Value::String("pong".to_string());
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)
@ -49,24 +34,9 @@ async fn client_apply_update_find_missing_update_test() {
#[tokio::test]
async fn client_ping_find_missing_update_test() {
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
let (_client_1, mut client_2, object_id, expected_json) = make_clients().await;
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
client_2.ws_client.enable_receive_message();
{
// in order to detect missing update, we need to make another edit on client_1 - when this
// update is received by client_2 it will figure out that it was also missing
// "title" => "hello world"
let mut lock = client_1
.collabs
.get_mut(&object_id)
.unwrap()
.collab
.write()
.await;
let collab = (*lock).borrow_mut();
collab.insert("ping", "pong");
}
expected_json["ping"] = Value::String("pong".to_string());
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)