chore: detect and re-request missing updates on the server side
This commit is contained in:
parent
923fe7ffde
commit
395424bfd2
|
|
@ -673,20 +673,34 @@ impl CollabGroup {
|
|||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))??
|
||||
};
|
||||
let state_vector = decoded_update.state_vector();
|
||||
state
|
||||
.persister
|
||||
.send_update(origin.clone(), update, state_vector)
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
let elapsed = start.elapsed();
|
||||
let missing_updates = {
|
||||
let state_vector = state.state_vector.read().await;
|
||||
match state_vector.partial_cmp(&decoded_update.state_vector_lower()) {
|
||||
None | Some(std::cmp::Ordering::Less) => Some(state_vector.clone()),
|
||||
_ => None,
|
||||
}
|
||||
};
|
||||
|
||||
state
|
||||
.metrics
|
||||
.apply_update_time
|
||||
.observe(elapsed.as_millis() as f64);
|
||||
if let Some(missing_updates) = missing_updates {
|
||||
let msg = Message::Sync(SyncMessage::SyncStep1(missing_updates));
|
||||
tracing::debug!("subscriber {} send update with missing data", origin);
|
||||
Ok(Some(msg.encode_v1()))
|
||||
} else {
|
||||
let upper_state_vector = decoded_update.state_vector();
|
||||
state
|
||||
.persister
|
||||
.send_update(origin.clone(), update, upper_state_vector)
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
Ok(None)
|
||||
state
|
||||
.metrics
|
||||
.apply_update_time
|
||||
.observe(elapsed.as_millis() as f64);
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_update(
|
||||
|
|
|
|||
Loading…
Reference in New Issue