From 453329dc0c3c8f45efcc65ee2b2075809a3bb6b0 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 18 Dec 2023 02:27:49 +0800 Subject: [PATCH] feat: try to restore from snapshot when fail to initialize the collab in CollabStoragePlugin (#222) --- docker-compose-dev.yml | 4 ++ libs/database/src/collab/collab_db_ops.rs | 3 +- libs/database/src/collab/collab_storage.rs | 17 ++++++-- libs/realtime/src/collaborate/broadcast.rs | 48 +++++++++++++--------- libs/realtime/src/collaborate/group.rs | 21 +++++++--- libs/realtime/src/collaborate/plugin.rs | 42 +++++++++++++++---- src/biz/collab/storage.rs | 24 +++++++++-- 7 files changed, 116 insertions(+), 43 deletions(-) diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 1aeca5ed..141d4660 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -22,6 +22,8 @@ services: - 5433:5432 volumes: - ./migrations/before:/docker-entrypoint-initdb.d + # uncomment the following line if you want to persist data when restarting docker + #- postgres_data:/var/lib/postgresql/data redis: restart: on-failure @@ -95,3 +97,5 @@ services: volumes: - ./docker/pgadmin/servers.json:/pgadmin4/servers.json +volumes: + postgres_data: diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index f582011c..8705a117 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -5,6 +5,7 @@ use database_entity::dto::{ InsertCollabParams, QueryCollabResult, RawData, }; +use crate::collab::SNAPSHOT_PER_HOUR; use app_error::AppError; use chrono::{Duration, Utc}; use database_entity::pg_row::AFCollabMemerAccessLevelRow; @@ -288,8 +289,6 @@ pub async fn create_snapshot( Ok(()) } -const SNAPSHOT_PER_HOUR: i64 = 3; - /// Determines whether a new snapshot should be created for the given `oid`. /// /// This asynchronous function checks the most recent snapshot creation time for the specified `oid`. 
diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 8f42d1ab..20dbbfa5 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -17,7 +17,8 @@ use std::sync::{Arc, Weak}; use tracing::{debug, warn}; use validator::Validate; -pub const COLLAB_SNAPSHOT_LIMIT: i64 = 10; +pub const COLLAB_SNAPSHOT_LIMIT: i64 = 15; +pub const SNAPSHOT_PER_HOUR: i64 = 6; pub type DatabaseResult = core::result::Result; /// [CollabStorageAccessControl] is a trait that provides access control when accessing the storage @@ -61,7 +62,9 @@ pub trait CollabStorage: Send + Sync + 'static { /// * `bool` - `true` if the collaboration exists, `false` otherwise. async fn is_exist(&self, object_id: &str) -> bool; - async fn cache_collab(&self, _object_id: &str, _collab: Weak); + async fn cache_collab(&self, object_id: &str, collab: Weak); + + async fn remove_collab_cache(&self, object_id: &str); async fn is_collab_exist(&self, oid: &str) -> DatabaseResult; @@ -129,8 +132,12 @@ where self.as_ref().is_exist(object_id).await } - async fn cache_collab(&self, _object_id: &str, _collab: Weak) { - self.as_ref().cache_collab(_object_id, _collab).await + async fn cache_collab(&self, object_id: &str, collab: Weak) { + self.as_ref().cache_collab(object_id, collab).await + } + + async fn remove_collab_cache(&self, object_id: &str) { + self.as_ref().remove_collab_cache(object_id).await } async fn is_collab_exist(&self, oid: &str) -> DatabaseResult { @@ -220,6 +227,8 @@ impl CollabStorage for CollabStoragePgImpl { async fn cache_collab(&self, _object_id: &str, _collab: Weak) {} + async fn remove_collab_cache(&self, _object_id: &str) {} + async fn is_collab_exist(&self, oid: &str) -> DatabaseResult { let is_exist = is_collab_exists(oid, &self.pg_pool).await?; Ok(is_exist) diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 560b8efa..0fe921b7 100644 --- 
a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -29,11 +29,8 @@ pub struct CollabBroadcast { object_id: String, collab: MutexCollab, sender: Sender, - - #[allow(dead_code)] - awareness_sub: awareness::UpdateSubscription, - #[allow(dead_code)] - doc_sub: UpdateSubscription, + awareness_sub: Mutex>, + doc_sub: Mutex>, } impl CollabBroadcast { @@ -47,28 +44,43 @@ impl CollabBroadcast { let object_id = object_id.to_owned(); // broadcast channel let (sender, _) = channel(buffer_capacity); + CollabBroadcast { + object_id, + collab, + sender, + awareness_sub: Default::default(), + doc_sub: Default::default(), + } + } + + pub async fn observe_collab_changes(&self) { let (doc_sub, awareness_sub) = { - let mut mutex_collab = collab.lock(); + let mut mutex_collab = self.collab.lock(); // Observer the document's update and broadcast it to all subscribers. - let cloned_oid = object_id.clone(); - let broadcast_sink = sender.clone(); + let cloned_oid = self.object_id.clone(); + let broadcast_sink = self.sender.clone(); let doc_sub = mutex_collab .get_mut_awareness() .doc_mut() .observe_update_v1(move |txn, event| { - trace!("broadcast doc update with len:{}", event.update.len()); + let update_len = event.update.len(); let origin = CollabOrigin::from(txn); let payload = gen_update_message(&event.update); let msg = CollabBroadcastData::new(origin, cloned_oid.clone(), payload); - if let Err(e) = broadcast_sink.send(msg.into()) { - error!("broadcast sink fail: {}", e); + + match broadcast_sink.send(msg.into()) { + Ok(_) => trace!("observe doc update with len:{}", update_len), + Err(e) => error!( + "observe doc update with len:{} - broadcast sink fail: {}", + update_len, e + ), } }) .unwrap(); - let broadcast_sink = sender.clone(); - let cloned_oid = object_id.clone(); + let broadcast_sink = self.sender.clone(); + let cloned_oid = self.object_id.clone(); // Observer the awareness's update and broadcast it to all subscribers. 
let awareness_sub = mutex_collab @@ -84,13 +96,9 @@ impl CollabBroadcast { }); (doc_sub, awareness_sub) }; - CollabBroadcast { - object_id, - collab, - sender, - awareness_sub, - doc_sub, - } + + *self.doc_sub.lock().await = Some(doc_sub); + *self.awareness_sub.lock().await = Some(awareness_sub); } /// Returns a reference to an underlying [MutexCollab] instance. diff --git a/libs/realtime/src/collaborate/group.rs b/libs/realtime/src/collaborate/group.rs index 93aeacbc..da84773d 100644 --- a/libs/realtime/src/collaborate/group.rs +++ b/libs/realtime/src/collaborate/group.rs @@ -80,6 +80,7 @@ where match self.group_by_object_id.try_write() { Ok(mut group_by_object_id) => { group_by_object_id.remove(object_id); + // self.storage.remove_collab_cache(object_id).await; }, Err(err) => error!("Failed to acquire write lock to remove group: {:?}", err), } @@ -127,12 +128,7 @@ where let collab = Arc::new(collab.clone()); // The lifecycle of the collab is managed by the group. - let group = Arc::new(CollabGroup { - collab: collab.clone(), - broadcast, - subscribers: Default::default(), - }); - + let group = Arc::new(CollabGroup::new(collab.clone(), broadcast)); let plugin = CollabStoragePlugin::new( uid, workspace_id, @@ -148,6 +144,7 @@ where .storage .cache_collab(object_id, Arc::downgrade(&collab)) .await; + group.observe_collab().await; group } } @@ -169,6 +166,18 @@ impl CollabGroup where U: RealtimeUser, { + pub fn new(collab: Arc, broadcast: CollabBroadcast) -> Self { + Self { + collab, + broadcast, + subscribers: Default::default(), + } + } + + pub async fn observe_collab(&self) { + self.broadcast.observe_collab_changes().await; + } + /// Mutate the [Collab] by the given closure pub fn get_mut_collab(&self, f: F) where diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 531bb943..612c9d6f 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -174,9 +174,11 @@ where }; match 
self.storage.get_collab_encoded_v1(&self.uid, params).await { - Ok(encoded_collab) => match init_collab_with_raw_data(&encoded_collab, doc).await { + Ok(encoded_collab_v1) => match init_collab_with_raw_data(&encoded_collab_v1, doc).await { Ok(_) => { - // Try to create a snapshot for the collab object + // Attempt to create a snapshot for the collaboration object. When creating this snapshot, it is + // assumed that the 'encoded_collab_v1' is already in a valid format. Therefore, there is no need + // to verify the outcome of the 'encode_to_bytes' operation. if self.storage.should_create_snapshot(object_id).await { let cloned_workspace_id = self.workspace_id.clone(); let cloned_object_id = object_id.to_string(); @@ -184,23 +186,39 @@ where let _ = tokio::task::spawn_blocking(move || { let params = InsertSnapshotParams { object_id: cloned_object_id, - encoded_collab_v1: encoded_collab.encode_to_bytes().unwrap(), + encoded_collab_v1: encoded_collab_v1.encode_to_bytes().unwrap(), workspace_id: cloned_workspace_id, }; tokio::spawn(async move { + // FIXME(nathan): There is a potential issue when concurrently spawning tasks to create snapshots. A subsequent + // task for creating a snapshot might write to the database before a previous task completes. To address + // this, consider using `stream!` to queue these tasks, ensuring they are executed in the order they were + // spawned. if let Err(err) = storage.create_snapshot(params).await { - error!("Create snapshot {:?}", err); + error!("create snapshot {:?}", err); } }); }) .await; } }, - Err(e) => error!("🔴Init collab failed: {:?}", e), + Err(err) => { + // When initializing a collaboration object, if the 'init_collab_with_raw_data' operation fails, attempt to + // restore the collaboration object from the latest snapshot. 
+ if let Some(encoded_collab_v1) = get_latest_snapshot(object_id, &self.storage).await { + if let Err(err) = init_collab_with_raw_data(&encoded_collab_v1, doc).await { + error!("restore collab with snapshot failed: {:?}", err); + return; + } + } + error!("init collab failed: {:?}", err) + }, }, Err(err) => match &err { AppError::RecordNotFound(_) => { + // When attempting to retrieve collaboration data from the disk and a 'Record Not Found' error is returned, + // this indicates that the collaboration is new. Therefore, the current collaboration data should be saved to disk. if let Err(err) = self.insert_new_collab(doc, object_id).await { error!("Insert collab {:?}", err); } @@ -223,7 +241,7 @@ where self.edit_state.flush_edit(); trace!("number of updates reach flush_per_update, start flushing"); match self.group.upgrade() { - None => error!("🔴Group is dropped, skip flush collab"), + None => error!("Group is dropped, skip flush collab"), Some(group) => group.flush_collab(), } } @@ -260,11 +278,21 @@ where fn encoded_v1_from_doc(doc: &Doc) -> EncodedCollabV1 { let txn = doc.transact(); - let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); let state_vector = txn.state_vector().encode_v1(); + let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); EncodedCollabV1::new(state_vector, doc_state) } +async fn get_latest_snapshot(object_id: &str, storage: &S) -> Option +where + S: CollabStorage, +{ + let metas = storage.get_collab_snapshot_list(object_id).await.ok()?; + let meta = metas.0.first()?; + let snapshot_data = storage.get_collab_snapshot(&meta.snapshot_id).await.ok()?; + EncodedCollabV1::decode_from_bytes(&snapshot_data.encoded_collab_v1).ok() +} + struct CollabEditState { edit_count: AtomicU32, flush_edit_count: AtomicU32, diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index a91c37aa..577912cf 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -20,7 +20,7 @@ use std::{ sync::{Arc, 
Weak}, }; use tokio::sync::RwLock; -use tracing::{event, info, instrument}; +use tracing::{event, instrument}; use validator::Validate; pub type CollabPostgresDBStorage = CollabStorageWrapper< @@ -75,7 +75,7 @@ where } async fn cache_collab(&self, object_id: &str, collab: Weak) { - tracing::trace!("Cache collab:{} in memory", object_id); + tracing::trace!("cache collab:{}", object_id); self .collab_by_object_id .write() @@ -83,6 +83,11 @@ where .insert(object_id.to_string(), collab); } + async fn remove_collab_cache(&self, object_id: &str) { + tracing::trace!("remove collab:{} cache", object_id); + self.collab_by_object_id.write().await.remove(object_id); + } + async fn is_collab_exist(&self, oid: &str) -> DatabaseResult { self.inner.is_collab_exist(oid).await } @@ -156,9 +161,20 @@ where .and_then(|collab| collab.upgrade()); match collab { - None => self.inner.get_collab_encoded_v1(uid, params).await, + None => { + event!( + tracing::Level::DEBUG, + "Get collab data:{} from disk", + params.object_id + ); + self.inner.get_collab_encoded_v1(uid, params).await + }, Some(collab) => { - info!("Get collab data:{} from memory", params.object_id); + event!( + tracing::Level::DEBUG, + "Get collab data:{} from memory", + params.object_id + ); let data = collab.encode_collab_v1(); Ok(data) },