diff --git a/Cargo.lock b/Cargo.lock index 3a6cd907..47205c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,6 +444,7 @@ dependencies = [ "collab", "collab-define", "collab-plugins", + "collab-sync-protocol", "collab-ws", "config", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index e9a282a5..f4682fcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ collab = { version = "0.1.0" } collab-ws = { version = "0.1.0" } collab-plugins = { version = "0.1.0", features = ["sync_plugin"] } collab-define = { version = "0.1.0" } +collab-sync-protocol = { version = "0.1.0" } client-api = { path = "libs/client-api" } tempfile = "3.4.0" assert-json-diff = "2.0.2" diff --git a/build/run_local_server.sh b/build/run_local_server.sh index 3b7d9515..3071c2ec 100755 --- a/build/run_local_server.sh +++ b/build/run_local_server.sh @@ -44,4 +44,4 @@ pkill -f appflowy_cloud || true # where we don't need to connect to the database cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace -cargo run +RUST_LOG=debug cargo run diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 5fa7ce10..86997633 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -6,6 +6,7 @@ use shared_entity::data::AppResponse; use gotrue_entity::{AccessTokenResponse, User}; use shared_entity::error::AppError; +use shared_entity::server_error::ErrorCode; use storage_entity::{AFUserProfileView, InsertCollabParams}; use storage_entity::{AFWorkspaces, QueryCollabParams}; use storage_entity::{DeleteCollabParams, RawData}; @@ -13,13 +14,15 @@ use storage_entity::{DeleteCollabParams, RawData}; pub struct Client { http_client: reqwest::Client, base_url: String, + ws_addr: String, token: Option, } impl Client { - pub fn from(c: reqwest::Client, base_url: &str) -> Self { + pub fn from(c: reqwest::Client, base_url: &str, ws_addr: &str) -> Self { Self { base_url: base_url.to_string(), + ws_addr: ws_addr.to_string(), http_client: c, token: None, } @@ -145,6 +148,16 @@ impl Client { AppResponse::<()>::from_response(resp).await?.into_error() } + pub fn ws_url(&self) -> Result { + match self.token() { + None => Err(AppError::new( + ErrorCode::OAuthError, + "No token found".to_string(), + )), + Some(token) => Ok(format!("{}/{}", self.ws_addr, token.access_token)), + } + } + fn http_client_with_auth(&self, method: Method, url: &str) -> Result { match &self.token { None => anyhow::bail!("no token found, are you logged in?"), diff --git a/libs/realtime/src/core/client.rs b/libs/realtime/src/core/client.rs index 429e5522..47df2b94 100644 --- a/libs/realtime/src/core/client.rs +++ b/libs/realtime/src/core/client.rs @@ -59,6 +59,7 @@ where } fn forward_binary_to_ws_server(&self, bytes: Bytes) { + tracing::info!("Receive binary message from client"); match RealtimeMessage::from_vec(bytes.to_vec()) { Ok(message) => match CollabMessage::from_vec(&message.payload) { Ok(collab_msg) => { diff --git a/src/api/ws.rs b/src/api/ws.rs index a563a2c8..eb4f2e4b 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -23,7 +23,7 @@ pub async fn establish_ws_connection( state: Data, server: Data>>, ) -> Result { - tracing::trace!("{:?}", request); + tracing::info!("ws connect: {:?}", request); let auth = authorization_from_token(token.as_str(), &state)?; let user_uuid = UserUuid::from_auth(auth)?; let client = CollabSession::new( diff --git a/src/main.rs b/src/main.rs index 6ffa6777..f424e62f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,9 +4,18 @@ use appflowy_cloud::telemetry::{get_subscriber, init_subscriber}; #[actix_web::main] async fn main() -> anyhow::Result<()> { + let level = std::env::var("RUST_LOG").unwrap_or("info".to_string()); + let mut filters = vec![]; + filters.push(format!("actix_web={}", level)); + filters.push(format!("collab={}", level)); + filters.push(format!("collab_sync={}", level)); + filters.push(format!("appflowy_cloud={}", level)); + filters.push(format!("collab_plugins={}", level)); + filters.push(format!("realtime={}", level)); + let subscriber = get_subscriber( "appflowy_cloud".to_string(), - "info".to_string(), + filters.join(","), std::io::stdout, ); init_subscriber(subscriber); diff --git a/src/telemetry.rs b/src/telemetry.rs index aaea2e68..4e5eebbf 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -28,6 +28,7 @@ where .with_ansi(true) .with_target(true) .with_max_level(tracing::Level::TRACE) + .pretty() .finish() .with(env_filter) .with(JsonStorageLayer) diff --git a/tests/client/constants.rs b/tests/client/constants.rs index 22040096..d601cd97 100644 --- a/tests/client/constants.rs +++ b/tests/client/constants.rs @@ -1 +1,2 @@ pub const LOCALHOST_URL: &str = "http://localhost:8000"; +pub const LOCALHOST_WS: &str = "ws://localhost:8000/ws"; diff --git a/tests/client/sign_in.rs b/tests/client/sign_in.rs index 0d457ff6..35f41a36 100644 --- a/tests/client/sign_in.rs +++ b/tests/client/sign_in.rs @@ -1,17 +1,13 @@ -use client_api::Client; - use shared_entity::server_error::ErrorCode; -use crate::client::{ - constants::LOCALHOST_URL, - utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}, -}; +use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; #[tokio::test] async fn sign_in_unknown_user() { let email = generate_unique_email(); let password = "Hello123!"; - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); let err = c.sign_in_password(&email, password).await.unwrap_err(); assert_eq!(err.code, ErrorCode::OAuthError); assert!(!err.message.is_empty()); @@ -19,7 +15,7 @@ async fn sign_in_unknown_user() { #[tokio::test] async fn sign_in_wrong_password() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); let email = generate_unique_email(); let password = "Hello123!"; @@ -37,7 +33,7 @@ async fn sign_in_wrong_password() { #[tokio::test] async fn sign_in_unconfirmed_email() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); let email = generate_unique_email(); let password = "Hello123!"; @@ -51,7 +47,7 @@ async fn sign_in_unconfirmed_email() { #[tokio::test] async fn sign_in_success() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); diff --git a/tests/client/sign_out.rs b/tests/client/sign_out.rs index beb080f6..6a537403 100644 --- a/tests/client/sign_out.rs +++ b/tests/client/sign_out.rs @@ -1,20 +1,16 @@ -use client_api::Client; - -use crate::client::{ - constants::LOCALHOST_URL, - utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD}, -}; +use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; #[tokio::test] async fn sign_out_but_not_sign_in() { - let c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let c = client_api_client(); let res = c.sign_out().await; assert!(res.is_err()); } #[tokio::test] async fn sign_out_after_sign_in() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await diff --git a/tests/client/sign_up.rs b/tests/client/sign_up.rs index 4fffeee8..40c821f3 100644 --- a/tests/client/sign_up.rs +++ b/tests/client/sign_up.rs @@ -1,15 +1,13 @@ -use crate::client::{ - constants::LOCALHOST_URL, - utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}, -}; -use client_api::Client; +use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; + use shared_entity::server_error::ErrorCode; #[tokio::test] async fn sign_up_success() { let email = generate_unique_email(); let password = "Hello!123#"; - let c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let c = client_api_client(); c.sign_up(&email, password).await.unwrap(); } @@ -17,7 +15,7 @@ async fn sign_up_success() { async fn sign_up_invalid_email() { let invalid_email = "not_email_address"; let password = "Hello!123#"; - let c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let c = client_api_client(); let error = c.sign_up(invalid_email, password).await.unwrap_err(); assert_eq!(error.code, ErrorCode::InvalidEmail); assert_eq!(error.message, "invalid email: not_email_address"); @@ -27,7 +25,7 @@ async fn sign_up_invalid_email() { async fn sign_up_invalid_password() { let email = generate_unique_email(); let password = "123"; - let c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let c = client_api_client(); let error = c.sign_up(&email, password).await.unwrap_err(); assert_eq!(error.code, ErrorCode::InvalidPassword); assert_eq!(error.message, "invalid password: 123") @@ -35,7 +33,7 @@ async fn sign_up_invalid_password() { #[tokio::test] async fn sign_up_but_existing_user() { - let c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let c = client_api_client(); c.sign_up(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); diff --git a/tests/client/update.rs b/tests/client/update.rs index 54ad2b61..1ae42cf9 100644 --- a/tests/client/update.rs +++ b/tests/client/update.rs @@ -1,13 +1,9 @@ -use client_api::Client; - -use crate::client::{ - constants::LOCALHOST_URL, - utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}, -}; +use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; #[tokio::test] async fn update_but_not_logged_in() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); let new_email = generate_unique_email(); let new_password = "Hello123!"; let res = c.update(&new_email, new_password).await; @@ -16,7 +12,7 @@ async fn update_but_not_logged_in() { #[tokio::test] async fn update_password_same_password() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); @@ -30,7 +26,7 @@ async fn update_password_and_revert() { let new_password = "Hello456!"; { // change password to new_password - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); @@ -38,7 +34,7 @@ async fn update_password_and_revert() { } { // revert password to old_password - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, new_password) .await .unwrap(); diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index 6c8e5d23..d5f5a6fd 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -1,7 +1,7 @@ -use crate::client::constants::LOCALHOST_URL; use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; use crate::collab::workspace_id_from_client; -use client_api::Client; + use collab_define::CollabType; use shared_entity::server_error::ErrorCode; use storage_entity::{DeleteCollabParams, InsertCollabParams, QueryCollabParams}; @@ -9,7 +9,7 @@ use uuid::Uuid; #[tokio::test] async fn success_insert_collab_test() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); @@ -40,7 +40,7 @@ async fn success_insert_collab_test() { #[tokio::test] async fn success_delete_collab_test() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); @@ -77,7 +77,7 @@ async fn success_delete_collab_test() { #[tokio::test] async fn fail_insert_collab_with_empty_payload_test() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); @@ -99,7 +99,7 @@ async fn fail_insert_collab_with_empty_payload_test() { #[tokio::test] async fn fail_insert_collab_with_invalid_workspace_id_test() { - let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL); + let mut c = client_api_client(); c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) .await .unwrap(); diff --git a/tests/main.rs b/tests/main.rs index 250018e1..d3202d6d 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1,4 +1,11 @@ -mod client; -mod gotrue; +use crate::client::constants::{LOCALHOST_URL, LOCALHOST_WS}; +use client_api::Client; +mod client; mod collab; +mod gotrue; +mod realtime; + +pub fn client_api_client() -> Client { + Client::from(reqwest::Client::new(), LOCALHOST_URL, LOCALHOST_WS) +} diff --git a/tests/realtime/connect_test.rs b/tests/realtime/connect_test.rs new file mode 100644 index 00000000..40cdc853 --- /dev/null +++ b/tests/realtime/connect_test.rs @@ -0,0 +1,92 @@ +use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD}; +use crate::client_api_client; +use collab_ws::{ConnectState, WSClient, WSClientConfig}; + +#[tokio::test] +async fn realtime_connect_test() { + let mut c = client_api_client(); + c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) + .await + .unwrap(); + + let ws_client = WSClient::new( + c.ws_url().unwrap(), + WSClientConfig { + buffer_capacity: 100, + ping_per_secs: 2, + retry_connect_per_pings: 5, + }, + ); + let mut state = ws_client.subscribe_connect_state().await; + + loop { + tokio::select! { + _ = ws_client.connect() => {}, + value = state.recv() => { + let new_state = value.unwrap(); + if new_state == ConnectState::Connected { + break; + } + }, + } + } +} + +// #[tokio::test] +// async fn realtime_write_test() { +// let mut c = client_api_client(); +// c.sign_in_password(®ISTERED_EMAIL, ®ISTERED_PASSWORD) +// .await +// .unwrap(); +// +// let ws_client = WSClient::new( +// c.ws_url().unwrap(), +// WSClientConfig { +// buffer_capacity: 100, +// ping_per_secs: 2, +// retry_connect_per_pings: 5, +// }, +// ); +// ws_client.connect().await.unwrap(); +// +// let handler = ws_client.subscribe(1, "test".to_string()).await.unwrap(); +// let mut sink = handler.sink::(); +// let mut stream = handler.stream::(); +// let msg = TestMessage { +// object_id: "test".to_string(), +// }; +// sink.send(msg.clone()).await.unwrap(); +// +// tokio::time::sleep(Duration::from_secs(3)).await; +// // loop { +// // tokio::select! { +// // _ = sink.send(msg.clone()) => {}, +// // msg = stream.next() => { +// // break; +// // }, +// // } +// // } +// } + +// #[derive(Clone, Debug)] +// struct TestMessage { +// object_id: String, +// } +// +// impl From for WSMessage { +// fn from(value: TestMessage) -> Self { +// WSMessage { +// business_id: 0, +// object_id: value.object_id, +// payload: vec![], +// } +// } +// } +// +// impl From for TestMessage { +// fn from(value: WSMessage) -> Self { +// TestMessage { +// object_id: value.object_id, +// } +// } +// } diff --git a/tests/realtime/mod.rs b/tests/realtime/mod.rs new file mode 100644 index 00000000..a0c0b506 --- /dev/null +++ b/tests/realtime/mod.rs @@ -0,0 +1 @@ +mod connect_test;