chore: a dirty draft of snapshot saving
This commit is contained in:
parent
f0b907157e
commit
5ef6ab1738
|
|
@ -2469,6 +2469,7 @@ dependencies = [
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"zstd",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ tokio-util = { version = "0.7" }
|
||||||
prost.workspace = true
|
prost.workspace = true
|
||||||
async-stream.workspace = true
|
async-stream.workspace = true
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
|
zstd = "0.13"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures = "0.3.30"
|
futures = "0.3.30"
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ impl CollabRedisStream {
|
||||||
workspace_id: &str,
|
workspace_id: &str,
|
||||||
object_id: &str,
|
object_id: &str,
|
||||||
) -> Result<Option<LeaseAcquisition>, StreamError> {
|
) -> Result<Option<LeaseAcquisition>, StreamError> {
|
||||||
let lease_key = format!("af_snapshot:lease:{}:{}", workspace_id, object_id);
|
let lease_key = format!("af:{}:{}:snapshot_lease", workspace_id, object_id);
|
||||||
self
|
self
|
||||||
.connection_manager
|
.connection_manager
|
||||||
.lease(lease_key, Self::LEASE_TTL)
|
.lease(lease_key, Self::LEASE_TTL)
|
||||||
|
|
@ -101,7 +101,7 @@ impl CollabRedisStream {
|
||||||
workspace_id: &str,
|
workspace_id: &str,
|
||||||
object_id: &str,
|
object_id: &str,
|
||||||
since: Option<MessageId>,
|
since: Option<MessageId>,
|
||||||
) -> impl Stream<Item = Result<CollabStreamUpdate, StreamError>> {
|
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
|
||||||
// use `:` separator as it adheres to Redis naming conventions
|
// use `:` separator as it adheres to Redis naming conventions
|
||||||
let mut conn = self.connection_manager.clone();
|
let mut conn = self.connection_manager.clone();
|
||||||
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
|
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
|
||||||
|
|
@ -115,7 +115,7 @@ impl CollabRedisStream {
|
||||||
.await?;
|
.await?;
|
||||||
for (message_id, update) in batch.updates {
|
for (message_id, update) in batch.updates {
|
||||||
since = since.max(message_id);
|
since = since.max(message_id);
|
||||||
yield update;
|
yield (message_id, update);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,12 @@ pub enum StreamError {
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
BinCodeSerde(#[from] bincode::Error),
|
BinCodeSerde(#[from] bincode::Error),
|
||||||
|
|
||||||
|
#[error("failed to decode update: {0}")]
|
||||||
|
UpdateError(#[from] collab::preclude::encoding::read::Error),
|
||||||
|
|
||||||
|
#[error("I/O error: {0}")]
|
||||||
|
IO(#[from] std::io::Error),
|
||||||
|
|
||||||
#[error("Internal error: {0}")]
|
#[error("Internal error: {0}")]
|
||||||
Internal(anyhow::Error),
|
Internal(anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -392,6 +392,20 @@ impl CollabStreamUpdate {
|
||||||
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
|
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
|
||||||
format!("af:{}:{}:updates", workspace_id, object_id)
|
format!("af:{}:{}:updates", workspace_id, object_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn into_update(self) -> Result<collab::preclude::Update, StreamError> {
|
||||||
|
let bytes = if self.flags.is_compressed() {
|
||||||
|
zstd::decode_all(std::io::Cursor::new(self.data))?
|
||||||
|
} else {
|
||||||
|
self.data
|
||||||
|
};
|
||||||
|
let update = if self.flags.is_v1_encoded() {
|
||||||
|
collab::preclude::Update::decode_v1(&bytes)?
|
||||||
|
} else {
|
||||||
|
collab::preclude::Update::decode_v2(&bytes)?
|
||||||
|
};
|
||||||
|
Ok(update)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct CollabStreamUpdateBatch {
|
pub(crate) struct CollabStreamUpdateBatch {
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ use collab_stream::model::{
|
||||||
use collab_stream::stream_group::StreamGroup;
|
use collab_stream::stream_group::StreamGroup;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||||
use database_entity::dto::QueryCollabParams;
|
use database_entity::dto::{CollabParams, QueryCollabParams};
|
||||||
use futures::{pin_mut, Sink, Stream};
|
use futures::{pin_mut, Sink, Stream};
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
|
|
@ -184,7 +184,7 @@ impl CollabGroup {
|
||||||
}
|
}
|
||||||
res = updates.next() => {
|
res = updates.next() => {
|
||||||
match res {
|
match res {
|
||||||
Some(Ok(update)) => {
|
Some(Ok((_message_id, update))) => {
|
||||||
Self::handle_inbound_update(&state, update).await;
|
Self::handle_inbound_update(&state, update).await;
|
||||||
},
|
},
|
||||||
Some(Err(err)) => {
|
Some(Err(err)) => {
|
||||||
|
|
@ -866,6 +866,10 @@ struct CollabPersister {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CollabPersister {
|
impl CollabPersister {
|
||||||
|
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
|
||||||
|
/// read right away, we give 1min for other potential client to catch up.
|
||||||
|
pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min
|
||||||
|
|
||||||
pub fn new(
|
pub fn new(
|
||||||
workspace_id: String,
|
workspace_id: String,
|
||||||
object_id: String,
|
object_id: String,
|
||||||
|
|
@ -947,30 +951,112 @@ impl CollabPersister {
|
||||||
let encoded_collab = self
|
let encoded_collab = self
|
||||||
.storage
|
.storage
|
||||||
.get_encode_collab(GetCollabOrigin::Server, params, false)
|
.get_encode_collab(GetCollabOrigin::Server, params, false)
|
||||||
.await?;
|
.await
|
||||||
let collab = Collab::new_with_source(
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
let mut collab: Collab = Collab::new_with_source(
|
||||||
CollabOrigin::Server,
|
CollabOrigin::Server,
|
||||||
&self.object_id,
|
&self.object_id,
|
||||||
DataSource::DocStateV1(encoded_collab.doc_state.into()),
|
DataSource::DocStateV1(encoded_collab.doc_state.into()),
|
||||||
vec![],
|
vec![],
|
||||||
false,
|
true, // here we use history-remembering version
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// 2. consume all Redis updates on top of it (keep redis msg id)
|
// 2. consume all Redis updates on top of it (keep redis msg id)
|
||||||
todo!()
|
let mut last_message_id = None;
|
||||||
|
let mut stream = self.collab_redis_stream.collab_updates(
|
||||||
|
&self.workspace_id,
|
||||||
|
&self.object_id,
|
||||||
|
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
|
||||||
|
);
|
||||||
|
let mut tx = collab.transact_mut();
|
||||||
|
while let Some(res) = stream.next().await {
|
||||||
|
match res {
|
||||||
|
Ok((message_id, update)) => {
|
||||||
|
let update: Update = update.into_update()?;
|
||||||
|
tx.apply_update(update)
|
||||||
|
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
|
||||||
|
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
tracing::error!(
|
||||||
|
"`{}` failed to resolve collab update: {}",
|
||||||
|
self.object_id,
|
||||||
|
err
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(tx); // apply transaction to compress the state (without GC)
|
||||||
|
|
||||||
|
// now we have the most recent version of the document
|
||||||
|
let snapshot = Arc::new(CollabSnapshot {
|
||||||
|
collab,
|
||||||
|
last_message_id,
|
||||||
|
});
|
||||||
|
if self.temp_collab.swap(Some(snapshot.clone())).is_none() {
|
||||||
|
// if previous value was none it means that it didn't exist yet or is newer than the persisted
|
||||||
|
// state. Since we already have a collab with loaded updates in memory, we can as well try
|
||||||
|
// to save its state
|
||||||
|
self.save_attempt(snapshot.clone()).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save(&self) -> Result<(), RealtimeError> {
|
async fn save_attempt(&self, snapshot: Arc<CollabSnapshot>) -> Result<(), RealtimeError> {
|
||||||
// 1. try to acquire lease
|
if let Some(last_message_id) = snapshot.last_message_id {
|
||||||
if let Some(lease) = self
|
if let Some(mut lease) = self
|
||||||
.collab_redis_stream
|
.collab_redis_stream
|
||||||
.lease(&self.workspace_id, &self.object_id)
|
.lease(&self.workspace_id, &self.object_id)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
// 3. if collab has any changes (any redis updates were applied):
|
let doc_state_full = snapshot
|
||||||
// 4. generate embeddings
|
.collab
|
||||||
// 5. store collab
|
.transact()
|
||||||
// 6. prune any redis msg ids older than 5 min. since collab snapshot time
|
.encode_state_as_update_v1(&StateVector::default());
|
||||||
todo!()
|
let uid = 0; //FIXME: what UID should go there?
|
||||||
|
let params = CollabParams::new(
|
||||||
|
&self.object_id,
|
||||||
|
self.collab_type.clone(),
|
||||||
|
doc_state_full.clone(),
|
||||||
|
);
|
||||||
|
self
|
||||||
|
.storage
|
||||||
|
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
|
||||||
|
.await
|
||||||
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
|
||||||
|
// document state with historical data has been saved - now do GC
|
||||||
|
let mut collab: Collab = Collab::new_with_source(
|
||||||
|
CollabOrigin::Server,
|
||||||
|
&self.object_id,
|
||||||
|
DataSource::DocStateV1(doc_state_full),
|
||||||
|
vec![],
|
||||||
|
false, // here we use history-pruning version
|
||||||
|
)?;
|
||||||
|
let doc_state_gced = collab
|
||||||
|
.transact()
|
||||||
|
.encode_state_as_update_v1(&StateVector::default());
|
||||||
|
let params = CollabParams::new(
|
||||||
|
&self.object_id,
|
||||||
|
self.collab_type.clone(),
|
||||||
|
doc_state_gced.clone(),
|
||||||
|
);
|
||||||
|
self
|
||||||
|
.storage
|
||||||
|
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
|
||||||
|
.await
|
||||||
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
|
||||||
|
// finally we can drop Redis messages
|
||||||
|
let message_id = MessageId {
|
||||||
|
timestamp_ms: last_message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
|
||||||
|
sequence_number: 0,
|
||||||
|
};
|
||||||
|
self.prune_updates(message_id);
|
||||||
|
let _ = lease.release().await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -978,5 +1064,5 @@ impl CollabPersister {
|
||||||
|
|
||||||
pub struct CollabSnapshot {
|
pub struct CollabSnapshot {
|
||||||
pub collab: Collab,
|
pub collab: Collab,
|
||||||
pub last_msg_id: MessageId,
|
pub last_message_id: Option<MessageId>,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue