chore: basics of snapshots (unoptimized)

This commit is contained in:
Bartosz Sypytkowski 2024-10-10 11:25:17 +02:00
parent 5ef6ab1738
commit a7713d9001
5 changed files with 156 additions and 102 deletions

View File

@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73"
}

2
Cargo.lock generated
View File

@ -2469,7 +2469,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"zstd",
"zstd 0.13.2",
]
[[package]]

View File

@ -9,8 +9,8 @@ use crate::pubsub::{CollabStreamPub, CollabStreamSub};
use crate::stream_group::{StreamConfig, StreamGroup};
use futures::Stream;
use redis::aio::ConnectionManager;
use redis::streams::{StreamReadOptions, StreamReadReply};
use redis::AsyncCommands;
use redis::streams::{StreamRangeReply, StreamReadOptions, StreamReadReply};
use redis::{AsyncCommands, FromRedisValue};
use std::time::Duration;
use tracing::error;
@ -145,4 +145,28 @@ impl CollabRedisStream {
}
}
}
pub async fn prune_stream(
&self,
stream_key: &str,
message_id: MessageId,
) -> Result<usize, StreamError> {
let mut conn = self.connection_manager.clone();
let value = conn.xrange(stream_key, "-", message_id.to_string()).await?;
let value = StreamRangeReply::from_owned_redis_value(value)?;
let msg_ids: Vec<_> = value
.ids
.into_iter()
.map(|stream_id| stream_id.id)
.collect();
let count: usize = conn.xdel(stream_key, &msg_ids).await?;
drop(conn);
tracing::debug!(
"Pruned redis stream `{}` <= `{}` ({} objects)",
stream_key,
message_id,
count
);
Ok(count)
}
}

View File

@ -55,6 +55,9 @@ impl AwarenessUpdateSink {
let mut lock = self.conn.lock().await;
let msg_id: MessageId = cmd("XADD")
.arg(&self.stream_key)
.arg("MAXLEN")
.arg("~")
.arg(100) // we cap awareness stream to at most 20 awareness updates
.arg("*")
.arg("sender")
.arg(msg.sender.to_string())

View File

@ -27,7 +27,7 @@ use collab_stream::model::{
use collab_stream::stream_group::StreamGroup;
use dashmap::DashMap;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::{CollabParams, QueryCollabParams};
use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams};
use futures::{pin_mut, Sink, Stream};
use futures_util::{SinkExt, StreamExt};
use std::collections::VecDeque;
@ -202,10 +202,6 @@ impl CollabGroup {
}
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
// we received new update, which means that our temp_collab within our persister's task
// is no longer up to date: we need to clear it
state.persister.clear_collab();
// update state vector based on incoming message
let mut sv = state.state_vector.write().await;
sv.merge(update.state_vector);
@ -859,8 +855,6 @@ struct CollabPersister {
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>,
/// Collab stored temporarily.
temp_collab: ArcSwapOption<CollabSnapshot>,
update_sink: CollabUpdateSink,
awareness_sink: AwarenessUpdateSink,
}
@ -889,15 +883,9 @@ impl CollabPersister {
indexer,
update_sink,
awareness_sink,
temp_collab: Default::default(),
}
}
/// Drop temp collab i.e. because it was no longer up to date or was not accessed for too long.
fn clear_collab(&self) {
self.temp_collab.store(None); // cleanup temp collab
}
async fn send_update(
&self,
sender: CollabOrigin,
@ -934,41 +922,23 @@ impl CollabPersister {
Ok(msg_id)
}
async fn load(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
match self.temp_collab.load_full() {
Some(collab) => Ok(collab), // return cached collab
None => self.load_internal().await,
}
async fn load(&self) -> Result<CollabSnapshot, RealtimeError> {
self.load_internal(false).await
}
async fn load_internal(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
async fn load_internal(&self, skip_gc: bool) -> Result<CollabSnapshot, RealtimeError> {
// 1. Try to load the latest snapshot from storage
let params = QueryCollabParams::new(
self.object_id.clone(),
self.collab_type.clone(),
self.workspace_id.clone(),
);
let encoded_collab = self
.storage
.get_encode_collab(GetCollabOrigin::Server, params, false)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
let mut collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&self.object_id,
DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![],
true, // here we use history-remembering version
)?;
let mut collab = self.load_collab_full(skip_gc).await?;
// 2. consume all Redis updates on top of it (keep redis msg id)
let mut last_message_id = None;
let mut tx = collab.transact_mut();
let mut stream = self.collab_redis_stream.collab_updates(
&self.workspace_id,
&self.object_id,
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
);
let mut tx = collab.transact_mut();
pin_mut!(stream);
while let Some(res) = stream.next().await {
match res {
Ok((message_id, update)) => {
@ -990,76 +960,119 @@ impl CollabPersister {
drop(tx); // apply transaction to compress the state (without GC)
// now we have the most recent version of the document
let snapshot = Arc::new(CollabSnapshot {
let snapshot = CollabSnapshot {
collab,
last_message_id,
});
if self.temp_collab.swap(Some(snapshot.clone())).is_none() {
// if previous value was none it means that it didn't exist yet or is newer than the persisted
// state. Since we already have a collab with loaded updates in memory, we can as well try
// to save its state
self.save_attempt(snapshot.clone()).await?;
}
};
Ok(snapshot)
}
async fn save_attempt(&self, snapshot: Arc<CollabSnapshot>) -> Result<(), RealtimeError> {
if let Some(last_message_id) = snapshot.last_message_id {
if let Some(mut lease) = self
.collab_redis_stream
.lease(&self.workspace_id, &self.object_id)
.await?
{
let doc_state_full = snapshot
.collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let uid = 0; //FIXME: what UID should go there?
let params = CollabParams::new(
&self.object_id,
self.collab_type.clone(),
doc_state_full.clone(),
);
self
.storage
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// document state with historical data has been saved - now do GC
let mut collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&self.object_id,
DataSource::DocStateV1(doc_state_full),
vec![],
false, // here we use history-pruning version
)?;
let doc_state_gced = collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let params = CollabParams::new(
&self.object_id,
self.collab_type.clone(),
doc_state_gced.clone(),
);
self
.storage
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// finally we can drop Redis messages
let message_id = MessageId {
timestamp_ms: last_message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
sequence_number: 0,
};
self.prune_updates(message_id);
let _ = lease.release().await;
}
async fn save(&self) -> Result<(), RealtimeError> {
let mut snapshot = self.load_internal(true).await?;
if let Some(message_id) = &snapshot.last_message_id {
// non-nil message_id means that we had to update the most recent collab state snapshot
// with new updates from Redis. This means that our snapshot state is newer than the last
// persisted one in the database
self.save_attempt(&mut snapshot.collab, message_id).await?;
}
Ok(())
}
/// Tries to save provided `snapshot`. This snapshot is expected to have **GC turned off**, as
/// first it will try to save it as a historical snapshot (will all updates available), then it
/// will generate another (compact) snapshot variant that will be used as main one for loading
/// for the sake of y-sync protocol.
async fn save_attempt(
&self,
collab: &mut Collab,
message_id: &MessageId,
) -> Result<(), RealtimeError> {
if !collab.get_awareness().doc().skip_gc() {
return Err(RealtimeError::UnexpectedData(
"tried to save history for snapshot with GC turned on",
));
}
// try to acquire snapshot lease - it's possible that multiple web services will try to
// perform snapshot at the same time, so we'll use lease to let only one of them atm.
if let Some(mut lease) = self
.collab_redis_stream
.lease(&self.workspace_id, &self.object_id)
.await?
{
// 1. Save full historic document state
let mut tx = collab.transact_mut();
let sv = tx.state_vector().encode_v1();
let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default());
let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
let params = InsertSnapshotParams {
object_id: self.object_id.clone(),
encoded_collab_v1: encoded_collab,
workspace_id: self.workspace_id.clone(),
collab_type: self.collab_type.clone(),
};
self
.storage
.create_snapshot(params)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// 2. Generate document state with GC turned on and save it.
tx.force_gc();
drop(tx);
let doc_state_light = collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
let uid = 0; //FIXME: what UID should go there?
self
.storage
.insert_or_update_collab(&self.workspace_id, &uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
// 3. finally we can drop Redis messages
let msg_id = MessageId {
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
sequence_number: 0,
};
let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id);
self
.collab_redis_stream
.prune_stream(&stream_key, msg_id)
.await?;
let _ = lease.release().await;
}
Ok(())
}
async fn load_collab_full(&self, keep_history: bool) -> Result<Collab, RealtimeError> {
let params = QueryCollabParams::new(
self.object_id.clone(),
self.collab_type.clone(),
self.workspace_id.clone(),
);
let encoded_collab = self
.storage
.get_encode_collab(GetCollabOrigin::Server, params, false)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
let collab: Collab = Collab::new_with_source(
CollabOrigin::Server,
&self.object_id,
DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![],
keep_history, // should we use history-remembering version (true) or lightweight one (false)?
)?;
Ok(collab)
}
}
pub struct CollabSnapshot {