test: add ws connect test (#39)

* test: add ws connect test

* chore: format log
This commit is contained in:
Nathan.fooo 2023-09-13 17:04:42 +08:00 committed by GitHub
parent 107627f4d8
commit c42158b7cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 162 additions and 49 deletions

1
Cargo.lock generated
View File

@ -444,6 +444,7 @@ dependencies = [
"collab",
"collab-define",
"collab-plugins",
"collab-sync-protocol",
"collab-ws",
"config",
"derive_more",

View File

@ -75,6 +75,7 @@ collab = { version = "0.1.0" }
collab-ws = { version = "0.1.0" }
collab-plugins = { version = "0.1.0", features = ["sync_plugin"] }
collab-define = { version = "0.1.0" }
collab-sync-protocol = { version = "0.1.0" }
client-api = { path = "libs/client-api" }
tempfile = "3.4.0"
assert-json-diff = "2.0.2"

View File

@ -44,4 +44,4 @@ pkill -f appflowy_cloud || true
# where we don't need to connect to the database
cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace
cargo run
RUST_LOG=debug cargo run

View File

@ -6,6 +6,7 @@ use shared_entity::data::AppResponse;
use gotrue_entity::{AccessTokenResponse, User};
use shared_entity::error::AppError;
use shared_entity::server_error::ErrorCode;
use storage_entity::{AFUserProfileView, InsertCollabParams};
use storage_entity::{AFWorkspaces, QueryCollabParams};
use storage_entity::{DeleteCollabParams, RawData};
@ -13,13 +14,15 @@ use storage_entity::{DeleteCollabParams, RawData};
pub struct Client {
http_client: reqwest::Client,
base_url: String,
ws_addr: String,
token: Option<AccessTokenResponse>,
}
impl Client {
pub fn from(c: reqwest::Client, base_url: &str) -> Self {
pub fn from(c: reqwest::Client, base_url: &str, ws_addr: &str) -> Self {
Self {
base_url: base_url.to_string(),
ws_addr: ws_addr.to_string(),
http_client: c,
token: None,
}
@ -145,6 +148,16 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub fn ws_url(&self) -> Result<String, AppError> {
match self.token() {
None => Err(AppError::new(
ErrorCode::OAuthError,
"No token found".to_string(),
)),
Some(token) => Ok(format!("{}/{}", self.ws_addr, token.access_token)),
}
}
fn http_client_with_auth(&self, method: Method, url: &str) -> Result<RequestBuilder, Error> {
match &self.token {
None => anyhow::bail!("no token found, are you logged in?"),

View File

@ -59,6 +59,7 @@ where
}
fn forward_binary_to_ws_server(&self, bytes: Bytes) {
tracing::info!("Receive binary message from client");
match RealtimeMessage::from_vec(bytes.to_vec()) {
Ok(message) => match CollabMessage::from_vec(&message.payload) {
Ok(collab_msg) => {

View File

@ -23,7 +23,7 @@ pub async fn establish_ws_connection(
state: Data<State>,
server: Data<Addr<CollabServer<CollabPostgresDBStorageImpl>>>,
) -> Result<HttpResponse> {
tracing::trace!("{:?}", request);
tracing::info!("ws connect: {:?}", request);
let auth = authorization_from_token(token.as_str(), &state)?;
let user_uuid = UserUuid::from_auth(auth)?;
let client = CollabSession::new(

View File

@ -4,9 +4,18 @@ use appflowy_cloud::telemetry::{get_subscriber, init_subscriber};
#[actix_web::main]
async fn main() -> anyhow::Result<()> {
let level = std::env::var("RUST_LOG").unwrap_or("info".to_string());
let mut filters = vec![];
filters.push(format!("actix_web={}", level));
filters.push(format!("collab={}", level));
filters.push(format!("collab_sync={}", level));
filters.push(format!("appflowy_cloud={}", level));
filters.push(format!("collab_plugins={}", level));
filters.push(format!("realtime={}", level));
let subscriber = get_subscriber(
"appflowy_cloud".to_string(),
"info".to_string(),
filters.join(","),
std::io::stdout,
);
init_subscriber(subscriber);

View File

@ -28,6 +28,7 @@ where
.with_ansi(true)
.with_target(true)
.with_max_level(tracing::Level::TRACE)
.pretty()
.finish()
.with(env_filter)
.with(JsonStorageLayer)

View File

@ -1 +1,2 @@
pub const LOCALHOST_URL: &str = "http://localhost:8000";
pub const LOCALHOST_WS: &str = "ws://localhost:8000/ws";

View File

@ -1,17 +1,13 @@
use client_api::Client;
use shared_entity::server_error::ErrorCode;
use crate::client::{
constants::LOCALHOST_URL,
utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD},
};
use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
#[tokio::test]
async fn sign_in_unknown_user() {
let email = generate_unique_email();
let password = "Hello123!";
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
let err = c.sign_in_password(&email, password).await.unwrap_err();
assert_eq!(err.code, ErrorCode::OAuthError);
assert!(!err.message.is_empty());
@ -19,7 +15,7 @@ async fn sign_in_unknown_user() {
#[tokio::test]
async fn sign_in_wrong_password() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
let email = generate_unique_email();
let password = "Hello123!";
@ -37,7 +33,7 @@ async fn sign_in_wrong_password() {
#[tokio::test]
async fn sign_in_unconfirmed_email() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
let email = generate_unique_email();
let password = "Hello123!";
@ -51,7 +47,7 @@ async fn sign_in_unconfirmed_email() {
#[tokio::test]
async fn sign_in_success() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();

View File

@ -1,20 +1,16 @@
use client_api::Client;
use crate::client::{
constants::LOCALHOST_URL,
utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD},
};
use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
#[tokio::test]
async fn sign_out_but_not_sign_in() {
let c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let c = client_api_client();
let res = c.sign_out().await;
assert!(res.is_err());
}
#[tokio::test]
async fn sign_out_after_sign_in() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await

View File

@ -1,15 +1,13 @@
use crate::client::{
constants::LOCALHOST_URL,
utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD},
};
use client_api::Client;
use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
use shared_entity::server_error::ErrorCode;
#[tokio::test]
async fn sign_up_success() {
let email = generate_unique_email();
let password = "Hello!123#";
let c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let c = client_api_client();
c.sign_up(&email, password).await.unwrap();
}
@ -17,7 +15,7 @@ async fn sign_up_success() {
async fn sign_up_invalid_email() {
let invalid_email = "not_email_address";
let password = "Hello!123#";
let c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let c = client_api_client();
let error = c.sign_up(invalid_email, password).await.unwrap_err();
assert_eq!(error.code, ErrorCode::InvalidEmail);
assert_eq!(error.message, "invalid email: not_email_address");
@ -27,7 +25,7 @@ async fn sign_up_invalid_email() {
async fn sign_up_invalid_password() {
let email = generate_unique_email();
let password = "123";
let c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let c = client_api_client();
let error = c.sign_up(&email, password).await.unwrap_err();
assert_eq!(error.code, ErrorCode::InvalidPassword);
assert_eq!(error.message, "invalid password: 123")
@ -35,7 +33,7 @@ async fn sign_up_invalid_password() {
#[tokio::test]
async fn sign_up_but_existing_user() {
let c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let c = client_api_client();
c.sign_up(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();

View File

@ -1,13 +1,9 @@
use client_api::Client;
use crate::client::{
constants::LOCALHOST_URL,
utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD},
};
use crate::client::utils::{generate_unique_email, REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
#[tokio::test]
async fn update_but_not_logged_in() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
let new_email = generate_unique_email();
let new_password = "Hello123!";
let res = c.update(&new_email, new_password).await;
@ -16,7 +12,7 @@ async fn update_but_not_logged_in() {
#[tokio::test]
async fn update_password_same_password() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
@ -30,7 +26,7 @@ async fn update_password_and_revert() {
let new_password = "Hello456!";
{
// change password to new_password
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
@ -38,7 +34,7 @@ async fn update_password_and_revert() {
}
{
// revert password to old_password
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, new_password)
.await
.unwrap();

View File

@ -1,7 +1,7 @@
use crate::client::constants::LOCALHOST_URL;
use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
use crate::collab::workspace_id_from_client;
use client_api::Client;
use collab_define::CollabType;
use shared_entity::server_error::ErrorCode;
use storage_entity::{DeleteCollabParams, InsertCollabParams, QueryCollabParams};
@ -9,7 +9,7 @@ use uuid::Uuid;
#[tokio::test]
async fn success_insert_collab_test() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
@ -40,7 +40,7 @@ async fn success_insert_collab_test() {
#[tokio::test]
async fn success_delete_collab_test() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
@ -77,7 +77,7 @@ async fn success_delete_collab_test() {
#[tokio::test]
async fn fail_insert_collab_with_empty_payload_test() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
@ -99,7 +99,7 @@ async fn fail_insert_collab_with_empty_payload_test() {
#[tokio::test]
async fn fail_insert_collab_with_invalid_workspace_id_test() {
let mut c = Client::from(reqwest::Client::new(), LOCALHOST_URL);
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();

View File

@ -1,4 +1,11 @@
mod client;
mod gotrue;
use crate::client::constants::{LOCALHOST_URL, LOCALHOST_WS};
use client_api::Client;
mod client;
mod collab;
mod gotrue;
mod realtime;
pub fn client_api_client() -> Client {
Client::from(reqwest::Client::new(), LOCALHOST_URL, LOCALHOST_WS)
}

View File

@ -0,0 +1,92 @@
use crate::client::utils::{REGISTERED_EMAIL, REGISTERED_PASSWORD};
use crate::client_api_client;
use collab_ws::{ConnectState, WSClient, WSClientConfig};
#[tokio::test]
async fn realtime_connect_test() {
let mut c = client_api_client();
c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
.await
.unwrap();
let ws_client = WSClient::new(
c.ws_url().unwrap(),
WSClientConfig {
buffer_capacity: 100,
ping_per_secs: 2,
retry_connect_per_pings: 5,
},
);
let mut state = ws_client.subscribe_connect_state().await;
loop {
tokio::select! {
_ = ws_client.connect() => {},
value = state.recv() => {
let new_state = value.unwrap();
if new_state == ConnectState::Connected {
break;
}
},
}
}
}
// #[tokio::test]
// async fn realtime_write_test() {
// let mut c = client_api_client();
// c.sign_in_password(&REGISTERED_EMAIL, &REGISTERED_PASSWORD)
// .await
// .unwrap();
//
// let ws_client = WSClient::new(
// c.ws_url().unwrap(),
// WSClientConfig {
// buffer_capacity: 100,
// ping_per_secs: 2,
// retry_connect_per_pings: 5,
// },
// );
// ws_client.connect().await.unwrap();
//
// let handler = ws_client.subscribe(1, "test".to_string()).await.unwrap();
// let mut sink = handler.sink::<TestMessage>();
// let mut stream = handler.stream::<TestMessage>();
// let msg = TestMessage {
// object_id: "test".to_string(),
// };
// sink.send(msg.clone()).await.unwrap();
//
// tokio::time::sleep(Duration::from_secs(3)).await;
// // loop {
// // tokio::select! {
// // _ = sink.send(msg.clone()) => {},
// // msg = stream.next() => {
// // break;
// // },
// // }
// // }
// }
// #[derive(Clone, Debug)]
// struct TestMessage {
// object_id: String,
// }
//
// impl From<TestMessage> for WSMessage {
// fn from(value: TestMessage) -> Self {
// WSMessage {
// business_id: 0,
// object_id: value.object_id,
// payload: vec![],
// }
// }
// }
//
// impl From<WSMessage> for TestMessage {
// fn from(value: WSMessage) -> Self {
// TestMessage {
// object_id: value.object_id,
// }
// }
// }

1
tests/realtime/mod.rs Normal file
View File

@ -0,0 +1 @@
mod connect_test;