diff --git a/.sqlx/query-fba0b66b63ae9561c4868891770821abe34953a6ed21fd25639beab9a76780dd.json b/.sqlx/query-fba0b66b63ae9561c4868891770821abe34953a6ed21fd25639beab9a76780dd.json deleted file mode 100644 index 57e4fbdf..00000000 --- a/.sqlx/query-fba0b66b63ae9561c4868891770821abe34953a6ed21fd25639beab9a76780dd.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO af_quick_note (workspace_id, uid, data) VALUES ($1, $2, $3)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Int8", - "Jsonb" - ] - }, - "nullable": [] - }, - "hash": "fba0b66b63ae9561c4868891770821abe34953a6ed21fd25639beab9a76780dd" -} diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 79f83cfe..faf4978d 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -25,7 +25,7 @@ use collab_stream::error::StreamError; use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags}; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; -use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams}; +use database_entity::dto::{CollabParams, QueryCollabParams}; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -993,7 +993,7 @@ impl CollabPersister { tracing::trace!("requested to load compact collab {}", self.object_id); // 1. Try to load the latest snapshot from storage let start = Instant::now(); - let mut collab = match self.load_collab_full(false).await? { + let mut collab = match self.load_collab_full().await? { Some(collab) => collab, None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false), }; @@ -1055,10 +1055,10 @@ impl CollabPersister { i += 1; let update: Update = update.into_update()?; if collab.is_none() { - collab = Some(match self.load_collab_full(true).await? { + collab = Some(match self.load_collab_full().await? { Some(collab) => collab, None => { - Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true) + Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false) }, }) }; @@ -1141,27 +1141,16 @@ impl CollabPersister { .lease(&self.workspace_id, &self.object_id) .await? { - // 1. Save full historic document state - let mut tx = collab.transact_mut(); - let sv = tx.state_vector().encode_v1(); - let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default()); - let full_len = doc_state_full.len(); - self.write_doc_state_full(doc_state_full).await?; - - // 2. Generate document state with GC turned on and save it. - tx.force_gc(); - drop(tx); let doc_state_light = collab .transact() .encode_state_as_update_v1(&StateVector::default()); let light_len = doc_state_light.len(); - self.write_doc_state_light(sv, doc_state_light).await?; + self.write_collab(doc_state_light).await?; tracing::debug!( - "persisted collab {} snapshot at {}: {} and {} bytes", + "persisted collab {} snapshot at {}: {} bytes", self.object_id, message_id, - full_len, light_len ); @@ -1183,12 +1172,8 @@ impl CollabPersister { Ok(()) } - async fn write_doc_state_light( - &self, - sv: Vec, - doc_state_light: Vec, - ) -> Result<(), RealtimeError> { - let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) + async fn write_collab(&self, doc_state_v1: Vec) -> Result<(), RealtimeError> { + let encoded_collab = EncodedCollab::new_v1(Default::default(), doc_state_v1) .encode_to_bytes() .map(Bytes::from) .map_err(|err| RealtimeError::BincodeEncode(err.to_string()))?; @@ -1210,25 +1195,6 @@ impl CollabPersister { Ok(()) } - async fn write_doc_state_full(&self, doc_state_full: Vec) -> Result<(), RealtimeError> { - self - .metrics - .full_collab_size - .observe(doc_state_full.len() as f64); - let params = InsertSnapshotParams { - object_id: self.object_id.clone(), - doc_state: doc_state_full.into(), - workspace_id: self.workspace_id.clone(), - collab_type: self.collab_type.clone(), - }; - self - .storage - .create_snapshot(params) - .await - .map_err(|err| RealtimeError::CreateSnapshotFailed(err.to_string()))?; - Ok(()) - } - fn index_encoded_collab(&self, encoded_collab: Bytes) { let indexed_collab = IndexedCollab { object_id: self.object_id.clone(), @@ -1248,48 +1214,21 @@ impl CollabPersister { } } - async fn load_collab_full(&self, keep_history: bool) -> Result, RealtimeError> { - let doc_state = if keep_history { - // if we want history-keeping variant, we need to get a snapshot - let snapshot = self - .storage - .get_latest_snapshot( - &self.workspace_id, - &self.object_id, - self.collab_type.clone(), - ) - .await - .map_err(|err| RealtimeError::GetLatestSnapshotFailed(err.to_string()))?; - match snapshot { - None => None, - Some(snapshot) => { - let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1) - .map_err(|err| RealtimeError::BincodeEncode(err.to_string()))?; - Some(encoded_collab.doc_state) - }, - } - } else { - None // if we want a lightweight variant, we'll fallback to default - }; - let doc_state = match doc_state { - Some(doc_state) => doc_state, - None => { - // we didn't find a snapshot, or we want a lightweight collab version - let params = QueryCollabParams::new( - self.object_id.clone(), - self.collab_type.clone(), - self.workspace_id.clone(), - ); - let result = self - .storage - .get_encode_collab(GetCollabOrigin::Server, params, false) - .await; - match result { - Ok(encoded_collab) => encoded_collab.doc_state, - Err(AppError::RecordNotFound(_)) => return Ok(None), - Err(err) => return Err(RealtimeError::Internal(err.into())), - } - }, + async fn load_collab_full(&self) -> Result, RealtimeError> { + // we didn't find a snapshot, or we want a lightweight collab version + let params = QueryCollabParams::new( + self.object_id.clone(), + self.collab_type.clone(), + self.workspace_id.clone(), + ); + let result = self + .storage + .get_encode_collab(GetCollabOrigin::Server, params, false) + .await; + let doc_state = match result { + Ok(encoded_collab) => encoded_collab.doc_state, + Err(AppError::RecordNotFound(_)) => return Ok(None), + Err(err) => return Err(RealtimeError::Internal(err.into())), }; let collab: Collab = Collab::new_with_source( @@ -1297,7 +1236,7 @@ impl CollabPersister { &self.object_id, DataSource::DocStateV1(doc_state.into()), vec![], - keep_history, // should we use history-remembering version (true) or lightweight one (false)? + false, )?; Ok(Some(collab)) } diff --git a/tests/collab/mod.rs b/tests/collab/mod.rs index e98dc547..2223bf81 100644 --- a/tests/collab/mod.rs +++ b/tests/collab/mod.rs @@ -7,6 +7,7 @@ mod missing_update_test; mod multi_devices_edit; mod permission_test; mod single_device_edit; +mod snapshot_test; mod storage_test; mod stress_test; pub mod util; diff --git a/tests/collab/snapshot_test.rs b/tests/collab/snapshot_test.rs new file mode 100644 index 00000000..2c71f23b --- /dev/null +++ b/tests/collab/snapshot_test.rs @@ -0,0 +1,85 @@ +use client_api_test::{assert_server_collab, TestClient}; +use collab::core::collab::DataSource; +use collab::core::origin::CollabOrigin; +use collab::entity::EncodedCollab; +use collab::preclude::{Collab, JsonValue}; +use collab_entity::CollabType; +use serde_json::json; + +#[tokio::test] +async fn read_write_snapshot() { + let mut c = TestClient::new_user().await; + + // prepare initial document + let wid = c.workspace_id().await; + let oid = c.create_and_edit_collab(&wid, CollabType::Unknown).await; + c.open_collab(&wid, &oid, CollabType::Unknown).await; + c.insert_into(&oid, "title", "t1").await; + c.wait_object_sync_complete(&oid).await.unwrap(); + assert_server_collab( + &wid, + &mut c.api_client, + &oid, + &CollabType::Unknown, + 10, + json!({"title": "t1"}), + ) + .await + .unwrap(); + // create the 1st snapshot + let m1 = c + .create_snapshot(&wid, &oid, CollabType::Unknown) + .await + .unwrap(); + + c.insert_into(&oid, "title", "t2").await; + c.wait_object_sync_complete(&oid).await.unwrap(); + assert_server_collab( + &wid, + &mut c.api_client, + &oid, + &CollabType::Unknown, + 10, + json!({"title": "t2"}), + ) + .await + .unwrap(); + // create the 2nd snapshot + let m2 = c + .create_snapshot(&wid, &oid, CollabType::Unknown) + .await + .unwrap(); + + let snapshots = c.get_snapshot_list(&wid, &oid).await.unwrap(); + assert_eq!(snapshots.0.len(), 2, "expecting 2 snapshots"); + + // retrieve state + verify_snapshot_state(&c, &wid, &oid, &m1.snapshot_id, json!({"title": "t1"})).await; + verify_snapshot_state(&c, &wid, &oid, &m2.snapshot_id, json!({"title": "t2"})).await; +} + +async fn verify_snapshot_state( + c: &TestClient, + workspace_id: &str, + oid: &str, + snapshot_id: &i64, + expected: JsonValue, +) { + let snapshot = c + .get_snapshot(workspace_id, oid, snapshot_id) + .await + .unwrap(); + + // retrieve state + let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1).unwrap(); + let collab = Collab::new_with_source( + CollabOrigin::Empty, + oid, + DataSource::DocStateV1(encoded_collab.doc_state.into()), + vec![], + true, + ) + .unwrap(); + let actual = collab.to_json_value(); + assert_eq!(actual, expected); +}