From d3186cc07a210c6f991cb98680a8e710fe73c507 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 5 Oct 2023 17:43:50 +0800 Subject: [PATCH] feat: websocket config (#99) * chore: enable tls feature * chore: update ws client * chore: update ws client * chore: expost database entities * chore: update ws nginx config * chore: rename error file * chore: fix clippy --- Cargo.lock | 3 +++ libs/client-api/Cargo.toml | 2 +- libs/client-api/src/http.rs | 2 +- libs/client-api/src/lib.rs | 4 +++- libs/client-api/src/ws/client.rs | 24 +++++++++---------- libs/client-api/src/ws/ping.rs | 8 +++---- .../src/{error.rs => database_error.rs} | 0 libs/database-entity/src/lib.rs | 2 +- libs/database/src/collab.rs | 2 +- libs/database/src/collab_db_ops.rs | 2 +- libs/realtime/src/collaborate/plugin.rs | 2 +- libs/realtime/src/error.rs | 2 +- .../src/{error.rs => app_error.rs} | 2 +- libs/shared-entity/src/data.rs | 2 +- libs/shared-entity/src/error_code.rs | 2 +- libs/shared-entity/src/lib.rs | 2 +- nginx/nginx.conf | 10 ++++++++ src/api/collaborate.rs | 4 ++-- src/api/workspace.rs | 2 +- src/biz/collab.rs | 2 +- src/biz/file_storage.rs | 2 +- src/biz/user.rs | 2 +- src/biz/workspace.rs | 2 +- tests/realtime/connect_test.rs | 4 ++-- tests/realtime/test_client.rs | 5 ++-- 25 files changed, 54 insertions(+), 40 deletions(-) rename libs/database-entity/src/{error.rs => database_error.rs} (100%) rename libs/shared-entity/src/{error.rs => app_error.rs} (98%) diff --git a/Cargo.lock b/Cargo.lock index 57f1a99c..6eaa84e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4018,7 +4018,9 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log", + "native-tls", "tokio", + "tokio-native-tls", "tungstenite", ] @@ -4166,6 +4168,7 @@ dependencies = [ "http", "httparse", "log", + "native-tls", "rand 0.8.5", "sha1", "thiserror", diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index bc2a909f..dc6dc0a5 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -23,7 +23,7 @@ mime = "0.3.17" tracing = { version = "0.1" } thiserror = "1.0.39" serde = { version = "1.0", features = ["derive"] } -tokio-tungstenite = { version = "0.20.1" } +tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } tokio = { version = "1.26", features = ["full"] } futures-util = "0.3.26" futures-core = "0.3.26" diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 84157aa1..7445bc98 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -27,7 +27,7 @@ use crate::notify::{ClientToken, TokenStateReceiver}; use database_entity::{AFUserProfileView, AFWorkspaceMember, InsertCollabParams}; use database_entity::{AFWorkspaces, QueryCollabParams}; use database_entity::{DeleteCollabParams, RawData}; -use shared_entity::error::AppError; +use shared_entity::app_error::AppError; use shared_entity::error_code::url_missing_param; use shared_entity::error_code::ErrorCode; diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index 2445584c..573b3592 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -9,11 +9,13 @@ pub mod ws; pub use http::*; pub mod error { - pub use shared_entity::error::AppError; + pub use shared_entity::app_error::AppError; pub use shared_entity::error_code::ErrorCode; } +// Export all entities that will be used in the frontend application pub mod entity { + pub use database_entity::*; pub use gotrue_entity::*; pub use shared_entity::*; } diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index f38aa835..59241fd2 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -1,6 +1,7 @@ use futures_util::{SinkExt, StreamExt}; use std::borrow::Cow; +use parking_lot::RwLock; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::{Arc, Weak}; @@ -12,7 +13,7 @@ use crate::ws::state::{ConnectState, ConnectStateNotify}; use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel}; use tokio::sync::broadcast::{channel, Receiver, Sender}; -use tokio::sync::{oneshot, Mutex, RwLock}; +use tokio::sync::{oneshot, Mutex}; use tokio_retry::strategy::FixedInterval; use tokio_retry::{Condition, RetryIf}; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; @@ -43,10 +44,11 @@ impl Default for WSClientConfig { type ChannelByObjectId = HashMap>; pub type WSConnectStateReceiver = Receiver; + pub struct WSClient { addr: Arc>>, config: WSClientConfig, - state_notify: Arc>, + state_notify: Arc>, /// Sender used to send messages to the websocket. sender: Sender, channels: Arc>>, @@ -57,13 +59,13 @@ pub struct WSClient { impl WSClient { pub fn new(config: WSClientConfig) -> Self { let (sender, _) = channel(config.buffer_capacity); - let state = Arc::new(Mutex::new(ConnectStateNotify::new())); + let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new())); let channels = Arc::new(RwLock::new(HashMap::new())); let ping = Arc::new(Mutex::new(None)); WSClient { addr: Arc::new(parking_lot::Mutex::new(None)), config, - state_notify: state, + state_notify, sender, channels, ping, @@ -118,7 +120,6 @@ impl WSClient { if let Some(channels) = weak_channels.upgrade() { if let Some(channel) = channels .read() - .await .get(&msg.business_id) .and_then(|map| map.get(&msg.object_id)) { @@ -181,7 +182,7 @@ impl WSClient { /// Return a [WebSocketChannel] that can be used to send messages to the websocket. Caller should /// keep the channel alive as long as it wants to receive messages from the websocket. - pub async fn subscribe( + pub fn subscribe( &self, business_id: BusinessID, object_id: String, @@ -190,19 +191,18 @@ impl WSClient { self .channels .write() - .await .entry(business_id) .or_insert_with(HashMap::new) .insert(object_id, Arc::downgrade(&channel)); Ok(channel) } - pub async fn subscribe_connect_state(&self) -> WSConnectStateReceiver { - self.state_notify.lock().await.subscribe() + pub fn subscribe_connect_state(&self) -> WSConnectStateReceiver { + self.state_notify.lock().subscribe() } - pub async fn is_connected(&self) -> bool { - self.state_notify.lock().await.state.is_connected() + pub fn is_connected(&self) -> bool { + self.state_notify.lock().state.is_connected() } pub async fn disconnect(&self) { @@ -222,7 +222,7 @@ impl WSClient { async fn set_state(&self, state: ConnectState) { trace!("websocket state: {:?}", state); - self.state_notify.lock().await.set_state(state); + self.state_notify.lock().set_state(state); } } diff --git a/libs/client-api/src/ws/ping.rs b/libs/client-api/src/ws/ping.rs index 62fa0117..bea68d26 100644 --- a/libs/client-api/src/ws/ping.rs +++ b/libs/client-api/src/ws/ping.rs @@ -11,7 +11,7 @@ pub(crate) struct ServerFixIntervalPing { #[allow(dead_code)] stop_tx: tokio::sync::mpsc::Sender<()>, stop_rx: Option>, - state: Arc>, + state: Arc>, ping_count: Arc>, maximum_ping_count: u32, } @@ -19,7 +19,7 @@ pub(crate) struct ServerFixIntervalPing { impl ServerFixIntervalPing { pub(crate) fn new( duration: Duration, - state: Arc>, + state: Arc>, sender: Sender, maximum_ping_count: u32, ) -> Self { @@ -58,7 +58,7 @@ impl ServerFixIntervalPing { let mut lock = ping_count.lock().await; if *lock >= reconnect_per_ping { if let Some(state) =weak_state.upgrade() { - state.lock().await.set_state(ConnectState::PingTimeout); + state.lock().set_state(ConnectState::PingTimeout); } } else { *lock +=1; @@ -73,7 +73,7 @@ impl ServerFixIntervalPing { *lock = 0; if let Some(state) =weak_state.upgrade() { - state.lock().await.set_state(ConnectState::Connected); + state.lock().set_state(ConnectState::Connected); } } } diff --git a/libs/database-entity/src/error.rs b/libs/database-entity/src/database_error.rs similarity index 100% rename from libs/database-entity/src/error.rs rename to libs/database-entity/src/database_error.rs diff --git a/libs/database-entity/src/lib.rs b/libs/database-entity/src/lib.rs index a288e3ff..0b47f01c 100644 --- a/libs/database-entity/src/lib.rs +++ b/libs/database-entity/src/lib.rs @@ -6,7 +6,7 @@ use std::ops::Deref; use validator::{Validate, ValidationError}; pub type RawData = Vec; -pub mod error; +pub mod database_error; #[derive(Debug, Clone, Validate, Serialize, Deserialize)] pub struct InsertCollabParams { diff --git a/libs/database/src/collab.rs b/libs/database/src/collab.rs index 77a2a1db..0b2f2815 100644 --- a/libs/database/src/collab.rs +++ b/libs/database/src/collab.rs @@ -3,7 +3,7 @@ use anyhow::Context; use async_trait::async_trait; use collab::core::collab::MutexCollab; use collab_define::CollabType; -use database_entity::error::DatabaseError; +use database_entity::database_error::DatabaseError; use database_entity::{ AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams, QueryObjectSnapshotParams, QuerySnapshotParams, RawData, diff --git a/libs/database/src/collab_db_ops.rs b/libs/database/src/collab_db_ops.rs index bf57454e..8565e626 100644 --- a/libs/database/src/collab_db_ops.rs +++ b/libs/database/src/collab_db_ops.rs @@ -3,7 +3,7 @@ use std::{ops::DerefMut, str::FromStr}; use anyhow::Context; use database_entity::{ - error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams, + database_error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams, }; use sqlx::{PgPool, Transaction}; use tracing::trace; diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 0dc84d83..00cecb7b 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use collab::core::collab::TransactionMutExt; use collab::core::origin::CollabOrigin; use collab::preclude::{CollabPlugin, Doc, TransactionMut}; -use database_entity::error::DatabaseError; +use database_entity::database_error::DatabaseError; use collab::sync_protocol::awareness::Awareness; use collab_define::CollabType; diff --git a/libs/realtime/src/error.rs b/libs/realtime/src/error.rs index 9abe452d..1d83ae99 100644 --- a/libs/realtime/src/error.rs +++ b/libs/realtime/src/error.rs @@ -1,5 +1,5 @@ use collab::error::CollabError; -use database_entity::error::DatabaseError; +use database_entity::database_error::DatabaseError; #[derive(Debug, thiserror::Error)] pub enum RealtimeError { diff --git a/libs/shared-entity/src/error.rs b/libs/shared-entity/src/app_error.rs similarity index 98% rename from libs/shared-entity/src/error.rs rename to libs/shared-entity/src/app_error.rs index d947e3dc..0d2d4b9d 100644 --- a/libs/shared-entity/src/error.rs +++ b/libs/shared-entity/src/app_error.rs @@ -1,4 +1,4 @@ -use database_entity::error::DatabaseError; +use database_entity::database_error::DatabaseError; use std::fmt::Display; use std::num::ParseIntError; use std::time::SystemTimeError; diff --git a/libs/shared-entity/src/data.rs b/libs/shared-entity/src/data.rs index bccc6b29..3df5dc65 100644 --- a/libs/shared-entity/src/data.rs +++ b/libs/shared-entity/src/data.rs @@ -1,4 +1,4 @@ -use crate::error::AppError; +use crate::app_error::AppError; use serde::{Deserialize, Serialize}; use std::borrow::Cow; diff --git a/libs/shared-entity/src/error_code.rs b/libs/shared-entity/src/error_code.rs index cb8adbd3..77cbf213 100644 --- a/libs/shared-entity/src/error_code.rs +++ b/libs/shared-entity/src/error_code.rs @@ -1,6 +1,6 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; -use crate::error::AppError; +use crate::app_error::AppError; use thiserror::Error; #[derive(Clone, Copy, Debug, Error, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr)] diff --git a/libs/shared-entity/src/lib.rs b/libs/shared-entity/src/lib.rs index 7ed81ae0..ef205c8b 100644 --- a/libs/shared-entity/src/lib.rs +++ b/libs/shared-entity/src/lib.rs @@ -1,6 +1,6 @@ +pub mod app_error; pub mod data; pub mod dto; -pub mod error; pub mod error_code; #[cfg(feature = "cloud")] diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 4f03f249..4a95570f 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -18,6 +18,16 @@ http { proxy_pass http://gotrue:9999; } + # WebSocket + location /ws { + proxy_pass http://appflowy_cloud; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_set_header Host $host; + proxy_read_timeout 86400; + } + # AppFlowy-Cloud location / { proxy_pass http://appflowy_cloud:8000; diff --git a/src/api/collaborate.rs b/src/api/collaborate.rs index 3dab26ba..f4bbeed2 100644 --- a/src/api/collaborate.rs +++ b/src/api/collaborate.rs @@ -6,13 +6,13 @@ use actix_web::web::{Data, Json}; use actix_web::Result; use actix_web::{web, Scope}; use database::collab::CollabStorage; -use database_entity::error::DatabaseError; +use database_entity::database_error::DatabaseError; use database_entity::{ AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryCollabParams, QueryObjectSnapshotParams, QuerySnapshotParams, RawData, }; +use shared_entity::app_error::AppError; use shared_entity::data::AppResponse; -use shared_entity::error::AppError; use shared_entity::error_code::ErrorCode; use tracing::{debug, instrument}; diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 6a6ec87a..ad1a1069 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -1,9 +1,9 @@ use crate::biz; use crate::state::AppState; use database_entity::{AFWorkspaceMember, AFWorkspaces}; +use shared_entity::app_error::AppError; use shared_entity::data::{AppResponse, JsonAppResponse}; use shared_entity::dto::WorkspaceMembersParams; -use shared_entity::error::AppError; use sqlx::types::uuid; use crate::component::auth::jwt::UserUuid; diff --git a/src/biz/collab.rs b/src/biz/collab.rs index 342f1ede..03e6d068 100644 --- a/src/biz/collab.rs +++ b/src/biz/collab.rs @@ -6,7 +6,7 @@ use database_entity::{ AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryObjectSnapshotParams, QuerySnapshotParams, }; -use shared_entity::{error::AppError, error_code::ErrorCode}; +use shared_entity::{app_error::AppError, error_code::ErrorCode}; use sqlx::{types::Uuid, PgPool}; use validator::Validate; diff --git a/src/biz/file_storage.rs b/src/biz/file_storage.rs index 4d62ad6c..f37d3849 100644 --- a/src/biz/file_storage.rs +++ b/src/biz/file_storage.rs @@ -5,7 +5,7 @@ use futures_util::Stream; use bytes::Bytes; use database::{file_storage, user::uid_from_uuid}; use s3::request::{ResponseData, ResponseDataStream}; -use shared_entity::{error::AppError, error_code::ErrorCode}; +use shared_entity::{app_error::AppError, error_code::ErrorCode}; use sqlx::types::uuid; use tokio_stream::StreamExt; diff --git a/src/biz/user.rs b/src/biz/user.rs index fd447200..aefc0dc1 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -5,7 +5,7 @@ use database::{ }; use database_entity::AFUserProfileView; use gotrue::api::Client; -use shared_entity::error::AppError; +use shared_entity::app_error::AppError; use sqlx::{types::uuid, PgPool}; diff --git a/src/biz/workspace.rs b/src/biz/workspace.rs index 1bf7b44d..487b9bd3 100644 --- a/src/biz/workspace.rs +++ b/src/biz/workspace.rs @@ -3,7 +3,7 @@ use database::workspace::{ select_user_is_workspace_owner, select_workspace_members, }; use database_entity::{AFRole, AFWorkspaceMember, AFWorkspaces}; -use shared_entity::{error::AppError, error_code::ErrorCode}; +use shared_entity::{app_error::AppError, error_code::ErrorCode}; use sqlx::{types::uuid, PgPool}; pub async fn get_workspaces( diff --git a/tests/realtime/connect_test.rs b/tests/realtime/connect_test.rs index 5e99cf1e..e9e1aec0 100644 --- a/tests/realtime/connect_test.rs +++ b/tests/realtime/connect_test.rs @@ -10,7 +10,7 @@ async fn realtime_connect_test() { ping_per_secs: 6, retry_connect_per_pings: 5, }); - let mut state = ws_client.subscribe_connect_state().await; + let mut state = ws_client.subscribe_connect_state(); loop { tokio::select! { @@ -38,7 +38,7 @@ async fn realtime_disconnect_test() { .await .unwrap(); - let mut state = ws_client.subscribe_connect_state().await; + let mut state = ws_client.subscribe_connect_state(); loop { tokio::select! { _ = ws_client.disconnect() => {}, diff --git a/tests/realtime/test_client.rs b/tests/realtime/test_client.rs index 94fcaf1b..a62238e1 100644 --- a/tests/realtime/test_client.rs +++ b/tests/realtime/test_client.rs @@ -90,7 +90,7 @@ impl TestClient { #[allow(dead_code)] pub(crate) async fn wait_ws_connected(&self) { - let mut connect_state = self.ws_client.subscribe_connect_state().await; + let mut connect_state = self.ws_client.subscribe_connect_state(); const TIMEOUT_DURATION: Duration = Duration::from_secs(10); while let Ok(Ok(state)) = timeout(TIMEOUT_DURATION, connect_state.recv()).await { @@ -124,13 +124,12 @@ impl TestClient { let handler = self .ws_client .subscribe(BusinessID::CollabId, object_id.to_string()) - .await .unwrap(); let (sink, stream) = (handler.sink(), handler.stream()); let origin = CollabOrigin::Client(CollabClient::new(uid, self.device_id.clone())); let collab = Arc::new(MutexCollab::new(origin.clone(), object_id, vec![])); - let ws_connect_state = self.ws_client.subscribe_connect_state().await; + let ws_connect_state = self.ws_client.subscribe_connect_state(); let object = SyncObject::new(object_id, workspace_id, collab_type, &self.device_id); let sync_plugin = SyncPlugin::new( origin.clone(),