From 97335a94ec93092eb70d61289d09c2b94884ced3 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Thu, 10 Oct 2024 11:25:17 +0200 Subject: [PATCH] chore: basics of snapshots (unoptimized) --- ...a673ea4e196920636e4a4db9d42fad3ef4d73.json | 14 ++ Cargo.lock | 2 +- libs/collab-stream/src/client.rs | 28 ++- libs/collab-stream/src/collab_update_sink.rs | 3 + .../src/group/group_init.rs | 211 ++++++++++-------- 5 files changed, 156 insertions(+), 102 deletions(-) create mode 100644 .sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json diff --git a/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json b/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json new file mode 100644 index 00000000..aafc6f23 --- /dev/null +++ b/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73" +} diff --git a/Cargo.lock b/Cargo.lock index 7748a1fd..874aa37e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2469,7 +2469,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", - "zstd", + "zstd 0.13.2", ] [[package]] diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 3fe891f5..6c5a7c2c 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -9,8 +9,8 @@ use crate::pubsub::{CollabStreamPub, CollabStreamSub}; use crate::stream_group::{StreamConfig, StreamGroup}; use futures::Stream; use redis::aio::ConnectionManager; -use redis::streams::{StreamReadOptions, StreamReadReply}; -use redis::AsyncCommands; +use redis::streams::{StreamRangeReply, StreamReadOptions, StreamReadReply}; 
+use redis::{AsyncCommands, FromRedisValue}; use std::time::Duration; use tracing::error; @@ -145,4 +145,28 @@ impl CollabRedisStream { } } } + + /// Removes all entries with id <= `message_id` from `stream_key`, returning the deleted count. + pub async fn prune_stream( + &self, + stream_key: &str, + message_id: MessageId, + ) -> Result { + let mut conn = self.connection_manager.clone(); + let value = conn.xrange(stream_key, "-", message_id.to_string()).await?; + let value = StreamRangeReply::from_owned_redis_value(value)?; + let msg_ids: Vec<_> = value.ids.into_iter().map(|entry| entry.id).collect(); + if msg_ids.is_empty() { + // nothing at or below `message_id`: XDEL rejects an empty id list, so skip the call + return Ok(0); + } + let count: usize = conn.xdel(stream_key, &msg_ids).await?; + tracing::debug!( + "Pruned redis stream `{}` <= `{}` ({} objects)", + stream_key, + message_id, + count + ); + Ok(count) + } } diff --git a/libs/collab-stream/src/collab_update_sink.rs b/libs/collab-stream/src/collab_update_sink.rs index 392b3e50..de2b2834 100644 --- a/libs/collab-stream/src/collab_update_sink.rs +++ b/libs/collab-stream/src/collab_update_sink.rs @@ -55,6 +55,9 @@ impl AwarenessUpdateSink { let mut lock = self.conn.lock().await; let msg_id: MessageId = cmd("XADD") .arg(&self.stream_key) + .arg("MAXLEN") + .arg("~") + .arg(100) // we cap awareness stream to approximately 100 awareness updates .arg("*") .arg("sender") .arg(msg.sender.to_string()) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index a453ba54..d93e5222 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -27,7 +27,7 @@ use collab_stream::model::{ use collab_stream::stream_group::StreamGroup; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; -use database_entity::dto::{CollabParams, QueryCollabParams}; +use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams}; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::collections::VecDeque; @@ -202,10 +202,6 @@ impl
CollabGroup { } async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) { - // we received new update, which means that our temp_collab within our persister's task - // is no longer up to date: we need to clear it - state.persister.clear_collab(); - // update state vector based on incoming message let mut sv = state.state_vector.write().await; sv.merge(update.state_vector); @@ -859,8 +855,6 @@ struct CollabPersister { storage: Arc, collab_redis_stream: Arc, indexer: Option>, - /// Collab stored temporarily. - temp_collab: ArcSwapOption, update_sink: CollabUpdateSink, awareness_sink: AwarenessUpdateSink, } @@ -889,15 +883,9 @@ impl CollabPersister { indexer, update_sink, awareness_sink, - temp_collab: Default::default(), } } - /// Drop temp collab i.e. because it was no longer up to date or was not accessed for too long. - fn clear_collab(&self) { - self.temp_collab.store(None); // cleanup temp collab - } - async fn send_update( &self, sender: CollabOrigin, @@ -934,41 +922,23 @@ impl CollabPersister { Ok(msg_id) } - async fn load(&self) -> Result, RealtimeError> { - match self.temp_collab.load_full() { - Some(collab) => Ok(collab), // return cached collab - None => self.load_internal().await, - } + async fn load(&self) -> Result { + self.load_internal(false).await } - async fn load_internal(&self) -> Result, RealtimeError> { + async fn load_internal(&self, skip_gc: bool) -> Result { // 1. 
Try to load the latest snapshot from storage - let params = QueryCollabParams::new( - self.object_id.clone(), - self.collab_type.clone(), - self.workspace_id.clone(), - ); - let encoded_collab = self - .storage - .get_encode_collab(GetCollabOrigin::Server, params, false) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - let mut collab: Collab = Collab::new_with_source( - CollabOrigin::Server, - &self.object_id, - DataSource::DocStateV1(encoded_collab.doc_state.into()), - vec![], - true, // here we use history-remembering version - )?; + let mut collab = self.load_collab_full(skip_gc).await?; // 2. consume all Redis updates on top of it (keep redis msg id) let mut last_message_id = None; + let mut tx = collab.transact_mut(); let mut stream = self.collab_redis_stream.collab_updates( &self.workspace_id, &self.object_id, None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there ); - let mut tx = collab.transact_mut(); + pin_mut!(stream); while let Some(res) = stream.next().await { match res { Ok((message_id, update)) => { @@ -990,76 +960,119 @@ impl CollabPersister { drop(tx); // apply transaction to compress the state (without GC) // now we have the most recent version of the document - let snapshot = Arc::new(CollabSnapshot { + let snapshot = CollabSnapshot { collab, last_message_id, - }); - if self.temp_collab.swap(Some(snapshot.clone())).is_none() { - // if previous value was none it means that it didn't exist yet or is newer than the persisted - // state. Since we already have a collab with loaded updates in memory, we can as well try - // to save its state - self.save_attempt(snapshot.clone()).await?; - } - + }; Ok(snapshot) } - async fn save_attempt(&self, snapshot: Arc) -> Result<(), RealtimeError> { - if let Some(last_message_id) = snapshot.last_message_id { - if let Some(mut lease) = self - .collab_redis_stream - .lease(&self.workspace_id, &self.object_id) - .await? 
- { - let doc_state_full = snapshot - .collab - .transact() - .encode_state_as_update_v1(&StateVector::default()); - let uid = 0; //FIXME: what UID should go there? - let params = CollabParams::new( - &self.object_id, - self.collab_type.clone(), - doc_state_full.clone(), - ); - self - .storage - .insert_or_update_collab(&self.workspace_id, &uid, params, true) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - - // document state with historical data has been saved - now do GC - let mut collab: Collab = Collab::new_with_source( - CollabOrigin::Server, - &self.object_id, - DataSource::DocStateV1(doc_state_full), - vec![], - false, // here we use history-pruning version - )?; - let doc_state_gced = collab - .transact() - .encode_state_as_update_v1(&StateVector::default()); - let params = CollabParams::new( - &self.object_id, - self.collab_type.clone(), - doc_state_gced.clone(), - ); - self - .storage - .insert_or_update_collab(&self.workspace_id, &uid, params, true) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - - // finally we can drop Redis messages - let message_id = MessageId { - timestamp_ms: last_message_id.timestamp_ms - Self::GRACE_PERIOD_MS, - sequence_number: 0, - }; - self.prune_updates(message_id); - let _ = lease.release().await; - } + async fn save(&self) -> Result<(), RealtimeError> { + let mut snapshot = self.load_internal(true).await?; + if let Some(message_id) = &snapshot.last_message_id { + // non-nil message_id means that we had to update the most recent collab state snapshot + // with new updates from Redis. This means that our snapshot state is newer than the last + // persisted one in the database + self.save_attempt(&mut snapshot.collab, message_id).await?; } Ok(()) } + + /// Tries to save provided `snapshot`. 
This snapshot is expected to have **GC turned off**, as + /// first it will try to save it as a historical snapshot (with all updates available), then it + /// will generate another (compact) snapshot variant that will be used as main one for loading + /// for the sake of y-sync protocol. + async fn save_attempt( + &self, + collab: &mut Collab, + message_id: &MessageId, + ) -> Result<(), RealtimeError> { + if !collab.get_awareness().doc().skip_gc() { + return Err(RealtimeError::UnexpectedData( + "tried to save history for snapshot with GC turned on", + )); + } + // try to acquire snapshot lease - it's possible that multiple web services will try to + // perform snapshot at the same time, so we use a lease to let only one of them do it at a time. + if let Some(mut lease) = self + .collab_redis_stream + .lease(&self.workspace_id, &self.object_id) + .await? + { + // 1. Save full historic document state + let mut tx = collab.transact_mut(); + let sv = tx.state_vector().encode_v1(); + let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default()); + let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full) + .encode_to_bytes() + .map_err(|err| RealtimeError::Internal(err.into()))?; + let params = InsertSnapshotParams { + object_id: self.object_id.clone(), + encoded_collab_v1: encoded_collab, + workspace_id: self.workspace_id.clone(), + collab_type: self.collab_type.clone(), + }; + self + .storage + .create_snapshot(params) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + + // 2. Generate document state with GC turned on and save it.
+ tx.force_gc(); + drop(tx); + + let doc_state_light = collab + .transact() + .encode_state_as_update_v1(&StateVector::default()); + let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) + .encode_to_bytes() + .map_err(|err| RealtimeError::Internal(err.into()))?; + let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); + let uid = 0; //FIXME: what UID should go there? + self + .storage + .insert_or_update_collab(&self.workspace_id, &uid, params, true) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + + // 3. finally we can drop Redis messages + let msg_id = MessageId { + timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS, + sequence_number: 0, + }; + let stream_key = CollabStreamUpdate::stream_key(&self.workspace_id, &self.object_id); + self + .collab_redis_stream + .prune_stream(&stream_key, msg_id) + .await?; + let _ = lease.release().await; + } + + Ok(()) + } + + async fn load_collab_full(&self, keep_history: bool) -> Result { + let params = QueryCollabParams::new( + self.object_id.clone(), + self.collab_type.clone(), + self.workspace_id.clone(), + ); + let encoded_collab = self + .storage + .get_encode_collab(GetCollabOrigin::Server, params, false) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + + let collab: Collab = Collab::new_with_source( + CollabOrigin::Server, + &self.object_id, + DataSource::DocStateV1(encoded_collab.doc_state.into()), + vec![], + keep_history, // should we use history-remembering version (true) or lightweight one (false)? + )?; + Ok(collab) + } } pub struct CollabSnapshot {