chore: better stress test scenario
This commit is contained in:
parent
80405cc43e
commit
0032b7e08e
|
|
@ -141,18 +141,6 @@ services:
|
|||
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
|
||||
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
|
||||
|
||||
appflowy_history:
|
||||
restart: on-failure
|
||||
image: appflowyinc/appflowy_history:${APPFLOWY_HISTORY_VERSION:-latest}
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./services/appflowy-history/Dockerfile
|
||||
environment:
|
||||
- RUST_LOG=${RUST_LOG:-info}
|
||||
- APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
|
||||
- APPFLOWY_HISTORY_ENVIRONMENT=production
|
||||
- APPFLOWY_HISTORY_DATABASE_URL=${APPFLOWY_HISTORY_DATABASE_URL}
|
||||
|
||||
appflowy_worker:
|
||||
restart: on-failure
|
||||
image: appflowyinc/appflowy_worker:${APPFLOWY_WORKER_VERSION:-latest}
|
||||
|
|
|
|||
|
|
@ -200,7 +200,11 @@ where
|
|||
return Ok(false);
|
||||
}
|
||||
|
||||
trace!("🔥{} start sync, reason:{}", &sync_object.object_id, reason);
|
||||
tracing::debug!(
|
||||
"🔥{} restart sync due to missing update, reason:{}",
|
||||
&sync_object.object_id,
|
||||
reason
|
||||
);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
|
|
@ -237,8 +241,8 @@ where
|
|||
SyncReason::CollabInitialize
|
||||
| SyncReason::ServerCannotApplyUpdate
|
||||
| SyncReason::NetworkResume => {
|
||||
trace!(
|
||||
"🔥{} start sync, reason: {}",
|
||||
tracing::debug!(
|
||||
"🔥{} resume network, reason: {}",
|
||||
&sync_object.object_id,
|
||||
reason
|
||||
);
|
||||
|
|
|
|||
|
|
@ -245,12 +245,13 @@ impl AckMeta {
|
|||
impl Display for CollabAck {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}]",
|
||||
"ack: [uid:{}|oid:{}|msg_id:{:?}|len:{}|code:{}|seq_nr:{}]",
|
||||
self.origin.client_user_id().unwrap_or(0),
|
||||
self.object_id,
|
||||
self.msg_id,
|
||||
self.payload.len(),
|
||||
self.code,
|
||||
self.seq_num
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -195,18 +195,20 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
|
||||
tracing::trace!(
|
||||
"broadcasting collab update from {} ({} bytes)",
|
||||
update.sender,
|
||||
update.data.len()
|
||||
);
|
||||
// update state vector based on incoming message
|
||||
let mut sv = state.state_vector.write().await;
|
||||
sv.merge(update.state_vector);
|
||||
drop(sv);
|
||||
|
||||
let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst);
|
||||
let message = BroadcastSync::new(update.sender, state.object_id.clone(), update.data, seq_num);
|
||||
tracing::trace!(
|
||||
"broadcasting collab update from {} ({} bytes) - seq_num: {}",
|
||||
update.sender,
|
||||
update.data.len(),
|
||||
seq_num
|
||||
);
|
||||
let payload = Message::Sync(SyncMessage::Update(update.data)).encode_v1();
|
||||
let message = BroadcastSync::new(update.sender, state.object_id.clone(), payload, seq_num);
|
||||
for mut e in state.subscribers.iter_mut() {
|
||||
let subscription = e.value_mut();
|
||||
if message.origin == subscription.collab_origin {
|
||||
|
|
@ -625,6 +627,7 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
// we need to reconstruct document state on the server side
|
||||
tracing::debug!("loading collab {}", state.object_id);
|
||||
let snapshot = state
|
||||
.persister
|
||||
.load()
|
||||
|
|
@ -890,7 +893,7 @@ impl CollabPersister {
|
|||
};
|
||||
let msg_id = self.awareness_sink.send(&update).await?;
|
||||
tracing::trace!(
|
||||
"persisted update from {} ({} bytes) - msg id: {}",
|
||||
"persisted awareness from {} ({} bytes) - msg id: {}",
|
||||
update.sender,
|
||||
len,
|
||||
msg_id
|
||||
|
|
@ -961,6 +964,7 @@ impl CollabPersister {
|
|||
}
|
||||
|
||||
async fn save(&self) -> Result<(), RealtimeError> {
|
||||
tracing::debug!("requesting save for collab {}", self.object_id);
|
||||
let mut snapshot = self.load_internal(true).await?;
|
||||
if let Some(message_id) = &snapshot.last_message_id {
|
||||
// non-nil message_id means that we had to update the most recent collab state snapshot
|
||||
|
|
|
|||
|
|
@ -6,59 +6,76 @@ use serde_json::json;
|
|||
use tokio::time::sleep;
|
||||
use uuid::Uuid;
|
||||
|
||||
use client_api_test::{assert_server_collab, TestClient};
|
||||
|
||||
use super::util::TestScenario;
|
||||
use client_api_test::{assert_server_collab, TestClient};
|
||||
use database_entity::dto::AFRole;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn run_multiple_text_edits() {
|
||||
const READER_COUNT: usize = 1;
|
||||
let test_scenario = Arc::new(TestScenario::open(
|
||||
"./tests/collab/asset/automerge-paper.json.gz",
|
||||
));
|
||||
let mut tasks = Vec::new();
|
||||
for _i in 0..1 {
|
||||
let test_scenario = test_scenario.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
let mut new_user = TestClient::new_user().await;
|
||||
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
// create writer
|
||||
let mut writer = TestClient::new_user().await;
|
||||
sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue
|
||||
|
||||
let object_id = Uuid::new_v4().to_string();
|
||||
let workspace_id = new_user.workspace_id().await;
|
||||
// the big doc_state will force the init_sync using the http request.
|
||||
// It will trigger the POST_REALTIME_MESSAGE_STREAM_HANDLER to handle the request.
|
||||
new_user
|
||||
let workspace_id = writer.workspace_id().await;
|
||||
|
||||
writer
|
||||
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
|
||||
.await;
|
||||
|
||||
let collab = new_user.collabs.get(&object_id).unwrap().collab.clone();
|
||||
test_scenario.execute(collab).await;
|
||||
|
||||
new_user
|
||||
.wait_object_sync_complete(&object_id)
|
||||
// create readers and invite them into the same workspace
|
||||
let mut readers = Vec::with_capacity(READER_COUNT);
|
||||
for _ in 0..READER_COUNT {
|
||||
let mut reader = TestClient::new_user().await;
|
||||
sleep(Duration::from_secs(2)).await; // sleep 2 secs to make sure it do not trigger register user too fast in gotrue
|
||||
writer
|
||||
.invite_and_accepted_workspace_member(&workspace_id, &reader, AFRole::Member)
|
||||
.await
|
||||
.unwrap();
|
||||
(new_user, object_id, workspace_id)
|
||||
});
|
||||
tasks.push(task);
|
||||
|
||||
reader
|
||||
.open_collab(&workspace_id, &object_id, CollabType::Unknown)
|
||||
.await;
|
||||
|
||||
readers.push(reader);
|
||||
}
|
||||
|
||||
// run test scenario
|
||||
let collab = writer.collabs.get(&object_id).unwrap().collab.clone();
|
||||
let expected = test_scenario.execute(collab).await;
|
||||
|
||||
// wait for the writer to complete sync
|
||||
writer.wait_object_sync_complete(&object_id).await.unwrap();
|
||||
|
||||
// wait for the readers to complete sync
|
||||
let mut tasks = Vec::with_capacity(READER_COUNT);
|
||||
for reader in readers.iter() {
|
||||
let fut = reader.wait_object_sync_complete(&object_id);
|
||||
tasks.push(fut);
|
||||
}
|
||||
let results = futures::future::join_all(tasks).await;
|
||||
for result in results.into_iter() {
|
||||
let (mut client, object_id, workspace_id) = result.unwrap();
|
||||
|
||||
// make sure that the readers are in correct state
|
||||
for res in results {
|
||||
res.unwrap();
|
||||
}
|
||||
|
||||
for mut reader in readers.drain(..) {
|
||||
assert_server_collab(
|
||||
&workspace_id,
|
||||
&mut client.api_client,
|
||||
&mut reader.api_client,
|
||||
&object_id,
|
||||
&CollabType::Document,
|
||||
&CollabType::Unknown,
|
||||
10,
|
||||
json!({
|
||||
"text-id": &test_scenario.end_content,
|
||||
"text-id": &expected,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
drop(client);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -133,7 +133,7 @@ impl TestScenario {
|
|||
data
|
||||
}
|
||||
|
||||
pub async fn execute(&self, collab: CollabRef) {
|
||||
pub async fn execute(&self, collab: CollabRef) -> String {
|
||||
for t in self.txns.iter() {
|
||||
let mut lock = collab.write().await;
|
||||
let collab = lock.borrow_mut();
|
||||
|
|
@ -161,5 +161,6 @@ impl TestScenario {
|
|||
let txt: TextRef = collab.data.get_with_txn(&txn, "text-id").unwrap();
|
||||
let actual = txt.get_string(&txn);
|
||||
assert_eq!(actual, expected);
|
||||
actual
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue