diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index aedcd6d1..c05cc63b 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -673,20 +673,34 @@ impl CollabGroup { .await .map_err(|err| RTProtocolError::Internal(err.into()))?? }; - let state_vector = decoded_update.state_vector(); - state - .persister - .send_update(origin.clone(), update, state_vector) - .await - .map_err(|err| RTProtocolError::Internal(err.into()))?; - let elapsed = start.elapsed(); + let missing_updates = { + let state_vector = state.state_vector.read().await; + match state_vector.partial_cmp(&decoded_update.state_vector_lower()) { + None | Some(std::cmp::Ordering::Less) => Some(state_vector.clone()), + _ => None, + } + }; - state - .metrics - .apply_update_time - .observe(elapsed.as_millis() as f64); + if let Some(missing_updates) = missing_updates { + let msg = Message::Sync(SyncMessage::SyncStep1(missing_updates)); + tracing::debug!("subscriber {} send update with missing data", origin); + Ok(Some(msg.encode_v1())) + } else { + let upper_state_vector = decoded_update.state_vector(); + state + .persister + .send_update(origin.clone(), update, upper_state_vector) + .await + .map_err(|err| RTProtocolError::Internal(err.into()))?; + let elapsed = start.elapsed(); - Ok(None) + state + .metrics + .apply_update_time + .observe(elapsed.as_millis() as f64); + + Ok(None) + } } async fn handle_update(