chore: don't try to prune empty redis stream
This commit is contained in:
parent
13d657861a
commit
15f438ea05
|
|
@ -167,14 +167,18 @@ impl CollabRedisStream {
|
|||
.into_iter()
|
||||
.map(|stream_id| stream_id.id)
|
||||
.collect();
|
||||
let count: usize = conn.xdel(stream_key, &msg_ids).await?;
|
||||
drop(conn);
|
||||
tracing::debug!(
|
||||
"Pruned redis stream `{}` <= `{}` ({} objects)",
|
||||
stream_key,
|
||||
message_id,
|
||||
count
|
||||
);
|
||||
Ok(count)
|
||||
if !msg_ids.is_empty() {
|
||||
let count: usize = conn.xdel(stream_key, &msg_ids).await?;
|
||||
drop(conn);
|
||||
tracing::debug!(
|
||||
"Pruned redis stream `{}` <= `{}` ({} objects)",
|
||||
stream_key,
|
||||
message_id,
|
||||
count
|
||||
);
|
||||
Ok(count)
|
||||
} else {
|
||||
Ok(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -612,26 +612,34 @@ impl CollabGroup {
|
|||
if let Ok(sv) = state.state_vector.try_read() {
|
||||
// we optimistically try to obtain state vector lock for a fast track:
|
||||
// if we remote sv is up-to-date with current one, we don't need to do anything
|
||||
if remote_sv == &*sv {
|
||||
tracing::trace!("remote client is up to date - nothing to send");
|
||||
return Ok(None);
|
||||
match sv.partial_cmp(&remote_sv) {
|
||||
Some(std::cmp::Ordering::Equal) => return Ok(None), // client and server are in sync
|
||||
Some(std::cmp::Ordering::Less) => {
|
||||
// server is behind client
|
||||
let msg = Message::Sync(SyncMessage::SyncStep1(sv.clone()));
|
||||
return Ok(Some(msg.encode_v1()));
|
||||
},
|
||||
Some(std::cmp::Ordering::Greater) | None => { /* server has some new updates */ },
|
||||
}
|
||||
}
|
||||
|
||||
// we need to reconstruct document state on the server side
|
||||
let snapshot = state
|
||||
.persister
|
||||
.load()
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
|
||||
// prepare document state update and state vector
|
||||
let tx = snapshot.collab.transact();
|
||||
let doc_state = tx.encode_state_as_update_v1(remote_sv);
|
||||
let local_sv = tx.state_vector();
|
||||
drop(tx);
|
||||
|
||||
// Retrieve the latest document state from the client after they return online from offline editing.
|
||||
tracing::trace!("sending missing data to client ({} bytes)", doc_state.len());
|
||||
let mut encoder = EncoderV1::new();
|
||||
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
|
||||
|
||||
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
|
||||
// send when a connection is established
|
||||
Message::Sync(SyncMessage::SyncStep1(local_sv)).encode(&mut encoder);
|
||||
|
|
|
|||
Loading…
Reference in New Issue