chore: Update client protocol sync interval (#761)
* chore: separate server/client miss update * chore: short pull timeout * chore: short tick interval * chore: fix clippy
This commit is contained in:
parent
8beac5c85f
commit
cccdf76116
|
|
@ -191,15 +191,20 @@ where
|
|||
let cloned_object = object.clone();
|
||||
let collab = collab.clone();
|
||||
let sink = sink.clone();
|
||||
let sync_reason = match state_vector_v1 {
|
||||
None => SyncReason::ClientMissUpdates { reason },
|
||||
Some(sv) => SyncReason::ServerMissUpdates {
|
||||
state_vector_v1: sv,
|
||||
reason,
|
||||
},
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
select! {
|
||||
_ = new_cancel_token.cancelled() => {
|
||||
if cfg!(feature = "sync_verbose_log") {
|
||||
trace!("{} receive cancel signal, cancel pull missing updates", cloned_object.object_id);
|
||||
}
|
||||
trace!("{} cancel pull missing updates", cloned_object.object_id);
|
||||
},
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(3)) => {
|
||||
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, state_vector_v1, reason)
|
||||
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
|
||||
Self::pull_missing_updates(&cloned_origin, &cloned_object, &collab, &sink, sync_reason)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
|
@ -289,14 +294,9 @@ where
|
|||
object: &SyncObject,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
sink: &Arc<CollabSink<Sink>>,
|
||||
state_vector_v1: Option<Vec<u8>>,
|
||||
reason: MissUpdateReason,
|
||||
reason: SyncReason,
|
||||
) {
|
||||
let lock = collab.read().await;
|
||||
let reason = SyncReason::MissUpdates {
|
||||
state_vector_v1,
|
||||
reason,
|
||||
};
|
||||
if let Err(err) = start_sync(origin.clone(), object, (*lock).borrow(), sink, reason) {
|
||||
error!("Error while start sync: {}", err);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -135,8 +135,11 @@ where
|
|||
|
||||
pub enum SyncReason {
|
||||
CollabInitialize,
|
||||
MissUpdates {
|
||||
state_vector_v1: Option<Vec<u8>>,
|
||||
ServerMissUpdates {
|
||||
state_vector_v1: Vec<u8>,
|
||||
reason: MissUpdateReason,
|
||||
},
|
||||
ClientMissUpdates {
|
||||
reason: MissUpdateReason,
|
||||
},
|
||||
ServerCannotApplyUpdate,
|
||||
|
|
@ -147,7 +150,8 @@ impl Display for SyncReason {
|
|||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
SyncReason::CollabInitialize => write!(f, "CollabInitialize"),
|
||||
SyncReason::MissUpdates { reason, .. } => write!(f, "MissUpdates: {}", reason),
|
||||
SyncReason::ServerMissUpdates { reason, .. } => write!(f, "ServerMissUpdates: {}", reason),
|
||||
SyncReason::ClientMissUpdates { reason } => write!(f, "ClientMissUpdates: {}", reason),
|
||||
SyncReason::ServerCannotApplyUpdate => write!(f, "ServerCannotApplyUpdate"),
|
||||
SyncReason::NetworkResume => write!(f, "NetworkResume"),
|
||||
}
|
||||
|
|
@ -186,46 +190,37 @@ where
|
|||
E: Into<anyhow::Error> + Send + Sync + 'static,
|
||||
Sink: SinkExt<Vec<ClientCollabMessage>, Error = E> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
if !sink.should_queue_init_sync() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if let Err(err) = sync_object.collab_type.validate_require_data(collab) {
|
||||
error!("start init sync :{}:{}", sync_object.object_id, err);
|
||||
return Err(SyncError::Internal(err));
|
||||
}
|
||||
|
||||
match reason {
|
||||
SyncReason::MissUpdates {
|
||||
SyncReason::ClientMissUpdates { reason } => {
|
||||
if !sink.should_queue_init_sync() {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
let init_sync = InitSync::new(
|
||||
origin,
|
||||
sync_object.object_id.clone(),
|
||||
sync_object.collab_type.clone(),
|
||||
sync_object.workspace_id.clone(),
|
||||
msg_id,
|
||||
payload,
|
||||
);
|
||||
ClientCollabMessage::new_init_sync(init_sync)
|
||||
});
|
||||
},
|
||||
SyncReason::ServerMissUpdates {
|
||||
state_vector_v1,
|
||||
reason,
|
||||
} => match state_vector_v1.and_then(|sv| StateVector::decode_v1(&sv).ok()) {
|
||||
None => {
|
||||
trace!(
|
||||
"🔥{} start init sync, reason:{}",
|
||||
&sync_object.object_id,
|
||||
reason
|
||||
);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
let init_sync = InitSync::new(
|
||||
origin,
|
||||
sync_object.object_id.clone(),
|
||||
sync_object.collab_type.clone(),
|
||||
sync_object.workspace_id.clone(),
|
||||
msg_id,
|
||||
payload,
|
||||
);
|
||||
ClientCollabMessage::new_init_sync(init_sync)
|
||||
});
|
||||
},
|
||||
Some(sv) => {
|
||||
trace!(
|
||||
"🔥{} start init sync with state vector, reason:{}",
|
||||
&sync_object.object_id,
|
||||
reason
|
||||
);
|
||||
} => match StateVector::decode_v1(&state_vector_v1) {
|
||||
Ok(sv) => {
|
||||
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
|
||||
let update = gen_missing_updates(collab, sv)?;
|
||||
sink.queue_msg(|msg_id| {
|
||||
let update_sync = UpdateSync::new(
|
||||
|
|
@ -237,18 +232,18 @@ where
|
|||
ClientCollabMessage::new_update_sync(update_sync)
|
||||
});
|
||||
},
|
||||
Err(err) => error!("fail to decode server state vector: {}", err),
|
||||
},
|
||||
SyncReason::CollabInitialize
|
||||
| SyncReason::ServerCannotApplyUpdate
|
||||
| SyncReason::NetworkResume => {
|
||||
trace!(
|
||||
"🔥{} start init sync, reason: {}",
|
||||
"🔥{} start sync, reason: {}",
|
||||
&sync_object.object_id,
|
||||
reason
|
||||
);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
let init_sync = InitSync::new(
|
||||
origin,
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ async fn handle_tick(
|
|||
maximum_payload_size: usize,
|
||||
weak_seen_ids: Weak<Mutex<HashSet<SeenId>>>,
|
||||
) -> (usize, usize) {
|
||||
let (did_sent_seen_ids, messages_map) = next_batch_message(10, maximum_payload_size, queue).await;
|
||||
let (did_sent_seen_ids, messages_map) = next_batch_message(20, maximum_payload_size, queue).await;
|
||||
if messages_map.is_empty() {
|
||||
return (0, 0);
|
||||
}
|
||||
|
|
@ -227,9 +227,9 @@ fn calculate_next_tick_duration(
|
|||
Duration::from_secs(1)
|
||||
} else {
|
||||
match num_init_sync {
|
||||
0..=3 => default_interval,
|
||||
4..=7 => Duration::from_secs(4),
|
||||
_ => Duration::from_secs(6),
|
||||
0..=10 => default_interval,
|
||||
11..=20 => Duration::from_secs(2),
|
||||
_ => Duration::from_secs(4),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,9 +68,11 @@ impl CollabSyncProtocol for ClientSyncProtocol {
|
|||
update.missing.is_empty()
|
||||
);
|
||||
}
|
||||
let state_vector_v1 = txn.state_vector().encode_v1();
|
||||
|
||||
// when client handle sync step 2 and found missing updates, just return MissUpdates Error.
|
||||
// the state vector should be none that will trigger a client init sync
|
||||
Err(RTProtocolError::MissUpdates {
|
||||
state_vector_v1: Some(state_vector_v1),
|
||||
state_vector_v1: None,
|
||||
reason: "client miss updates".to_string(),
|
||||
})
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue