diff --git a/Cargo.lock b/Cargo.lock index 912063c5..2dfbd69d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -545,6 +545,7 @@ dependencies = [ "token", "tokio", "tokio-stream", + "tokio-tungstenite", "tokio-util", "tracing", "tracing-actix-web", diff --git a/Cargo.toml b/Cargo.toml index 9d31d82c..088efec1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ scraper = "0.17.1" client-api = { path = "libs/client-api", features = ["collab-sync"] } opener = "0.6.1" image = "0.23.14" +tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } [[bin]] name = "appflowy_cloud" diff --git a/libs/app_error/src/lib.rs b/libs/app_error/src/lib.rs index 836790bb..52731b1b 100644 --- a/libs/app_error/src/lib.rs +++ b/libs/app_error/src/lib.rs @@ -218,16 +218,16 @@ impl ErrorCode { } #[derive(Serialize)] -struct AFErrorSerde { +struct AppErrorSerde { code: ErrorCode, - msg: String, + message: String, } -impl From<&AppError> for AFErrorSerde { +impl From<&AppError> for AppErrorSerde { fn from(value: &AppError) -> Self { Self { code: value.code(), - msg: value.to_string(), + message: value.to_string(), } } } @@ -239,6 +239,6 @@ impl actix_web::error::ResponseError for AppError { } fn error_response(&self) -> actix_web::HttpResponse { - actix_web::HttpResponse::Ok().json(AFErrorSerde::from(self)) + actix_web::HttpResponse::Ok().json(AppErrorSerde::from(self)) } } diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index befe5b14..8254c1e6 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -13,7 +13,7 @@ use realtime_entity::collab_msg::{CollabSinkMessage, MsgId}; use tokio::spawn; use tokio::sync::{mpsc, oneshot, watch, Mutex}; use tokio::time::{interval, Instant, Interval}; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, error, event, trace, warn}; #[derive(Clone, Debug)] pub enum SinkState { @@ -251,6 +251,12 @@ where while let Some(pending_msg) = pending_msg_queue.pop() { if !sending_msg.merge(pending_msg, &self.config.maximum_payload_size) { break; + } else { + event!( + tracing::Level::TRACE, + "Did merge message: {}", + sending_msg.get_msg() + ); } } } diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 04568ef3..0df39367 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -79,9 +79,9 @@ impl Client { } #[instrument(level = "debug", skip_all, err)] - pub fn restore_token(&self, token: &str) -> Result<(), AppError> { + pub fn restore_token(&self, token: &str) -> Result<(), AppResponseError> { if token.is_empty() { - return Err(AppError::OAuthError("Empty token".to_string())); + return Err(AppError::OAuthError("Empty token".to_string()).into()); } let token = serde_json::from_str::(token)?; self.token.write().set(token); @@ -95,12 +95,12 @@ impl Client { /// string representation of the access token. If the lock cannot be acquired or /// the token is not present, an error is returned. #[instrument(level = "debug", skip_all, err)] - pub fn get_token(&self) -> Result { + pub fn get_token(&self) -> Result { let token_str = self .token .read() .try_get() - .map_err(|err| AppError::OAuthError(err.to_string()))?; + .map_err(|err| AppResponseError::from(AppError::OAuthError(err.to_string())))?; Ok(token_str) } @@ -174,7 +174,7 @@ impl Client { /// /// # Returns /// - `Ok(String)`: A `String` containing the constructed authorization URL if the specified provider is available. - /// - `Err(AppError)`: An `AppError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings. + /// - `Err(AppResponseError)`: An `AppResponseError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings. /// #[instrument(level = "debug", skip_all, err)] pub async fn generate_oauth_url_with_provider( @@ -340,13 +340,15 @@ impl Client { /// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in. /// #[inline] - pub fn token_expires_at(&self) -> Result { + pub fn token_expires_at(&self) -> Result { match &self.token.try_read() { - None => Err(AppError::Unhandled("Failed to read token".to_string())), + None => Err(AppError::Unhandled("Failed to read token".to_string()).into()), Some(token) => Ok( token .as_ref() - .ok_or(AppError::NotLoggedIn("fail to get expires_at".to_string()))? + .ok_or(AppResponseError::from(AppError::NotLoggedIn( + "fail to get expires_at".to_string(), + )))? .expires_at, ), } @@ -358,17 +360,17 @@ impl Client { /// /// # Returns /// - `Ok(String)`: A `String` containing the access token. - /// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in. + /// - `Err(AppResponseError)`: An `AppResponseError` indicating either an inability to read the token or that the user is not logged in. /// - pub fn access_token(&self) -> Result { + pub fn access_token(&self) -> Result { match &self.token.try_read_for(Duration::from_secs(2)) { - None => Err(AppError::Unhandled("Failed to read token".to_string())), + None => Err(AppError::Unhandled("Failed to read token".to_string()).into()), Some(token) => Ok( token .as_ref() - .ok_or(AppError::NotLoggedIn( + .ok_or(AppResponseError::from(AppError::NotLoggedIn( "fail to get access token. Token is empty".to_string(), - ))? + )))? .access_token .clone(), ), @@ -562,7 +564,6 @@ impl Client { }, Err(err) => { event!(tracing::Level::ERROR, "refresh token failed: {}", err); - self.token.write().unset(); Err(AppResponseError::from(err)) }, } diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index dd3e7a01..3b3f92f3 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -10,6 +10,7 @@ pub use http::*; pub mod error { pub use shared_entity::response::AppResponseError; + pub use shared_entity::response::ErrorCode; } // Export all dto entities that will be used in the frontend application diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index 288970db..04933872 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -234,6 +234,15 @@ impl WSClient { } } + pub fn send>(&self, msg: M) -> Result<(), WSError> { + self.sender.send(msg.into()).unwrap(); + Ok(()) + } + + pub fn sender(&self) -> Sender { + self.sender.clone() + } + async fn set_state(&self, state: ConnectState) { self.state_notify.lock().set_state(state); } @@ -246,7 +255,7 @@ struct RetryCondition { impl Condition for RetryCondition { fn should_retry(&mut self, error: &WSError) -> bool { if let WSError::AuthError(err) = error { - debug!("WSClient auth error: {}, stop retry connn", err); + debug!("WSClient auth error: {}, stop retry connect", err); return false; } diff --git a/libs/client-api/src/ws/retry.rs b/libs/client-api/src/ws/retry.rs index 131f6698..51c4eeb8 100644 --- a/libs/client-api/src/ws/retry.rs +++ b/libs/client-api/src/ws/retry.rs @@ -4,7 +4,8 @@ use std::pin::Pin; use crate::ws::WSError; use tokio::net::TcpStream; use tokio_retry::Action; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::tungstenite::protocol::WebSocketConfig; +use tokio_tungstenite::{connect_async_with_config, MaybeTlsStream, WebSocketStream}; use tracing::{error, info}; pub(crate) struct ConnectAction { @@ -26,7 +27,17 @@ impl Action for ConnectAction { let cloned_addr = self.addr.clone(); Box::pin(async move { info!("🔵websocket start connecting: {}", cloned_addr); - match connect_async(&cloned_addr).await { + match connect_async_with_config( + &cloned_addr, + Some(WebSocketConfig { + max_message_size: Some(65_536), // 64KB + max_frame_size: Some(65_536), // 64KB + ..WebSocketConfig::default() + }), + false, + ) + .await + { Ok((stream, _response)) => { info!("🟢websocket connect success"); Ok(stream) diff --git a/libs/shared-entity/src/response.rs b/libs/shared-entity/src/response.rs index 4e7667ec..adf9c8e3 100644 --- a/libs/shared-entity/src/response.rs +++ b/libs/shared-entity/src/response.rs @@ -1,7 +1,8 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; -use app_error::{AppError, ErrorCode}; +use app_error::AppError; +pub use app_error::ErrorCode; use std::fmt::{Debug, Display}; #[cfg(feature = "cloud")] diff --git a/src/api/ws.rs b/src/api/ws.rs index 8ab34c72..92657e53 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -51,7 +51,7 @@ pub async fn establish_ws_connection( ); match ws::WsResponseBuilder::new(client, &request, payload) - .frame_size(MAX_FRAME_SIZE) + .frame_size(MAX_FRAME_SIZE * 10) .start() { Ok(response) => Ok(response), diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index 1e3ca77b..9d4c87cb 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -282,7 +282,7 @@ where oid: &str, user_uuid: &Uuid, method: Method, - _path: Path, + _path: &Path, ) -> Result<(), AppError> { let can_access = self .0 diff --git a/src/middleware/access_control_mw.rs b/src/middleware/access_control_mw.rs index c39a637e..c123c7cc 100644 --- a/src/middleware/access_control_mw.rs +++ b/src/middleware/access_control_mw.rs @@ -50,7 +50,7 @@ pub trait HttpAccessControlService: Send + Sync { oid: &str, user_uuid: &Uuid, method: Method, - path: Path, + path: &Path, ) -> Result<(), AppError> { Ok(()) } @@ -82,7 +82,7 @@ where oid: &str, user_uuid: &Uuid, method: Method, - path: Path, + path: &Path, ) -> Result<(), AppError> { self .as_ref() @@ -211,7 +211,11 @@ where .check_workspace_permission(&workspace_id, &user_uuid, method.clone()) .await { - error!("workspace access control: {:?}", err); + error!( + "workspace access control: {}, with path:{}", + err, + path.as_str() + ); return Err(Error::from(err)); } }; @@ -221,10 +225,14 @@ where if let Some(collab_object_id) = collab_object_id { if let Some(acs) = services.get(&AccessResource::Collab) { if let Err(err) = acs - .check_collab_permission(&collab_object_id, &user_uuid, method, path) + .check_collab_permission(&collab_object_id, &user_uuid, method, &path) .await { - error!("collab access control: {:?}", err); + error!( + "collab access control: {:?}, with path:{}", + err, + path.as_str() + ); return Err(Error::from(err)); } }; diff --git a/tests/websocket/connect.rs b/tests/websocket/connect.rs index e9e1aec0..7ec5f7ea 100644 --- a/tests/websocket/connect.rs +++ b/tests/websocket/connect.rs @@ -1,6 +1,5 @@ -use client_api::ws::{ConnectState, WSClient, WSClientConfig}; - use crate::user::utils::generate_unique_registered_user_client; +use client_api::ws::{ConnectState, WSClient, WSClientConfig}; #[tokio::test] async fn realtime_connect_test() {