diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 4d33e2bf..4117c7dc 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -167,14 +167,18 @@ impl CollabRedisStream { .into_iter() .map(|stream_id| stream_id.id) .collect(); - let count: usize = conn.xdel(stream_key, &msg_ids).await?; - drop(conn); - tracing::debug!( - "Pruned redis stream `{}` <= `{}` ({} objects)", - stream_key, - message_id, - count - ); - Ok(count) + if !msg_ids.is_empty() { + let count: usize = conn.xdel(stream_key, &msg_ids).await?; + drop(conn); + tracing::debug!( + "Pruned redis stream `{}` <= `{}` ({} objects)", + stream_key, + message_id, + count + ); + Ok(count) + } else { + Ok(0) + } } } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index c5340e87..e04611c1 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -612,26 +612,34 @@ impl CollabGroup { if let Ok(sv) = state.state_vector.try_read() { // we optimistically try to obtain state vector lock for a fast track: // if we remote sv is up-to-date with current one, we don't need to do anything - if remote_sv == &*sv { - tracing::trace!("remote client is up to date - nothing to send"); - return Ok(None); + match sv.partial_cmp(&remote_sv) { + Some(std::cmp::Ordering::Equal) => return Ok(None), // client and server are in sync + Some(std::cmp::Ordering::Less) => { + // server is behind client + let msg = Message::Sync(SyncMessage::SyncStep1(sv.clone())); + return Ok(Some(msg.encode_v1())); + }, + Some(std::cmp::Ordering::Greater) | None => { /* server has some new updates */ }, } } + // we need to reconstruct document state on the server side let snapshot = state .persister .load() .await .map_err(|err| RTProtocolError::Internal(err.into()))?; + + // prepare document state update and state vector let tx = snapshot.collab.transact(); let doc_state = tx.encode_state_as_update_v1(remote_sv); let local_sv = tx.state_vector(); drop(tx); + // Retrieve the latest document state from the client after they return online from offline editing. tracing::trace!("sending missing data to client ({} bytes)", doc_state.len()); let mut encoder = EncoderV1::new(); Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder); - //FIXME: this should never happen as response to sync step 1 from the client, but rather be // send when a connection is established Message::Sync(SyncMessage::SyncStep1(local_sv)).encode(&mut encoder);