chore: better stress test scenario

This commit is contained in:
Bartosz Sypytkowski 2024-10-23 06:45:11 +02:00
parent de4016659e
commit fc1a7d28e2
6 changed files with 71 additions and 56 deletions

View File

@ -141,18 +141,6 @@ services:
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
appflowy_history:
restart: on-failure
image: appflowyinc/appflowy_history:${APPFLOWY_HISTORY_VERSION:-latest}
build:
context: .
dockerfile: ./services/appflowy-history/Dockerfile
environment:
- RUST_LOG=${RUST_LOG:-info}
- APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
- APPFLOWY_HISTORY_ENVIRONMENT=production
- APPFLOWY_HISTORY_DATABASE_URL=${APPFLOWY_HISTORY_DATABASE_URL}
appflowy_worker:
restart: on-failure
image: appflowyinc/appflowy_worker:${APPFLOWY_WORKER_VERSION:-latest}

View File

@ -200,7 +200,11 @@ where
return Ok(false);
}
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
tracing::debug!(
"🔥{} restart sync due to missing update, reason:{}",
&sync_object.object_id,
reason
);
let awareness = collab.get_awareness();
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
sink.queue_init_sync(|msg_id| {
@ -237,8 +241,8 @@ where
SyncReason::CollabInitialize
| SyncReason::ServerCannotApplyUpdate
| SyncReason::NetworkResume => {
trace!(
"🔥{} start sync, reason: {}",
tracing::debug!(
"🔥{} resume network, reason: {}",
&sync_object.object_id,
reason
);

View File

@ -245,12 +245,13 @@ impl AckMeta {
impl Display for CollabAck {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}]",
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}|seq_nr:{}]",
self.origin.client_user_id().unwrap_or(0),
self.object_id,
self.msg_id,
self.payload.len(),
self.code,
self.seq_num
))
}
}

View File

@ -195,18 +195,20 @@ impl CollabGroup {
}
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
tracing::trace!(
"broadcasting collab update from {} ({} bytes)",
update.sender,
update.data.len()
);
// update state vector based on incoming message
let mut sv = state.state_vector.write().await;
sv.merge(update.state_vector);
drop(sv);
let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst);
let message = BroadcastSync::new(update.sender, state.object_id.clone(), update.data, seq_num);
tracing::trace!(
"broadcasting collab update from {} ({} bytes) - seq_num: {}",
update.sender,
update.data.len(),
seq_num
);
let payload = Message::Sync(SyncMessage::Update(update.data)).encode_v1();
let message = BroadcastSync::new(update.sender, state.object_id.clone(), payload, seq_num);
for mut e in state.subscribers.iter_mut() {
let subscription = e.value_mut();
if message.origin == subscription.collab_origin {
@ -625,6 +627,7 @@ impl CollabGroup {
}
// we need to reconstruct document state on the server side
tracing::debug!("loading collab {}", state.object_id);
let snapshot = state
.persister
.load()
@ -890,7 +893,7 @@ impl CollabPersister {
};
let msg_id = self.awareness_sink.send(&update).await?;
tracing::trace!(
"persisted update from {} ({} bytes) - msg id: {}",
"persisted awareness from {} ({} bytes) - msg id: {}",
update.sender,
len,
msg_id
@ -961,6 +964,7 @@ impl CollabPersister {
}
async fn save(&self) -> Result<(), RealtimeError> {
tracing::debug!("requesting save for collab {}", self.object_id);
let mut snapshot = self.load_internal(true).await?;
if let Some(message_id) = &snapshot.last_message_id {
// non-nil message_id means that we had to update the most recent collab state snapshot

View File

@ -6,59 +6,76 @@ use serde_json::json;
use tokio::time::sleep;
use uuid::Uuid;
use client_api_test::{assert_server_collab, TestClient};
use super::util::TestScenario;
use client_api_test::{assert_server_collab, TestClient};
use database_entity::dto::AFRole;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn run_multiple_text_edits() {
const READER_COUNT: usize = 1;
let test_scenario = Arc::new(TestScenario::open(
"./tests/collab/asset/automerge-paper.json.gz",
));
let mut tasks = Vec::new();
for _i in 0..1 {
let test_scenario = test_scenario.clone();
let task = tokio::spawn(async move {
let mut new_user = TestClient::new_user().await;
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
sleep(Duration::from_secs(2)).await;
// create writer
let mut writer = TestClient::new_user().await;
sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue
let object_id = Uuid::new_v4().to_string();
let workspace_id = new_user.workspace_id().await;
// the big doc_state will force the init_sync using the http request.
// It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request.
new_user
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
.await;
let object_id = Uuid::new_v4().to_string();
let workspace_id = writer.workspace_id().await;
let collab = new_user.collabs.get(&object_id).unwrap().collab.clone();
test_scenario.execute(collab).await;
writer
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
.await;
new_user
.wait_object_sync_complete(&object_id)
.await
.unwrap();
(new_user, object_id, workspace_id)
});
tasks.push(task);
// create readers and invite them into the same workspace
let mut readers = Vec::with_capacity(READER_COUNT);
for _ in 0..READER_COUNT {
let mut reader = TestClient::new_user().await;
sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue
writer
.invite_and_accepted_workspace_member(&workspace_id, &reader, AFRole::Member)
.await
.unwrap();
reader
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
.await;
readers.push(reader);
}
// run test scenario
let collab = writer.collabs.get(&object_id).unwrap().collab.clone();
let expected = test_scenario.execute(collab).await;
// wait for the writer to complete sync
writer.wait_object_sync_complete(&object_id).await.unwrap();
// wait for the readers to complete sync
let mut tasks = Vec::with_capacity(READER_COUNT);
for reader in readers.iter() {
let fut = reader.wait_object_sync_complete(&object_id);
tasks.push(fut);
}
let results = futures::future::join_all(tasks).await;
for result in results.into_iter() {
let (mut client, object_id, workspace_id) = result.unwrap();
// make sure that the readers are in correct state
for res in results {
res.unwrap();
}
for mut reader in readers.drain(..) {
assert_server_collab(
&workspace_id,
&mut client.api_client,
&mut reader.api_client,
&object_id,
&CollabType::Document,
&CollabType::Unknown,
10,
json!({
"text-id": &test_scenario.end_content,
"text-id": &expected,
}),
)
.await
.unwrap();
drop(client);
}
}

View File

@ -133,7 +133,7 @@ impl TestScenario {
data
}
pub async fn execute(&self, collab: CollabRef) {
pub async fn execute(&self, collab: CollabRef) -> String {
for t in self.txns.iter() {
let mut lock = collab.write().await;
let collab = lock.borrow_mut();
@ -161,5 +161,6 @@ impl TestScenario {
let txt: TextRef = collab.data.get_with_txn(&txn, "text-id").unwrap();
let actual = txt.get_string(&txn);
assert_eq!(actual, expected);
actual
}
}