chore: when saving collab snapshot, do not load it unless there are pending updates in redis

This commit is contained in:
Bartosz Sypytkowski 2024-10-30 06:53:31 +01:00
parent b6ba52672a
commit e86c9de316
2 changed files with 98 additions and 36 deletions

View File

@ -100,7 +100,7 @@ impl CollabRedisStream {
workspace_id: &str,
object_id: &str,
since: Option<MessageId>,
live: bool,
keep_alive: bool,
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
let mut conn = self.connection_manager.clone();
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
@ -112,22 +112,17 @@ impl CollabRedisStream {
// reset once we get new data onboard
let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(10));
async_stream::try_stream! {
let until = if live {
None
} else {
let mut reply: StreamRangeReply = conn.xrevrange_count(&stream_key, "+", "-", 1).await?;
match reply.ids.pop().map(|stream_id| stream_id.id) {
None => Some(MessageId::default()),
Some(id) => Some(MessageId::try_from(id)?),
}
};
while until.is_none() || since < until.unwrap() {
loop {
let last_id = since.to_string();
let batch: CollabStreamUpdateBatch = conn
.xread_options(&[&stream_key], &[&last_id], &read_options)
.await?;
if batch.updates.is_empty() {
if !keep_alive {
// if stream is not set to keep alive, we finish it once we get all current messages
return;
}
backoff.sleep().await; // stream has no new messages, phase out
} else {
backoff.reset();

View File

@ -327,7 +327,7 @@ impl CollabGroup {
}
pub async fn encode_collab(&self) -> Result<EncodedCollab, RealtimeError> {
let snapshot = self.state.persister.load().await?;
let snapshot = self.state.persister.load_compact().await?;
let encode_collab = snapshot.collab.encode_collab_v1(|collab| {
self
.state
@ -635,7 +635,7 @@ impl CollabGroup {
tracing::debug!("loading collab {}", state.object_id);
let snapshot = state
.persister
.load()
.load_compact()
.await
.map_err(|err| RTProtocolError::Internal(err.into()))?;
@ -922,20 +922,12 @@ impl CollabPersister {
Ok(msg_id)
}
async fn load(&self) -> Result<CollabSnapshot, RealtimeError> {
self.load_internal(false).await
}
async fn load_internal(&self, skip_gc: bool) -> Result<CollabSnapshot, RealtimeError> {
/// Loads collab without its history. Used for handling y-sync protocol messages.
async fn load_compact(&self) -> Result<CollabSnapshot, RealtimeError> {
// 1. Try to load the latest snapshot from storage
let mut collab = match self.load_collab_full(skip_gc).await? {
let mut collab = match self.load_collab_full(false).await? {
Some(collab) => collab,
None => Collab::new_with_origin(
CollabOrigin::Server,
self.object_id.clone(),
vec![],
skip_gc,
),
None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false),
};
// 2. consume all Redis updates on top of it (keep redis msg id)
@ -968,11 +960,10 @@ impl CollabPersister {
},
}
}
drop(tx); // apply transaction to compress the state (without GC)
drop(tx);
tracing::trace!(
"loaded collab {} state: {} replaying {} updates",
"loaded collab compact state: {} replaying {} updates",
self.object_id,
if skip_gc { "full" } else { "compact" },
i
);
@ -984,14 +975,90 @@ impl CollabPersister {
Ok(snapshot)
}
/// Returns a collab state (with GC turned off), but only if there were any pending updates
/// waiting to be merged into main document state.
async fn load_if_changed(&self) -> Result<Option<CollabSnapshot>, RealtimeError> {
// 1. load pending Redis updates
let stream = self.collab_redis_stream.collab_updates(
&self.workspace_id,
&self.object_id,
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
false, // read only data currently existing in the stream
);
pin_mut!(stream);
let mut i = 0;
let mut collab = None;
let mut last_message_id = None;
while let Some(res) = stream.next().await {
match res {
Ok((message_id, update)) => {
i += 1;
let update: Update = update.into_update()?;
if collab.is_none() {
collab = Some(match self.load_collab_full(true).await? {
Some(collab) => collab,
None => {
Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true)
},
})
};
let collab = collab.as_mut().unwrap();
collab
.transact_mut()
.apply_update(update)
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
},
Err(err) => {
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
err
);
break;
},
}
}
// if there were no Redis updates, collab is still not initialized
match collab {
Some(collab) => {
tracing::trace!(
"loaded collab full state: {} replaying {} updates",
self.object_id,
i
);
{
let tx = collab.transact();
if tx.store().pending_update().is_some() || tx.store().pending_ds().is_some() {
tracing::trace!(
"loaded collab {} is incomplete: has pending data",
self.object_id
);
}
}
Ok(Some(CollabSnapshot {
collab,
last_message_id,
}))
},
None => Ok(None),
}
}
async fn save(&self) -> Result<(), RealtimeError> {
tracing::debug!("requesting save for collab {}", self.object_id);
let mut snapshot = self.load_internal(true).await?;
if let Some(message_id) = &snapshot.last_message_id {
// non-nil message_id means that we had to update the most recent collab state snapshot
// with new updates from Redis. This means that our snapshot state is newer than the last
// persisted one in the database
self.save_attempt(&mut snapshot.collab, message_id).await?;
// load collab but only if there were pending updates in Redis
if let Some(mut snapshot) = self.load_if_changed().await? {
tracing::debug!("requesting save for collab {}", self.object_id);
if let Some(message_id) = snapshot.last_message_id {
// non-nil message_id means that we had to update the most recent collab state snapshot
// with new updates from Redis. This means that our snapshot state is newer than the last
// persisted one in the database
self.save_attempt(&mut snapshot.collab, message_id).await?;
}
} else {
tracing::trace!("collab {} state has not changed", self.object_id);
}
Ok(())
}
@ -1003,7 +1070,7 @@ impl CollabPersister {
async fn save_attempt(
&self,
collab: &mut Collab,
message_id: &MessageId,
message_id: MessageId,
) -> Result<(), RealtimeError> {
if !collab.get_awareness().doc().skip_gc() {
return Err(RealtimeError::UnexpectedData(