From cccdf76116f41631daa5533b56238aec323684de Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 28 Aug 2024 14:46:55 +0800 Subject: [PATCH] chore: Update client protocol sync interval (#761) * chore: separate server/client miss update * chore: short pull timeout * chore: short tick interval * chore: fix clippy --- .../src/collab_sync/collab_stream.rs | 22 +++--- .../src/collab_sync/sync_control.rs | 71 +++++++++---------- libs/client-api/src/ws/msg_queue.rs | 8 +-- libs/collab-rt-protocol/src/protocol.rs | 6 +- 4 files changed, 52 insertions(+), 55 deletions(-) diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index cc7e0828..9907f3c2 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -191,15 +191,20 @@ where let cloned_object = object.clone(); let collab = collab.clone(); let sink = sink.clone(); + let sync_reason = match state_vector_v1 { + None => SyncReason::ClientMissUpdates { reason }, + Some(sv) => SyncReason::ServerMissUpdates { + state_vector_v1: sv, + reason, + }, + }; tokio::spawn(async move { select! { _ = new_cancel_token.cancelled() => { - if cfg!(feature = "sync_verbose_log") { - trace!("{} receive cancel signal, cancel pull missing updates", cloned_object.object_id); - } + trace!("{} cancel pull missing updates", cloned_object.object_id); }, - _ = tokio::time::sleep(tokio::time::Duration::from_secs(3)) => { - Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, state_vector_v1, reason) + _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => { + Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, sync_reason) .await; } } @@ -289,14 +294,9 @@ where object: &SyncObject, collab: &Arc + Send + Sync + 'static>>, sink: &Arc>, - state_vector_v1: Option>, - reason: MissUpdateReason, + reason: SyncReason, ) { let lock = collab.read().await; - let reason = SyncReason::MissUpdates { - state_vector_v1, - reason, - }; if let Err(err) = start_sync(origin.clone(), object, (*lock).borrow(), sink, reason) { error!("Error while start sync: {}", err); } diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 4525cc2e..909d59f6 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -135,8 +135,11 @@ where pub enum SyncReason { CollabInitialize, - MissUpdates { - state_vector_v1: Option>, + ServerMissUpdates { + state_vector_v1: Vec, + reason: MissUpdateReason, + }, + ClientMissUpdates { reason: MissUpdateReason, }, ServerCannotApplyUpdate, @@ -147,7 +150,8 @@ impl Display for SyncReason { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { SyncReason::CollabInitialize => write!(f, "CollabInitialize"), - SyncReason::MissUpdates { reason, .. } => write!(f, "MissUpdates: {}", reason), + SyncReason::ServerMissUpdates { reason, .. } => write!(f, "ServerMissUpdates: {}", reason), + SyncReason::ClientMissUpdates { reason } => write!(f, "ClientMissUpdates: {}", reason), SyncReason::ServerCannotApplyUpdate => write!(f, "ServerCannotApplyUpdate"), SyncReason::NetworkResume => write!(f, "NetworkResume"), } @@ -186,46 +190,37 @@ where E: Into + Send + Sync + 'static, Sink: SinkExt, Error = E> + Send + Sync + Unpin + 'static, { - if !sink.should_queue_init_sync() { - return Ok(false); - } - if let Err(err) = sync_object.collab_type.validate_require_data(collab) { - error!("start init sync :{}:{}", sync_object.object_id, err); return Err(SyncError::Internal(err)); } match reason { - SyncReason::MissUpdates { + SyncReason::ClientMissUpdates { reason } => { + if !sink.should_queue_init_sync() { + return Ok(false); + } + + trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason); + let awareness = collab.get_awareness(); + let payload = gen_sync_state(awareness, &ClientSyncProtocol)?; + sink.queue_init_sync(|msg_id| { + let init_sync = InitSync::new( + origin, + sync_object.object_id.clone(), + sync_object.collab_type.clone(), + sync_object.workspace_id.clone(), + msg_id, + payload, + ); + ClientCollabMessage::new_init_sync(init_sync) + }); + }, + SyncReason::ServerMissUpdates { state_vector_v1, reason, - } => match state_vector_v1.and_then(|sv| StateVector::decode_v1(&sv).ok()) { - None => { - trace!( - "🔥{} start init sync, reason:{}", - &sync_object.object_id, - reason - ); - let awareness = collab.get_awareness(); - let payload = gen_sync_state(awareness, &ClientSyncProtocol)?; - sink.queue_init_sync(|msg_id| { - let init_sync = InitSync::new( - origin, - sync_object.object_id.clone(), - sync_object.collab_type.clone(), - sync_object.workspace_id.clone(), - msg_id, - payload, - ); - ClientCollabMessage::new_init_sync(init_sync) - }); - }, - Some(sv) => { - trace!( - "🔥{} start init sync with state vector, reason:{}", - &sync_object.object_id, - reason - ); + } => match StateVector::decode_v1(&state_vector_v1) { + Ok(sv) => { + trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason); let update = gen_missing_updates(collab, sv)?; sink.queue_msg(|msg_id| { let update_sync = UpdateSync::new( @@ -237,18 +232,18 @@ where ClientCollabMessage::new_update_sync(update_sync) }); }, + Err(err) => error!("fail to decode server state vector: {}", err), }, SyncReason::CollabInitialize | SyncReason::ServerCannotApplyUpdate | SyncReason::NetworkResume => { trace!( - "🔥{} start init sync, reason: {}", + "🔥{} start sync, reason: {}", &sync_object.object_id, reason ); let awareness = collab.get_awareness(); let payload = gen_sync_state(awareness, &ClientSyncProtocol)?; - sink.queue_init_sync(|msg_id| { let init_sync = InitSync::new( origin, diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 81cec527..7394589f 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -83,7 +83,7 @@ async fn handle_tick( maximum_payload_size: usize, weak_seen_ids: Weak>>, ) -> (usize, usize) { - let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await; + let (did_sent_seen_ids, messages_map) = next_batch_message(20, maximum_payload_size, queue).await; if messages_map.is_empty() { return (0, 0); } @@ -227,9 +227,9 @@ fn calculate_next_tick_duration( Duration::from_secs(1) } else { match num_init_sync { - 0..=3 => default_interval, - 4..=7 => Duration::from_secs(4), - _ => Duration::from_secs(6), + 0..=10 => default_interval, + 11..=20 => Duration::from_secs(2), + _ => Duration::from_secs(4), } } } diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index 3e2db9c2..b8ff0738 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -68,9 +68,11 @@ impl CollabSyncProtocol for ClientSyncProtocol { update.missing.is_empty() ); } - let state_vector_v1 = txn.state_vector().encode_v1(); + + // when client handle sync step 2 and found missing updates, just return MissUpdates Error. + // the state vector should be none that will trigger a client init sync Err(RTProtocolError::MissUpdates { - state_vector_v1: Some(state_vector_v1), + state_vector_v1: None, reason: "client miss updates".to_string(), }) },