diff --git a/Cargo.lock b/Cargo.lock index 4bfabec3..1aa62465 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1282,7 +1282,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "anyhow", "async-trait", @@ -1301,7 +1301,7 @@ dependencies = [ [[package]] name = "collab-derive" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "proc-macro2", "quote", @@ -1313,7 +1313,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "anyhow", "collab", @@ -1332,7 +1332,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "anyhow", "bytes", @@ -1346,7 +1346,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "anyhow", "chrono", @@ -1366,7 +1366,7 @@ dependencies = [ [[package]] name = "collab-persistence" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=8e17ba3bdfdee754a5dbbb1c65d630b551e5b268#8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0e117f568bd2465762582f6aeb8d9c11fe714e63#0e117f568bd2465762582f6aeb8d9c11fe714e63" dependencies = [ "anyhow", "async-trait", @@ -3209,9 +3209,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.57" +version = "0.10.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" +checksum = "6b8419dc8cc6d866deb801274bba2e6f8f6108c1bb7fcc10ee5ab864931dbb45" dependencies = [ "bitflags 2.4.0", "cfg-if", @@ -3250,9 +3250,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.93" +version = "0.9.97" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4d56a4c0478783083cfafcc42493dd4a981d41669da64b4572a2a089b51b1d" +checksum = "c3eaad34cdd97d81de97964fc7f29e2d104f483840d906ef56daa1912338460b" dependencies = [ "cc", "libc", @@ -4390,12 +4390,12 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.101.5" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a27e3b59326c16e23d30aeb7a36a24cc0d29e71d68ff611cdfb4a01d013bed" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring 0.16.20", - "untrusted 0.7.1", + "ring 0.17.7", + "untrusted 0.9.0", ] [[package]] @@ -6171,18 +6171,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.26" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.26" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 89f46a95..ad52407f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,10 +156,10 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "8e17ba3bdfdee754a5dbbb1c65d630b551e5b268" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0e117f568bd2465762582f6aeb8d9c11fe714e63" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0e117f568bd2465762582f6aeb8d9c11fe714e63" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0e117f568bd2465762582f6aeb8d9c11fe714e63" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0e117f568bd2465762582f6aeb8d9c11fe714e63" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/libs/realtime/src/client.rs b/libs/realtime/src/client.rs index 080471da..6310b910 100644 --- a/libs/realtime/src/client.rs +++ b/libs/realtime/src/client.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use database_entity::pg_row::AFUserNotification; use realtime_entity::user::{AFUserChange, UserMessage}; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; pub struct ClientSession< U: Unpin + RealtimeUser, @@ -209,6 +209,11 @@ where let _ = self.forward_binary(bytes); }, ws::Message::Close(reason) => { + debug!( + "Websocket closing for ({:?}): {:?}", + self.user.uid(), + reason + ); ctx.close(reason); ctx.stop(); }, diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 7dfa607a..abf14a26 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -10,6 +10,7 @@ use collab::core::awareness::Awareness; use collab::core::collab::TransactionMutExt; use collab::core::collab_plugin::EncodedCollabV1; use collab::core::origin::CollabOrigin; +use collab::core::transaction::DocTransactionExtension; use collab::preclude::{CollabPlugin, Doc, TransactionMut}; use collab_entity::CollabType; use database::collab::CollabStorage; @@ -23,8 +24,7 @@ use tokio::time::interval; use tracing::{debug, error, event, info, instrument, trace}; use yrs::updates::decoder::Decode; -use yrs::updates::encoder::Encode; -use yrs::{ReadTxn, StateVector, Transact, Update}; +use yrs::{Transact, Update}; pub struct CollabStoragePlugin { uid: i64, @@ -73,7 +73,7 @@ where object_id, self.uid ); - match encoded_v1_from_doc(doc).encode_to_bytes() { + match doc.get_encoded_collab_v1().encode_to_bytes() { Ok(encoded_collab_v1) => { let _ = self .access_control @@ -229,7 +229,6 @@ where AppError::RecordNotFound(_) => { // When attempting to retrieve collaboration data from the disk and a 'Record Not Found' error is returned, // this indicates that the collaboration is new. Therefore, the current collaboration data should be saved to disk. - event!( tracing::Level::DEBUG, "New collab object, insert collab to db" @@ -263,7 +262,7 @@ where } fn flush(&self, object_id: &str, doc: &Doc) { - let encoded_collab_v1 = match encoded_v1_from_doc(doc).encode_to_bytes() { + let encoded_collab_v1 = match doc.get_encoded_collab_v1().encode_to_bytes() { Ok(data) => data, Err(err) => { error!("Error encoding: {:?}", err); @@ -291,13 +290,6 @@ where } } -fn encoded_v1_from_doc(doc: &Doc) -> EncodedCollabV1 { - let txn = doc.transact(); - let state_vector = txn.state_vector().encode_v1(); - let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); - EncodedCollabV1::new(state_vector, doc_state) -} - async fn get_latest_snapshot(object_id: &str, storage: &S) -> Option where S: CollabStorage, diff --git a/tests/collab/single_device_edit.rs b/tests/collab/single_device_edit.rs index 423023c0..5d14fa61 100644 --- a/tests/collab/single_device_edit.rs +++ b/tests/collab/single_device_edit.rs @@ -1,7 +1,56 @@ -use crate::util::test_client::{assert_server_collab, TestClient}; +use crate::util::test_client::{ + assert_client_collab_include_value, assert_server_collab, TestClient, +}; use collab_entity::CollabType; +use database_entity::dto::AFAccessLevel; use serde_json::json; +use uuid::Uuid; + +#[tokio::test] +async fn collab_storage_plugin_write_test() { + let collab_type = CollabType::Document; + let mut test_client = TestClient::new_user().await; + let workspace_id = test_client.workspace_id().await; + let object_id = Uuid::new_v4().to_string(); + + // Calling the open_collab function directly will create the collab object in the plugin. + // The [CollabStoragePlugin] plugin try to get the collab object from the database, but it doesn't exist. + // So the plugin will create the collab object. + test_client + .open_collab(&workspace_id, &object_id, collab_type.clone()) + .await; + + // Edit the collab + for i in 0..=5 { + test_client + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert(&i.to_string(), i.to_string()); + } + test_client.wait_object_sync_complete(&object_id).await; + test_client.disconnect().await; + + assert_server_collab( + &workspace_id, + &mut test_client.api_client, + &object_id, + &collab_type, + 10, + json!( { + "0": "0", + "1": "1", + "2": "2", + "3": "3", + "4": "4", + "5": "5", + }), + ) + .await; +} #[tokio::test] async fn realtime_write_single_collab_test() { @@ -98,129 +147,134 @@ async fn realtime_write_multiple_collab_test() { } // -// #[tokio::test] -// async fn user_with_duplicate_devices_connect_edit_test() { -// let object_id = uuid::Uuid::new_v4().to_string(); -// let collab_type = CollabType::Document; -// let registered_user = generate_unique_registered_user().await; -// -// // Client_1_2 will force the server to disconnect client_1_1. So any changes made by client_1_1 -// // will not be saved to the server. -// let device_id = Uuid::new_v4().to_string(); -// let mut client_1_1 = TestClient::new(device_id.clone(), registered_user.clone()).await; -// let workspace_id = client_1_1.current_workspace_id().await; -// -// client_1_1 -// .create_collab(&workspace_id, &object_id, collab_type.clone()) -// .await; -// -// client_1_1 -// .collab_by_object_id -// .get_mut(&object_id) -// .unwrap() -// .collab -// .lock() -// .insert("1", "a"); -// client_1_1 -// .collab_by_object_id -// .get_mut(&object_id) -// .unwrap() -// .collab -// .lock() -// .insert("3", "c"); -// client_1_1.wait_object_sync_complete(&object_id).await; -// -// let mut client_1_2 = TestClient::new(device_id.clone(), registered_user.clone()).await; -// client_1_2 -// .create_collab(&workspace_id, &object_id, collab_type.clone()) -// .await; -// client_1_2 -// .collab_by_object_id -// .get_mut(&object_id) -// .unwrap() -// .collab -// .lock() -// .insert("2", "b"); -// client_1_2.wait_object_sync_complete(&object_id).await; -// -// assert_client_collab( -// &mut client_1_1, -// &object_id, -// 10, -// json!({ -// "1": "a", -// "3": "c" -// }), -// ) -// .await; -// -// assert_client_collab( -// &mut client_1_2, -// &object_id, -// 10, -// json!({ -// "1": "a", -// "3": "c", -// "2": "b" -// }), -// ) -// .await; -// -// assert_remote_collab_json( -// &mut client_1_2.api_client, -// &object_id, -// &collab_type, -// 5, -// json!({ -// "1": "a", -// "2": "b", -// "3": "c" -// }), -// ) -// .await; -// } +#[tokio::test] +async fn user_with_duplicate_devices_connect_edit_test() { + let collab_type = CollabType::Document; + let mut old_client = TestClient::new_user().await; + let workspace_id = old_client.workspace_id().await; -// #[tokio::test] -// async fn two_direction_peer_sync_test() { -// let object_id = uuid::Uuid::new_v4().to_string(); -// let collab_type = CollabType::Document; -// -// let mut client_1 = TestClient::new_user().await; -// let workspace_id = client_1.current_workspace_id().await; -// client_1 -// .create_collab(&workspace_id, &object_id, collab_type.clone()) -// .await; -// -// let mut client_2 = TestClient::new_user().await; -// client_2 -// .create_collab(&workspace_id, &object_id, collab_type.clone()) -// .await; -// -// client_1 -// .collab_by_object_id -// .get_mut(&object_id) -// .unwrap() -// .collab -// .lock() -// .insert("name", "AppFlowy"); -// client_1.wait_object_sync_complete(&object_id).await; -// -// client_2 -// .collab_by_object_id -// .get_mut(&object_id) -// .unwrap() -// .collab -// .lock() -// .insert("support platform", "macOS, Windows, Linux, iOS, Android"); -// client_2.wait_object_sync_complete(&object_id).await; -// -// let expected_json = json!({ -// "name": "AppFlowy", -// "support platform": "macOS, Windows, Linux, iOS, Android" -// }); -// assert_client_collab(&mut client_1, &object_id, expected_json.clone()).await; -// assert_client_collab(&mut client_2, &object_id, expected_json.clone()).await; -// } + let object_id = old_client + .create_collab(&workspace_id, collab_type.clone()) + .await; + + old_client + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("1", "a"); + old_client + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("3", "c"); + old_client.wait_object_sync_complete(&object_id).await; + + let mut new_client = + TestClient::new(old_client.device_id.clone(), old_client.user.clone(), true).await; + new_client + .open_collab(&workspace_id, &object_id, collab_type.clone()) + .await; + new_client + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("2", "b"); + new_client.wait_object_sync_complete(&object_id).await; + + // Old client shouldn't receive the new client's edit + assert_client_collab_include_value( + &mut old_client, + &object_id, + json!({ + "1": "a", + "3": "c" + }), + ) + .await; + + assert_client_collab_include_value( + &mut new_client, + &object_id, + json!({ + "1": "a", + "3": "c", + "2": "b" + }), + ) + .await; + + assert_server_collab( + &workspace_id, + &mut new_client.api_client, + &object_id, + &collab_type, + 10, + json!({ + "1": "a", + "2": "b", + "3": "c" + }), + ) + .await; +} + +#[tokio::test] +async fn two_direction_peer_sync_test() { + let collab_type = CollabType::Document; + + let mut client_1 = TestClient::new_user().await; + let workspace_id = client_1.workspace_id().await; + let object_id = client_1 + .create_collab(&workspace_id, collab_type.clone()) + .await; + + let mut client_2 = TestClient::new_user().await; + // Before the client_2 want to edit the collab object, it needs to become a member of the collab + // Otherwise, the server will reject the edit request + client_1 + .add_client_as_collab_member( + &workspace_id, + &object_id, + &client_2, + AFAccessLevel::FullAccess, + ) + .await; + + client_2 + .open_collab(&workspace_id, &object_id, collab_type.clone()) + .await; + + client_1 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("name", "AppFlowy"); + client_1.wait_object_sync_complete(&object_id).await; + + client_2 + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + .insert("support platform", "macOS, Windows, Linux, iOS, Android"); + client_2.wait_object_sync_complete(&object_id).await; + + let expected_json = json!({ + "name": "AppFlowy", + "support platform": "macOS, Windows, Linux, iOS, Android" + }); + assert_client_collab_include_value(&mut client_1, &object_id, expected_json.clone()).await; + assert_client_collab_include_value(&mut client_2, &object_id, expected_json.clone()).await; +} #[tokio::test] async fn multiple_collab_edit_test() { diff --git a/tests/util/test_client.rs b/tests/util/test_client.rs index 860796c3..ea5c65fa 100644 --- a/tests/util/test_client.rs +++ b/tests/util/test_client.rs @@ -36,6 +36,7 @@ use crate::user::utils::{generate_unique_registered_user, User}; use crate::util::setup_log; pub(crate) struct TestClient { + pub user: User, pub ws_client: WSClient, pub api_client: client_api::Client, pub collab_by_object_id: HashMap, @@ -47,7 +48,7 @@ pub(crate) struct TestCollab { pub collab: Arc, } impl TestClient { - pub(crate) async fn new(device_id: String, registered_user: User, invoke_ws_conn: bool) -> Self { + pub(crate) async fn new(device_id: String, registered_user: User, start_ws_conn: bool) -> Self { setup_log(); let api_client = localhost_client(); api_client @@ -65,13 +66,14 @@ impl TestClient { api_client.clone(), ); - if invoke_ws_conn { + if start_ws_conn { ws_client .connect(api_client.ws_url(&device_id).unwrap(), &device_id) .await .unwrap(); } Self { + user: registered_user, ws_client, api_client, collab_by_object_id: Default::default(),