From 0032b7e08e39d1de3d20eb678999fb09909d12e1 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 23 Oct 2024 06:45:11 +0200 Subject: [PATCH] chore: better stress test scenario --- docker-compose.yml | 12 --- .../src/collab_sync/sync_control.rs | 10 ++- libs/collab-rt-entity/src/server_message.rs | 3 +- .../src/group/group_init.rs | 18 +++-- tests/collab/stress_test.rs | 81 +++++++++++-------- tests/collab/util.rs | 3 +- 6 files changed, 71 insertions(+), 56 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 043bab74..44466f9b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -141,18 +141,6 @@ services: - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} - APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL} - appflowy_history: - restart: on-failure - image: appflowyinc/appflowy_history:${APPFLOWY_HISTORY_VERSION:-latest} - build: - context: . - dockerfile: ./services/appflowy-history/Dockerfile - environment: - - RUST_LOG=${RUST_LOG:-info} - - APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379 - - APPFLOWY_HISTORY_ENVIRONMENT=production - - APPFLOWY_HISTORY_DATABASE_URL=${APPFLOWY_HISTORY_DATABASE_URL} - appflowy_worker: restart: on-failure image: appflowyinc/appflowy_worker:${APPFLOWY_WORKER_VERSION:-latest} diff --git a/libs/client-api/src/collab_sync/sync_control.rs b/libs/client-api/src/collab_sync/sync_control.rs index 99437da0..a7d282a8 100644 --- a/libs/client-api/src/collab_sync/sync_control.rs +++ b/libs/client-api/src/collab_sync/sync_control.rs @@ -200,7 +200,11 @@ where return Ok(false); } - trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason); + tracing::debug!( + "🔥{} restart sync due to missing update, reason:{}", + &sync_object.object_id, + reason + ); let awareness = collab.get_awareness(); let payload = gen_sync_state(awareness, &ClientSyncProtocol)?; sink.queue_init_sync(|msg_id| { @@ -237,8 +241,8 @@ where SyncReason::CollabInitialize | SyncReason::ServerCannotApplyUpdate | SyncReason::NetworkResume => { - trace!( - "🔥{} start sync, reason: {}", + tracing::debug!( + "🔥{} resume network, reason: {}", &sync_object.object_id, reason ); diff --git a/libs/collab-rt-entity/src/server_message.rs b/libs/collab-rt-entity/src/server_message.rs index e5bc26de..94838bb6 100644 --- a/libs/collab-rt-entity/src/server_message.rs +++ b/libs/collab-rt-entity/src/server_message.rs @@ -245,12 +245,13 @@ impl AckMeta { impl Display for CollabAck { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( - "ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}]", + "ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}|seq_nr:{}]", self.origin.client_user_id().unwrap_or(0), self.object_id, self.msg_id, self.payload.len(), self.code, + self.seq_num )) } } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index bda7f421..8782e335 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -195,18 +195,20 @@ impl CollabGroup { } async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) { - tracing::trace!( - "broadcasting collab update from {} ({} bytes)", - update.sender, - update.data.len() - ); // update state vector based on incoming message let mut sv = state.state_vector.write().await; sv.merge(update.state_vector); drop(sv); let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst); - let message = BroadcastSync::new(update.sender, state.object_id.clone(), update.data, seq_num); + tracing::trace!( + "broadcasting collab update from {} ({} bytes) - seq_num: {}", + update.sender, + update.data.len(), + seq_num + ); + let payload = Message::Sync(SyncMessage::Update(update.data)).encode_v1(); + let message = BroadcastSync::new(update.sender, state.object_id.clone(), payload, seq_num); for mut e in state.subscribers.iter_mut() { let subscription = e.value_mut(); if message.origin == subscription.collab_origin { @@ -625,6 +627,7 @@ impl CollabGroup { } // we need to reconstruct document state on the server side + tracing::debug!("loading collab {}", state.object_id); let snapshot = state .persister .load() @@ -890,7 +893,7 @@ impl CollabPersister { }; let msg_id = self.awareness_sink.send(&update).await?; tracing::trace!( - "persisted update from {} ({} bytes) - msg id: {}", + "persisted awareness from {} ({} bytes) - msg id: {}", update.sender, len, msg_id @@ -961,6 +964,7 @@ impl CollabPersister { } async fn save(&self) -> Result<(), RealtimeError> { + tracing::debug!("requesting save for collab {}", self.object_id); let mut snapshot = self.load_internal(true).await?; if let Some(message_id) = &snapshot.last_message_id { // non-nil message_id means that we had to update the most recent collab state snapshot diff --git a/tests/collab/stress_test.rs b/tests/collab/stress_test.rs index ced47472..ea802c31 100644 --- a/tests/collab/stress_test.rs +++ b/tests/collab/stress_test.rs @@ -6,59 +6,76 @@ use serde_json::json; use tokio::time::sleep; use uuid::Uuid; -use client_api_test::{assert_server_collab, TestClient}; - use super::util::TestScenario; +use client_api_test::{assert_server_collab, TestClient}; +use database_entity::dto::AFRole; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn run_multiple_text_edits() { + const READER_COUNT: usize = 1; let test_scenario = Arc::new(TestScenario::open( "./tests/collab/asset/automerge-paper.json.gz", )); - let mut tasks = Vec::new(); - for _i in 0..1 { - let test_scenario = test_scenario.clone(); - let task = tokio::spawn(async move { - let mut new_user = TestClient::new_user().await; - // sleep 2 secs to make sure it do not trigger register user too fast in gotrue - sleep(Duration::from_secs(2)).await; + // create writer + let mut writer = TestClient::new_user().await; + sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue - let object_id = Uuid::new_v4().to_string(); - let workspace_id = new_user.workspace_id().await; - // the big doc_state will force the init_sync using the http request. - // It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request. - new_user - .open_collab(&workspace_id, &object_id, CollabType::Unknown) - .await; + let object_id = Uuid::new_v4().to_string(); + let workspace_id = writer.workspace_id().await; - let collab = new_user.collabs.get(&object_id).unwrap().collab.clone(); - test_scenario.execute(collab).await; + writer + .open_collab(&workspace_id, &object_id, CollabType::Unknown) + .await; - new_user - .wait_object_sync_complete(&object_id) - .await - .unwrap(); - (new_user, object_id, workspace_id) - }); - tasks.push(task); + // create readers and invite them into the same workspace + let mut readers = Vec::with_capacity(READER_COUNT); + for _ in 0..READER_COUNT { + let mut reader = TestClient::new_user().await; + sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue + writer + .invite_and_accepted_workspace_member(&workspace_id, &reader, AFRole::Member) + .await + .unwrap(); + + reader + .open_collab(&workspace_id, &object_id, CollabType::Unknown) + .await; + + readers.push(reader); } + // run test scenario + let collab = writer.collabs.get(&object_id).unwrap().collab.clone(); + let expected = test_scenario.execute(collab).await; + + // wait for the writer to complete sync + writer.wait_object_sync_complete(&object_id).await.unwrap(); + + // wait for the readers to complete sync + let mut tasks = Vec::with_capacity(READER_COUNT); + for reader in readers.iter() { + let fut = reader.wait_object_sync_complete(&object_id); + tasks.push(fut); + } let results = futures::future::join_all(tasks).await; - for result in results.into_iter() { - let (mut client, object_id, workspace_id) = result.unwrap(); + + // make sure that the readers are in correct state + for res in results { + res.unwrap(); + } + + for mut reader in readers.drain(..) { assert_server_collab( &workspace_id, - &mut client.api_client, + &mut reader.api_client, &object_id, - &CollabType::Document, + &CollabType::Unknown, 10, json!({ - "text-id": &test_scenario.end_content, + "text-id": &expected, }), ) .await .unwrap(); - - drop(client); } } diff --git a/tests/collab/util.rs b/tests/collab/util.rs index 43f82561..7994ceb6 100644 --- a/tests/collab/util.rs +++ b/tests/collab/util.rs @@ -133,7 +133,7 @@ impl TestScenario { data } - pub async fn execute(&self, collab: CollabRef) { + pub async fn execute(&self, collab: CollabRef) -> String { for t in self.txns.iter() { let mut lock = collab.write().await; let collab = lock.borrow_mut(); @@ -161,5 +161,6 @@ impl TestScenario { let txt: TextRef = collab.data.get_with_txn(&txn, "text-id").unwrap(); let actual = txt.get_string(&txn); assert_eq!(actual, expected); + actual } }