From 3133e5ec32fe6a1daa5e76d33120d34f93547372 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sat, 30 Sep 2023 12:36:42 +0800 Subject: [PATCH] chore: try fix test (#84) --- Cargo.lock | 8 +-- Cargo.toml | 6 +-- build/run_local_server.sh | 9 +++- libs/client-api/Cargo.toml | 6 +-- .../client-api/src/collab_sync/pending_msg.rs | 2 +- libs/client-api/src/collab_sync/plugin.rs | 2 +- libs/client-api/src/collab_sync/sink.rs | 2 +- libs/client-api/src/collab_sync/sync.rs | 54 +++++++------------ libs/client-api/src/ws/msg.rs | 2 +- libs/realtime/src/client.rs | 3 +- libs/realtime/src/collaborate/broadcast.rs | 45 +++------------- libs/realtime/src/collaborate/group.rs | 2 +- libs/realtime/src/collaborate/server.rs | 2 +- libs/realtime/src/entities.rs | 2 +- src/application.rs | 1 - tests/main.rs | 2 +- tests/realtime/offline_edit_collab_test.rs | 4 +- 17 files changed, 56 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4faecf14..9641a2be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -868,7 +868,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0f9b52a#0f9b52a5675e6e55c6d3b3d2a67055f424f6608e" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=d5a4a0e#d5a4a0e2e071fa36add784728f5ef33eb4450f51" dependencies = [ "anyhow", "async-trait", @@ -888,11 +888,13 @@ dependencies = [ [[package]] name = "collab-define" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0f9b52a#0f9b52a5675e6e55c6d3b3d2a67055f424f6608e" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=d5a4a0e#d5a4a0e2e071fa36add784728f5ef33eb4450f51" dependencies = [ "anyhow", + "bytes", "collab", "serde", + "serde_json", "serde_repr", "uuid", ] @@ -900,7 +902,7 @@ dependencies = [ [[package]] name = "collab-sync-protocol" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0f9b52a#0f9b52a5675e6e55c6d3b3d2a67055f424f6608e" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=d5a4a0e#d5a4a0e2e071fa36add784728f5ef33eb4450f51" dependencies = [ "bytes", "collab", diff --git a/Cargo.toml b/Cargo.toml index 777fb61b..b9a9157d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -125,9 +125,9 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0f9b52a" } -collab-sync-protocol = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0f9b52a" } -collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0f9b52a" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d5a4a0e" } +collab-sync-protocol = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d5a4a0e" } +collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "d5a4a0e" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/build/run_local_server.sh b/build/run_local_server.sh index 847fccaa..11b5d00f 100755 --- a/build/run_local_server.sh +++ b/build/run_local_server.sh @@ -42,7 +42,14 @@ pkill -f appflowy_cloud || true # To generate the .sqlx files, we need to run the following command # After the .sqlx files are generated, we build in SQLX_OFFLINE=true # where we don't need to connect to the database -cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace + +cargo sqlx database create && cargo sqlx migrate run +if [[ -z "${SKIP_SQLX_PREPARE+x}" ]] +then + cargo sqlx prepare --workspace +fi + + RUST_LOG=trace cargo run & # revert to require signup email verification diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 910b7ac5..9c8bcea1 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -31,15 +31,15 @@ futures-core = "0.3.26" tokio-retry = "0.3" bytes = "1.0" uuid = "1.4.1" -collab-sync-protocol = { version = "0.1.0" } scraper = "0.17.1" # collab sync collab = { version = "0.1.0", optional = true } -collab-define = { version = "0.1.0", optional = true } +collab-define = { version = "0.1.0" } +collab-sync-protocol = { version = "0.1.0" } y-sync = { version = "0.3.1", optional = true } yrs = { version = "0.16.5", optional = true } lib0 = { version = "0.16.3", features = ["lib0-serde"], optional = true } [features] -collab-sync = ["collab", "collab-define", "y-sync", "yrs", "lib0"] +collab-sync = ["collab", "y-sync", "yrs", "lib0"] diff --git a/libs/client-api/src/collab_sync/pending_msg.rs b/libs/client-api/src/collab_sync/pending_msg.rs index dfe0ae27..6ce09d0f 100644 --- a/libs/client-api/src/collab_sync/pending_msg.rs +++ b/libs/client-api/src/collab_sync/pending_msg.rs @@ -1,10 +1,10 @@ +use collab_define::collab_msg::CollabSinkMessage; use std::cmp::Ordering; use std::collections::BinaryHeap; use std::fmt::Display; use std::ops::{Deref, DerefMut}; use crate::collab_sync::MsgId; -use collab_sync_protocol::CollabSinkMessage; use tokio::sync::oneshot; pub(crate) struct PendingMsgQueue { diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 6cb6ef79..13af3328 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -4,8 +4,8 @@ use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; use collab::preclude::CollabPlugin; +use collab_define::collab_msg::{ClientUpdateRequest, CollabMessage}; use collab_define::{CollabObject, CollabType}; -use collab_sync_protocol::{ClientUpdateRequest, CollabMessage}; use futures_util::SinkExt; use tokio_stream::StreamExt; diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index 7b2389a4..1adce91c 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -1,3 +1,4 @@ +use collab_define::collab_msg::CollabSinkMessage; use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Weak}; @@ -5,7 +6,6 @@ use std::time::Duration; use crate::collab_sync::pending_msg::{MessageState, PendingMsgQueue}; use crate::collab_sync::{SyncError, DEFAULT_SYNC_TIMEOUT}; -use collab_sync_protocol::CollabSinkMessage; use futures_util::SinkExt; use tokio::spawn; use tokio::sync::{mpsc, oneshot, watch, Mutex}; diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index 43ea7f69..4dc6836d 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -9,10 +9,8 @@ use crate::collab_sync::{ use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; -use collab_sync_protocol::{handle_msg, CollabSyncProtocol, DefaultSyncProtocol}; -use collab_sync_protocol::{ - ClientCollabInit, ClientUpdateRequest, CollabMessage, ServerCollabInitResponse, -}; +use collab_define::collab_msg::{ClientCollabInit, ClientUpdateRequest, CollabMessage}; +use collab_sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; use futures_util::{SinkExt, StreamExt}; use lib0::decoding::Cursor; use tokio::spawn; @@ -21,9 +19,9 @@ use tokio::task::JoinHandle; use tokio_stream::wrappers::WatchStream; use tracing::{error, trace, warn}; use y_sync::awareness::Awareness; -use y_sync::sync::{Message, MessageReader}; -use yrs::updates::decoder::{Decode, DecoderV1}; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use y_sync::sync::MessageReader; +use yrs::updates::decoder::DecoderV1; +use yrs::updates::encoder::{Encoder, EncoderV1}; pub const DEFAULT_SYNC_TIMEOUT: u64 = 2; @@ -38,7 +36,7 @@ pub struct SyncQueue { /// the updates from the remote. #[allow(dead_code)] stream: SyncStream, - protocol: DefaultSyncProtocol, + protocol: ClientSyncProtocol, sync_state: Arc>, } @@ -56,7 +54,7 @@ where collab: Weak, config: SinkConfig, ) -> Self { - let protocol = DefaultSyncProtocol; + let protocol = ClientSyncProtocol; let (notifier, notifier_rx) = watch::channel(false); let sync_state = Arc::new(watch::channel(SyncState::SyncInitStart).0); let (sync_state_tx, sink_state_rx) = watch::channel(SinkState::Init); @@ -257,25 +255,9 @@ where where P: CollabSyncProtocol + Send + Sync + 'static, { - match msg { - CollabMessage::ServerInit(init_sync) => { - if !init_sync.payload.is_empty() { - let mut decoder = DecoderV1::from(init_sync.payload.as_ref()); - if let Ok(msg) = Message::decode(&mut decoder) { - if let Some(resp_msg) = handle_msg(&Some(origin), protocol, collab, msg).await? { - let payload = resp_msg.encode_v1(); - let object_id = object_id.to_string(); - sink.queue_msg(|msg_id| { - ServerCollabInitResponse::new(origin.clone(), object_id, payload, msg_id).into() - }); - } - } - } - - sink.ack_msg(object_id, init_sync.msg_id).await; - Ok(()) - }, - _ => { + { + if !msg.payload().is_empty() { + trace!("<<< start processing messages"); SyncStream::::process_payload( origin, msg.payload(), @@ -285,11 +267,12 @@ where sink, ) .await?; - if let Some(msg_id) = msg.msg_id() { - sink.ack_msg(msg.object_id(), msg_id).await; - } - Ok(()) - }, + trace!("<<< end processing messages"); + } + if let Some(msg_id) = msg.msg_id() { + sink.ack_msg(msg.object_id(), msg_id).await; + } + Ok(()) } } @@ -311,9 +294,8 @@ where let mut decoder = DecoderV1::new(Cursor::new(payload)); let reader = MessageReader::new(&mut decoder); for msg in reader { - let msg = msg?; - if let Some(resp) = handle_msg(&Some(origin), protocol, collab, msg).await? { - let payload = resp.encode_v1(); + trace!("handle message: {:?}", msg); + if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg?).await? { let object_id = object_id.to_string(); sink.queue_msg(|msg_id| { ClientUpdateRequest::new(origin.clone(), object_id, msg_id, payload).into() diff --git a/libs/client-api/src/ws/msg.rs b/libs/client-api/src/ws/msg.rs index 7a02e008..00a88fef 100644 --- a/libs/client-api/src/ws/msg.rs +++ b/libs/client-api/src/ws/msg.rs @@ -1,5 +1,5 @@ use crate::ws::WSError; -use collab_sync_protocol::CollabMessage; +use collab_define::collab_msg::CollabMessage; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use tokio_tungstenite::tungstenite::Message; diff --git a/libs/realtime/src/client.rs b/libs/realtime/src/client.rs index 950ce402..e11c235b 100644 --- a/libs/realtime/src/client.rs +++ b/libs/realtime/src/client.rs @@ -9,11 +9,10 @@ use actix_web_actors::ws; use bytes::Bytes; use std::ops::Deref; -use collab_sync_protocol::CollabMessage; - use crate::collaborate::CollabServer; use crate::error::RealtimeError; +use collab_define::collab_msg::CollabMessage; use database::collab::CollabStorage; use std::time::{Duration, Instant}; use tracing::error; diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index a5d39769..68621599 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -11,16 +11,16 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use y_sync::awareness; use y_sync::awareness::{Awareness, AwarenessUpdate}; -use y_sync::sync::{Message, MessageReader, SyncMessage, MSG_SYNC, MSG_SYNC_UPDATE}; +use y_sync::sync::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, UpdateSubscription}; +use yrs::UpdateSubscription; use crate::error::{internal_error, RealtimeError}; -use collab_sync_protocol::{handle_msg, DefaultSyncProtocol}; -use collab_sync_protocol::{ - CSAwarenessUpdate, ClientUpdateResponse, CollabMessage, CollabServerBroadcast, ServerCollabInit, +use collab_define::collab_msg::{ + CSAwarenessUpdate, ClientUpdateResponse, CollabMessage, CollabServerBroadcast, }; +use collab_sync_protocol::{handle_msg, ServerSyncProtocol}; use tracing::{error, trace, warn}; /// A broadcast can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness] @@ -170,8 +170,6 @@ impl CollabBroadcast { } let origin = collab_msg.origin(); - let is_client_init = collab_msg.is_init(); - if object_id != collab_msg.object_id() { error!("[🔴Server]: Incoming message's object id does not match the broadcast group's object id"); continue; @@ -183,19 +181,14 @@ impl CollabBroadcast { for msg in reader { match msg { Ok(msg) => { - let message = handle_msg(&origin, &DefaultSyncProtocol, &collab, msg).await?; + let payload = handle_msg(&origin, &ServerSyncProtocol, &collab, msg).await?; match origin { None => warn!("Client message does not have a origin"), Some(origin) => { - let payload = match message { - None => vec![], - Some(message) => message.encode_v1(), - }; - let resp = ClientUpdateResponse::new( origin.clone(), object_id.clone(), - payload, + payload.unwrap_or_default(), collab_msg.msg_id(), ); @@ -214,23 +207,6 @@ impl CollabBroadcast { }, } } - - if let Some(msg_id) = collab_msg.msg_id() { - // Send the server's state vector to the client. The client will calculate the missing - // updates and send them as a single update back to the server. - let payload = if is_client_init { - encode_server_sv(&collab) - } else { - vec![] - }; - - let server_init_sync = ServerCollabInit::new(object_id.clone(), msg_id, payload); - if let Err(e) = sink.send(server_init_sync.into()).await { - trace!("Send server init sync to the client failed: {}", e); - } - } else { - warn!("Client message does not have a message id"); - } }, Err(err) => { error!("Get sink lock failed: {:?}", err); @@ -248,13 +224,6 @@ impl CollabBroadcast { } } -fn encode_server_sv(collab: &MutexCollab) -> Vec { - let mut encoder = EncoderV1::new(); - let sv = collab.lock().transact().state_vector(); - Message::Sync(SyncMessage::SyncStep1(sv)).encode(&mut encoder); - encoder.to_vec() -} - /// A subscription structure returned from [CollabBroadcast::subscribe], which represents a /// subscribed connection. It can be dropped in order to unsubscribe or awaited via /// [Subscription::completed] method in order to complete of its own volition (due to an internal diff --git a/libs/realtime/src/collaborate/group.rs b/libs/realtime/src/collaborate/group.rs index 334cea4d..025a16f2 100644 --- a/libs/realtime/src/collaborate/group.rs +++ b/libs/realtime/src/collaborate/group.rs @@ -11,7 +11,7 @@ use std::iter::Take; use std::pin::Pin; use anyhow::Error; -use collab_sync_protocol::CollabMessage; +use collab_define::collab_msg::CollabMessage; use database::collab::CollabStorage; use parking_lot::Mutex; use std::sync::{Arc, Weak}; diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index 302b592a..e62c52ad 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -4,7 +4,7 @@ use anyhow::Result; use actix::{Actor, Context, Handler, ResponseFuture}; -use collab_sync_protocol::CollabMessage; +use collab_define::collab_msg::CollabMessage; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; use std::sync::Arc; diff --git a/libs/realtime/src/entities.rs b/libs/realtime/src/entities.rs index f096858b..112f7bc5 100644 --- a/libs/realtime/src/entities.rs +++ b/libs/realtime/src/entities.rs @@ -2,8 +2,8 @@ use crate::error::RealtimeError; use actix::{Message, Recipient}; use bytes::Bytes; use collab::core::origin::CollabOrigin; -use collab_sync_protocol::CollabMessage; +use collab_define::collab_msg::CollabMessage; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::fmt::{Debug, Display}; diff --git a/src/application.rs b/src/application.rs index c89e4581..a2a80868 100644 --- a/src/application.rs +++ b/src/application.rs @@ -149,7 +149,6 @@ async fn setup_admin_account( ) { let admin_email = gotrue_setting.admin_email.as_str(); let password = gotrue_setting.admin_password.as_str(); - gotrue_client.sign_up(admin_email, password).await.unwrap(); // Unable to use query! macro here instead diff --git a/tests/main.rs b/tests/main.rs index d35a4e51..46b809d1 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -5,7 +5,7 @@ use client_api::Client; mod client; mod collab; mod gotrue; -// mod realtime; +mod realtime; pub fn client_api_client() -> Client { Client::from( diff --git a/tests/realtime/offline_edit_collab_test.rs b/tests/realtime/offline_edit_collab_test.rs index 21502b53..030a4662 100644 --- a/tests/realtime/offline_edit_collab_test.rs +++ b/tests/realtime/offline_edit_collab_test.rs @@ -1,5 +1,6 @@ use crate::client::utils::generate_unique_registered_user; use crate::realtime::test_client::{assert_client_collab, assert_remote_collab, TestClient}; +use std::time::Duration; use collab_define::CollabType; use serde_json::json; @@ -88,8 +89,9 @@ async fn edit_document_with_one_client_online_and_other_offline_test() { client_2 .create_collab(&workspace_id, &object_id, collab_type.clone()) .await; - client_2.disconnect().await; + tokio::time::sleep(Duration::from_millis(1000)).await; + client_2.disconnect().await; client_2 .collab_by_object_id .get_mut(&object_id)