chore: don't try to prune empty redis stream

This commit is contained in:
Bartosz Sypytkowski 2024-10-17 10:34:48 +02:00
parent d0b7c7d8e3
commit 85e25f887c
2 changed files with 25 additions and 13 deletions

View File

@ -167,6 +167,7 @@ impl CollabRedisStream {
.into_iter()
.map(|stream_id| stream_id.id)
.collect();
if !msg_ids.is_empty() {
let count: usize = conn.xdel(stream_key, &msg_ids).await?;
drop(conn);
tracing::debug!(
@ -176,5 +177,8 @@ impl CollabRedisStream {
count
);
Ok(count)
} else {
Ok(0)
}
}
}

View File

@ -612,26 +612,34 @@ impl CollabGroup {
if let Ok(sv) = state.state_vector.try_read() {
// we optimistically try to obtain state vector lock for a fast track:
// if we remote sv is up-to-date with current one, we don't need to do anything
if remote_sv == &*sv {
tracing::trace!("remote client is up to date - nothing to send");
return Ok(None);
match sv.partial_cmp(&remote_sv) {
Some(std::cmp::Ordering::Equal) => return Ok(None), // client and server are in sync
Some(std::cmp::Ordering::Less) => {
// server is behind client
let msg = Message::Sync(SyncMessage::SyncStep1(sv.clone()));
return Ok(Some(msg.encode_v1()));
},
Some(std::cmp::Ordering::Greater) | None => { /* server has some new updates */ },
}
}
// we need to reconstruct document state on the server side
let snapshot = state
.persister
.load()
.await
.map_err(|err| RTProtocolError::Internal(err.into()))?;
// prepare document state update and state vector
let tx = snapshot.collab.transact();
let doc_state = tx.encode_state_as_update_v1(remote_sv);
let local_sv = tx.state_vector();
drop(tx);
// Retrieve the latest document state from the client after they return online from offline editing.
tracing::trace!("sending missing data to client ({} bytes)", doc_state.len());
let mut encoder = EncoderV1::new();
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
// send when a connection is established
Message::Sync(SyncMessage::SyncStep1(local_sv)).encode(&mut encoder);