diff --git a/Cargo.lock b/Cargo.lock index 3904f24d..a6206750 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -635,6 +635,7 @@ dependencies = [ "derive_more", "dotenvy", "fancy-regex 0.11.0", + "flate2", "futures", "futures-lite", "futures-util", diff --git a/Cargo.toml b/Cargo.toml index 4c90f48d..bc83bbe7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -158,6 +158,7 @@ md5.workspace = true [dev-dependencies] +flate2 = "1.0" once_cell = "1.19.0" tempfile = "3.9.0" assert-json-diff = "2.0.2" diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 57df589f..dcf4f6b3 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -291,6 +291,7 @@ impl CollabGroup { async fn snapshot_task(state: Arc, interval: Duration, is_new_collab: bool) { if is_new_collab { + tracing::trace!("persisting new collab for {}", state.object_id); if let Err(err) = state.persister.save().await { tracing::warn!( "failed to persist new document `{}`: {}", diff --git a/tests/collab/asset/automerge-paper.json.gz b/tests/collab/asset/automerge-paper.json.gz new file mode 100644 index 00000000..aa7f67ec Binary files /dev/null and b/tests/collab/asset/automerge-paper.json.gz differ diff --git a/tests/collab/mod.rs b/tests/collab/mod.rs index b7b38015..a7ae26bb 100644 --- a/tests/collab/mod.rs +++ b/tests/collab/mod.rs @@ -6,5 +6,6 @@ mod multi_devices_edit; mod permission_test; mod single_device_edit; mod storage_test; +mod stress_test; pub mod util; mod web_edit; diff --git a/tests/collab/stress_test.rs b/tests/collab/stress_test.rs new file mode 100644 index 00000000..ced47472 --- /dev/null +++ b/tests/collab/stress_test.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; +use std::time::Duration; + +use collab_entity::CollabType; +use serde_json::json; +use tokio::time::sleep; +use uuid::Uuid; + +use client_api_test::{assert_server_collab, TestClient}; + +use super::util::TestScenario; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn run_multiple_text_edits() { + let test_scenario = Arc::new(TestScenario::open( + "./tests/collab/asset/automerge-paper.json.gz", + )); + let mut tasks = Vec::new(); + for _i in 0..1 { + let test_scenario = test_scenario.clone(); + let task = tokio::spawn(async move { + let mut new_user = TestClient::new_user().await; + // sleep 2 secs to make sure it do not trigger register user too fast in gotrue + sleep(Duration::from_secs(2)).await; + + let object_id = Uuid::new_v4().to_string(); + let workspace_id = new_user.workspace_id().await; + // the big doc_state will force the init_sync using the http request. + // It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request. + new_user + .open_collab(&workspace_id, &object_id, CollabType::Unknown) + .await; + + let collab = new_user.collabs.get(&object_id).unwrap().collab.clone(); + test_scenario.execute(collab).await; + + new_user + .wait_object_sync_complete(&object_id) + .await + .unwrap(); + (new_user, object_id, workspace_id) + }); + tasks.push(task); + } + + let results = futures::future::join_all(tasks).await; + for result in results.into_iter() { + let (mut client, object_id, workspace_id) = result.unwrap(); + assert_server_collab( + &workspace_id, + &mut client.api_client, + &object_id, + &CollabType::Document, + 10, + json!({ + "text-id": &test_scenario.end_content, + }), + ) + .await + .unwrap(); + + drop(client); + } +} diff --git a/tests/collab/util.rs b/tests/collab/util.rs index d7806d74..43f82561 100644 --- a/tests/collab/util.rs +++ b/tests/collab/util.rs @@ -72,3 +72,94 @@ pub async fn redis_connection_manager() -> ConnectionManager { } } } + +use std::io::{BufReader, Read}; + +use collab::preclude::MapExt; +use flate2::bufread::GzDecoder; +use serde::Deserialize; +use yrs::{GetString, Text, TextRef}; + +use client_api_test::CollabRef; + +/// (position, delete length, insert content). +#[derive(Debug, Clone, Deserialize, Eq, PartialEq)] +pub struct TestPatch(pub usize, pub usize, pub String); + +#[derive(Debug, Clone, Deserialize, Eq, PartialEq)] +pub struct TestTxn { + // time: String, // ISO String. Unused. + pub patches: Vec, +} + +#[derive(Debug, Clone, Deserialize, Eq, PartialEq)] +pub struct TestScenario { + #[serde(default)] + pub using_byte_positions: bool, + + #[serde(rename = "startContent")] + pub start_content: String, + #[serde(rename = "endContent")] + pub end_content: String, + + pub txns: Vec, +} + +impl TestScenario { + /// Load the testing data at the specified file. If the filename ends in .gz, it will be + /// transparently uncompressed. + /// + /// This method panics if the file does not exist, or is corrupt. It'd be better to have a try_ + /// variant of this method, but given this is mostly for benchmarking and testing, I haven't felt + /// the need to write that code. + pub fn open(fpath: &str) -> TestScenario { + // let start = SystemTime::now(); + // let mut file = File::open("benchmark_data/automerge-paper.json.gz").unwrap(); + let file = std::fs::File::open(fpath).unwrap(); + + let mut reader = BufReader::new(file); + // We could pass the GzDecoder straight to serde, but it makes it way slower to parse for + // some reason. + let mut raw_json = vec![]; + + if fpath.ends_with(".gz") { + let mut reader = GzDecoder::new(reader); + reader.read_to_end(&mut raw_json).unwrap(); + } else { + reader.read_to_end(&mut raw_json).unwrap(); + } + + let data: TestScenario = serde_json::from_reader(raw_json.as_slice()).unwrap(); + data + } + + pub async fn execute(&self, collab: CollabRef) { + for t in self.txns.iter() { + let mut lock = collab.write().await; + let collab = lock.borrow_mut(); + let mut txn = collab.context.transact_mut(); + let txt = collab.data.get_or_init_text(&mut txn, "text-id"); + for patch in t.patches.iter() { + let at = patch.0; + let delete = patch.1; + let content = patch.2.as_str(); + + if delete != 0 { + txt.remove_range(&mut txn, at as u32, delete as u32); + } + if !content.is_empty() { + txt.insert(&mut txn, at as u32, content); + } + } + } + + // validate after applying all patches + let expected = self.end_content.as_str(); + let lock = collab.read().await; + let collab = lock.borrow(); + let txn = collab.context.transact(); + let txt: TextRef = collab.data.get_with_txn(&txn, "text-id").unwrap(); + let actual = txt.get_string(&txn); + assert_eq!(actual, expected); + } +}