chore: a dirty draft of snapshot saving

This commit is contained in:
Bartosz Sypytkowski 2024-10-08 13:12:14 +02:00
parent 9164c494b1
commit 253bb97825
6 changed files with 130 additions and 23 deletions

1
Cargo.lock generated
View File

@ -2469,6 +2469,7 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tracing", "tracing",
"zstd",
] ]
[[package]] [[package]]

View File

@ -24,7 +24,7 @@ tokio-util = { version = "0.7" }
prost.workspace = true prost.workspace = true
async-stream.workspace = true async-stream.workspace = true
async-trait.workspace = true async-trait.workspace = true
zstd = "0.13"
[dev-dependencies] [dev-dependencies]
futures = "0.3.30" futures = "0.3.30"

View File

@ -38,7 +38,7 @@ impl CollabRedisStream {
workspace_id: &str, workspace_id: &str,
object_id: &str, object_id: &str,
) -> Result<Option<LeaseAcquisition>, StreamError> { ) -> Result<Option<LeaseAcquisition>, StreamError> {
let lease_key = format!("af_snapshot:lease:{}:{}", workspace_id, object_id); let lease_key = format!("af:{}:{}:snapshot_lease", workspace_id, object_id);
self self
.connection_manager .connection_manager
.lease(lease_key, Self::LEASE_TTL) .lease(lease_key, Self::LEASE_TTL)
@ -101,7 +101,7 @@ impl CollabRedisStream {
workspace_id: &str, workspace_id: &str,
object_id: &str, object_id: &str,
since: Option<MessageId>, since: Option<MessageId>,
) -> impl Stream<Item = Result<CollabStreamUpdate, StreamError>> { ) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
// use `:` separator as it adheres to Redis naming conventions // use `:` separator as it adheres to Redis naming conventions
let mut conn = self.connection_manager.clone(); let mut conn = self.connection_manager.clone();
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
@ -115,7 +115,7 @@ impl CollabRedisStream {
.await?; .await?;
for (message_id, update) in batch.updates { for (message_id, update) in batch.updates {
since = since.max(message_id); since = since.max(message_id);
yield update; yield (message_id, update);
} }
} }
} }

View File

@ -32,6 +32,12 @@ pub enum StreamError {
#[error(transparent)] #[error(transparent)]
BinCodeSerde(#[from] bincode::Error), BinCodeSerde(#[from] bincode::Error),
#[error("failed to decode update: {0}")]
UpdateError(#[from] collab::preclude::encoding::read::Error),
#[error("I/O error: {0}")]
IO(#[from] std::io::Error),
#[error("Internal error: {0}")] #[error("Internal error: {0}")]
Internal(anyhow::Error), Internal(anyhow::Error),
} }

View File

@ -392,6 +392,20 @@ impl CollabStreamUpdate {
pub fn stream_key(workspace_id: &str, object_id: &str) -> String { pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
format!("af:{}:{}:updates", workspace_id, object_id) format!("af:{}:{}:updates", workspace_id, object_id)
} }
pub fn into_update(self) -> Result<collab::preclude::Update, StreamError> {
let bytes = if self.flags.is_compressed() {
zstd::decode_all(std::io::Cursor::new(self.data))?
} else {
self.data
};
let update = if self.flags.is_v1_encoded() {
collab::preclude::Update::decode_v1(&bytes)?
} else {
collab::preclude::Update::decode_v2(&bytes)?
};
Ok(update)
}
} }
pub(crate) struct CollabStreamUpdateBatch { pub(crate) struct CollabStreamUpdateBatch {

View File

@ -27,7 +27,7 @@ use collab_stream::model::{
use collab_stream::stream_group::StreamGroup; use collab_stream::stream_group::StreamGroup;
use dashmap::DashMap; use dashmap::DashMap;
use database::collab::{CollabStorage, GetCollabOrigin}; use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams; use database_entity::dto::{CollabParams, QueryCollabParams};
use futures::{pin_mut, Sink, Stream}; use futures::{pin_mut, Sink, Stream};
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use std::collections::VecDeque; use std::collections::VecDeque;
@ -184,7 +184,7 @@ impl CollabGroup {
} }
res = updates.next() => { res = updates.next() => {
match res { match res {
Some(Ok(update)) => { Some(Ok((_message_id, update))) => {
Self::handle_inbound_update(&state, update).await; Self::handle_inbound_update(&state, update).await;
}, },
Some(Err(err)) => { Some(Err(err)) => {
@ -866,6 +866,10 @@ struct CollabPersister {
} }
impl CollabPersister { impl CollabPersister {
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
/// read right away, we give 1min for other potential client to catch up.
pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min
pub fn new( pub fn new(
workspace_id: String, workspace_id: String,
object_id: String, object_id: String,
@ -947,30 +951,112 @@ impl CollabPersister {
let encoded_collab = self let encoded_collab = self
.storage .storage
.get_encode_collab(GetCollabOrigin::Server, params, false) .get_encode_collab(GetCollabOrigin::Server, params, false)
.await?; .await
let collab = Collab::new_with_source( .map_err(|err| RealtimeError::Internal(err.into()))?;
let mut collab: Collab = Collab::new_with_source(
CollabOrigin::Server, CollabOrigin::Server,
&self.object_id, &self.object_id,
DataSource::DocStateV1(encoded_collab.doc_state.into()), DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![], vec![],
false, true, // here we use history-remembering version
)?; )?;
// 2. consume all Redis updates on top of it (keep redis msg id) // 2. consume all Redis updates on top of it (keep redis msg id)
todo!() let mut last_message_id = None;
let mut stream = self.collab_redis_stream.collab_updates(
&self.workspace_id,
&self.object_id,
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
);
let mut tx = collab.transact_mut();
while let Some(res) = stream.next().await {
match res {
Ok((message_id, update)) => {
let update: Update = update.into_update()?;
tx.apply_update(update)
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
},
Err(err) => {
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
err
);
break;
},
}
}
drop(tx); // apply transaction to compress the state (without GC)
// now we have the most recent version of the document
let snapshot = Arc::new(CollabSnapshot {
collab,
last_message_id,
});
if self.temp_collab.swap(Some(snapshot.clone())).is_none() {
// if previous value was none it means that it didn't exist yet or is newer than the persisted
// state. Since we already have a collab with loaded updates in memory, we can as well try
// to save its state
self.save_attempt(snapshot.clone()).await?;
}
Ok(snapshot)
} }
async fn save(&self) -> Result<(), RealtimeError> { async fn save_attempt(&self, snapshot: Arc<CollabSnapshot>) -> Result<(), RealtimeError> {
// 1. try to acquire lease if let Some(last_message_id) = snapshot.last_message_id {
if let Some(lease) = self if let Some(mut lease) = self
.collab_redis_stream .collab_redis_stream
.lease(&self.workspace_id, &self.object_id) .lease(&self.workspace_id, &self.object_id)
.await? .await?
{ {
// 3. if collab has any changes (any redis updates were applied): let doc_state_full = snapshot
// 4. generate embeddings .collab
// 5. store collab .transact()
// 6. prune any redis msg ids older than 5 min. since collab snapshot time .encode_state_as_update_v1(&StateVector::default());
todo!() let uid = 0; //FIXME: what UID should go there?
let params = CollabParams::new(
&self.object_id,
self.collab_type.clone(),
doc_state_full.clone(),
);
self
.storage
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// document state with historical data has been saved - now do GC
let mut collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&self.object_id,
DataSource::DocStateV1(doc_state_full),
vec![],
false, // here we use history-pruning version
)?;
let doc_state_gced = collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let params = CollabParams::new(
&self.object_id,
self.collab_type.clone(),
doc_state_gced.clone(),
);
self
.storage
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// finally we can drop Redis messages
let message_id = MessageId {
timestamp_ms: last_message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
sequence_number: 0,
};
self.prune_updates(message_id);
let _ = lease.release().await;
}
} }
Ok(()) Ok(())
} }
@ -978,5 +1064,5 @@ impl CollabPersister {
pub struct CollabSnapshot { pub struct CollabSnapshot {
pub collab: Collab, pub collab: Collab,
pub last_msg_id: MessageId, pub last_message_id: Option<MessageId>,
} }