diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index 5e223d09..ee291143 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -24,6 +24,13 @@ impl CreateCollabParams { pub fn split(self) -> (CollabParams, String) { (self.inner, self.workspace_id) } + + pub fn new(workspace_id: T, params: CollabParams) -> Self { + Self { + inner: params, + workspace_id: workspace_id.to_string(), + } + } } impl Deref for CreateCollabParams { @@ -41,6 +48,30 @@ pub struct CollabParams { #[validate(custom = "validate_not_empty_payload")] pub encoded_collab_v1: Vec, pub collab_type: CollabType, + /// Determine whether to override the collab if it exists. Default is false. + #[serde(default)] + pub override_if_exist: bool, +} + +impl CollabParams { + pub fn new( + object_id: T, + collab_type: CollabType, + encoded_collab_v1: Vec, + ) -> Self { + let object_id = object_id.to_string(); + Self { + object_id, + collab_type, + encoded_collab_v1, + override_if_exist: false, + } + } + + pub fn override_collab_if_exist(mut self, override_if_exist: bool) -> Self { + self.override_if_exist = override_if_exist; + self + } } #[derive(Debug, Clone, Validate, Serialize, Deserialize)] @@ -50,43 +81,6 @@ pub struct BatchCreateCollabParams { pub params_list: Vec, } -impl CreateCollabParams { - pub fn new( - object_id: T, - collab_type: CollabType, - encoded_collab_v1: Vec, - workspace_id: String, - ) -> Self { - let object_id = object_id.to_string(); - let params = CollabParams { - object_id, - collab_type, - encoded_collab_v1, - }; - Self { - inner: params, - workspace_id, - } - } - pub fn from_raw_data( - object_id: String, - collab_type: CollabType, - encoded_collab_v1: Vec, - workspace_id: &str, - ) -> Self { - let workspace_id = workspace_id.to_string(); - let params = CollabParams { - object_id, - collab_type, - encoded_collab_v1, - }; - Self { - inner: params, - workspace_id, - } - } -} - #[derive(Debug, Clone, Validate, Serialize, Deserialize)] pub struct DeleteCollabParams { #[validate(custom = "validate_not_empty_str")] diff --git a/libs/realtime-protocol/src/message.rs b/libs/realtime-protocol/src/message.rs index 20bf204f..d52b75f3 100644 --- a/libs/realtime-protocol/src/message.rs +++ b/libs/realtime-protocol/src/message.rs @@ -162,8 +162,11 @@ pub const MSG_SYNC_UPDATE: u8 = 2; #[derive(Debug, PartialEq, Eq)] pub enum SyncMessage { + /// Sync step 1 message contains the [StateVector] from the remote side SyncStep1(StateVector), + /// Sync step 2 message contains the encoded [yrs::Update] from the remote side SyncStep2(Vec), + /// Update message contains the encoded [yrs::Update] from the remote side Update(Vec), } diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 66f56c9a..cfcd705e 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -15,7 +15,7 @@ use collab::preclude::{CollabPlugin, Doc, TransactionMut}; use collab_entity::CollabType; use database::collab::CollabStorage; use database_entity::dto::{ - AFAccessLevel, CreateCollabParams, InsertSnapshotParams, QueryCollabParams, + AFAccessLevel, CollabParams, CreateCollabParams, InsertSnapshotParams, QueryCollabParams, }; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; @@ -84,11 +84,9 @@ where ) .await; - let params = CreateCollabParams::from_raw_data( - object_id.to_string(), - self.collab_type.clone(), - encoded_collab_v1, + let params = CreateCollabParams::new( &self.workspace_id, + CollabParams::new(object_id, self.collab_type.clone(), encoded_collab_v1), ); self @@ -266,11 +264,9 @@ where }, }; - let params = CreateCollabParams::from_raw_data( - object_id.to_string(), - self.collab_type.clone(), - encoded_collab_v1, + let params = CreateCollabParams::new( &self.workspace_id, + CollabParams::new(object_id, self.collab_type.clone(), encoded_collab_v1), ); let storage = self.storage.clone(); diff --git a/libs/realtime/src/collaborate/sync_protocol.rs b/libs/realtime/src/collaborate/sync_protocol.rs index 755f1f74..19d66b07 100644 --- a/libs/realtime/src/collaborate/sync_protocol.rs +++ b/libs/realtime/src/collaborate/sync_protocol.rs @@ -13,12 +13,14 @@ impl CollabSyncProtocol for ServerSyncProtocol { sv: StateVector, ) -> Result>, Error> { let txn = awareness.doc().transact(); - let step2_update = txn.encode_state_as_update_v1(&sv); - let step1_update = txn.state_vector(); + let client_step2_update = txn.encode_state_as_update_v1(&sv); + + // Retrieve the latest document state from the client after they return online from offline editing. + let server_step1_update = txn.state_vector(); let mut encoder = EncoderV1::new(); - Message::Sync(SyncMessage::SyncStep2(step2_update)).encode(&mut encoder); - Message::Sync(SyncMessage::SyncStep1(step1_update)).encode(&mut encoder); + Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder); + Message::Sync(SyncMessage::SyncStep1(server_step1_update)).encode(&mut encoder); Ok(Some(encoder.to_vec())) } diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index f487e390..da764237 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -24,7 +24,9 @@ where C: CollabAccessControl, { for params in params_list { - if database::collab::collab_exists(pg_pool, ¶ms.object_id).await? { + if !params.override_if_exist + && database::collab::collab_exists(pg_pool, ¶ms.object_id).await? + { // When calling this function, the caller should have already checked if the collab exists. return Err(AppError::RecordAlreadyExists(format!( "Collab with object_id {} already exists", diff --git a/src/biz/user.rs b/src/biz/user.rs index 8f7f010f..89512d96 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -116,6 +116,7 @@ where object_id, encoded_collab_v1, collab_type: template.object_type, + override_if_exist: false, }, ) .await?; diff --git a/tests/collab/collab_curd_test.rs b/tests/collab/collab_curd_test.rs index f02a5bd3..a4d1873b 100644 --- a/tests/collab/collab_curd_test.rs +++ b/tests/collab/collab_curd_test.rs @@ -1,5 +1,6 @@ use crate::util::test_client::TestClient; use app_error::ErrorCode; +use assert_json_diff::assert_json_include; use collab_entity::CollabType; use database_entity::dto::{CollabParams, CreateCollabParams, QueryCollab}; use serde::Serialize; @@ -30,6 +31,7 @@ async fn batch_insert_collab_success_test() { object_id: Uuid::new_v4().to_string(), encoded_collab_v1: vec![0, 200], collab_type: CollabType::Document, + override_if_exist: false, }) .collect::>(); @@ -68,7 +70,7 @@ async fn create_collab_params_compatibility_serde_test() { serde_json::from_value::(old_version_value.clone()).unwrap(); let new_version_value = serde_json::to_value(new_version_create_params.clone()).unwrap(); - assert_eq!(old_version_value, new_version_value); + assert_json_include!(actual: new_version_value.clone(), expected: old_version_value.clone()); assert_eq!(new_version_create_params.object_id, "object_id".to_string()); assert_eq!(new_version_create_params.encoded_collab_v1, vec![0, 200]); diff --git a/tests/collab/member_crud.rs b/tests/collab/member_crud.rs index a8654fed..ff04185e 100644 --- a/tests/collab/member_crud.rs +++ b/tests/collab/member_crud.rs @@ -2,7 +2,7 @@ use crate::collab::workspace_id_from_client; use crate::user::utils::generate_unique_registered_user_client; use collab_entity::CollabType; use database_entity::dto::{ - AFAccessLevel, CollabMemberIdentify, CreateCollabParams, InsertCollabMemberParams, + AFAccessLevel, CollabMemberIdentify, CollabParams, CreateCollabParams, InsertCollabMemberParams, QueryCollabMembers, UpdateCollabMemberParams, }; use uuid::Uuid; @@ -16,10 +16,8 @@ async fn collab_owner_permission_test() { let uid = c.get_profile().await.unwrap().uid; c.create_collab(CreateCollabParams::new( - &object_id, - CollabType::Document, - raw_data.clone(), workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, raw_data.clone()), )) .await .unwrap(); @@ -45,10 +43,8 @@ async fn update_collab_member_permission_test() { let uid = c.get_profile().await.unwrap().uid; c.create_collab(CreateCollabParams::new( - &object_id, - CollabType::Document, - raw_data.clone(), workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, raw_data.clone()), )) .await .unwrap(); @@ -81,10 +77,8 @@ async fn add_collab_member_test() { let object_id = Uuid::new_v4().to_string(); c_1 .create_collab(CreateCollabParams::new( - &object_id, - CollabType::Document, - vec![0; 10], workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, vec![0; 10]), )) .await .unwrap(); @@ -126,10 +120,8 @@ async fn add_collab_member_then_remove_test() { let object_id = Uuid::new_v4().to_string(); c_1 .create_collab(CreateCollabParams::new( - &object_id, - CollabType::Document, - vec![0; 10], workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, vec![0; 10]), )) .await .unwrap(); diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index e8baa459..ff993651 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -7,7 +7,8 @@ use std::collections::HashMap; use app_error::ErrorCode; use collab_entity::CollabType; use database_entity::dto::{ - CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, QueryCollabResult, + CollabParams, CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, + QueryCollabResult, }; use sqlx::types::Uuid; @@ -21,10 +22,8 @@ async fn success_insert_collab_test() { let workspace_id = workspace_id_from_client(&c).await; let object_id = Uuid::new_v4().to_string(); c.create_collab(CreateCollabParams::new( - &object_id, - CollabType::Document, - encoded_collab_v1, workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, encoded_collab_v1), )) .await .unwrap(); @@ -75,10 +74,8 @@ async fn success_batch_get_collab_test() { ); c.create_collab(CreateCollabParams::new( - &object_id, - collab_type, - raw_data.clone(), workspace_id.clone(), + CollabParams::new(&object_id, collab_type, raw_data.clone()), )) .await .unwrap(); @@ -130,10 +127,8 @@ async fn success_part_batch_get_collab_test() { }, ); c.create_collab(CreateCollabParams::new( - &object_id, - collab_type, - raw_data.clone(), workspace_id.clone(), + CollabParams::new(&object_id, collab_type, raw_data.clone()), )) .await .unwrap(); @@ -153,10 +148,8 @@ async fn success_delete_collab_test() { let workspace_id = workspace_id_from_client(&c).await; let object_id = Uuid::new_v4().to_string(); c.create_collab(CreateCollabParams::new( - object_id.clone(), - CollabType::Document, - raw_data.clone(), workspace_id.clone(), + CollabParams::new(&object_id, CollabType::Document, raw_data.clone()), )) .await .unwrap(); @@ -186,10 +179,8 @@ async fn fail_insert_collab_with_empty_payload_test() { let workspace_id = workspace_id_from_client(&c).await; let error = c .create_collab(CreateCollabParams::new( - Uuid::new_v4().to_string(), - CollabType::Document, - vec![], workspace_id, + CollabParams::new(Uuid::new_v4(), CollabType::Document, vec![]), )) .await .unwrap_err(); @@ -204,10 +195,8 @@ async fn fail_insert_collab_with_invalid_workspace_id_test() { let raw_data = "hello world".to_string().as_bytes().to_vec(); let error = c .create_collab(CreateCollabParams::new( - Uuid::new_v4().to_string(), - CollabType::Document, - raw_data.clone(), workspace_id, + CollabParams::new(Uuid::new_v4(), CollabType::Document, raw_data.clone()), )) .await .unwrap_err(); diff --git a/tests/util/test_client.rs b/tests/util/test_client.rs index bad0d57a..4f16a23a 100644 --- a/tests/util/test_client.rs +++ b/tests/util/test_client.rs @@ -418,10 +418,8 @@ impl TestClient { self .api_client .create_collab(CreateCollabParams::new( - &object_id, - collab_type.clone(), - encoded_collab_v1, workspace_id.to_string(), + CollabParams::new(&object_id, collab_type.clone(), encoded_collab_v1), )) .await .unwrap();