fix: fix some bugs (#145)

* chore: update

* chore: pub error code

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update max frame size

* chore: update max frame size

* chore: ws buffer size

* chore: update
This commit is contained in:
Nathan.fooo 2023-11-03 14:15:08 +08:00 committed by GitHub
parent 9c911a3a94
commit 937e3bd9b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 71 additions and 33 deletions

1
Cargo.lock generated
View File

@ -545,6 +545,7 @@ dependencies = [
"token",
"tokio",
"tokio-stream",
"tokio-tungstenite",
"tokio-util",
"tracing",
"tracing-actix-web",

View File

@ -93,6 +93,7 @@ scraper = "0.17.1"
client-api = { path = "libs/client-api", features = ["collab-sync"] }
opener = "0.6.1"
image = "0.23.14"
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
[[bin]]
name = "appflowy_cloud"

View File

@ -218,16 +218,16 @@ impl ErrorCode {
}
#[derive(Serialize)]
struct AFErrorSerde {
struct AppErrorSerde {
code: ErrorCode,
msg: String,
message: String,
}
impl From<&AppError> for AFErrorSerde {
impl From<&AppError> for AppErrorSerde {
fn from(value: &AppError) -> Self {
Self {
code: value.code(),
msg: value.to_string(),
message: value.to_string(),
}
}
}
@ -239,6 +239,6 @@ impl actix_web::error::ResponseError for AppError {
}
fn error_response(&self) -> actix_web::HttpResponse {
actix_web::HttpResponse::Ok().json(AFErrorSerde::from(self))
actix_web::HttpResponse::Ok().json(AppErrorSerde::from(self))
}
}

View File

@ -13,7 +13,7 @@ use realtime_entity::collab_msg::{CollabSinkMessage, MsgId};
use tokio::spawn;
use tokio::sync::{mpsc, oneshot, watch, Mutex};
use tokio::time::{interval, Instant, Interval};
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, event, trace, warn};
#[derive(Clone, Debug)]
pub enum SinkState {
@ -251,6 +251,12 @@ where
while let Some(pending_msg) = pending_msg_queue.pop() {
if !sending_msg.merge(pending_msg, &self.config.maximum_payload_size) {
break;
} else {
event!(
tracing::Level::TRACE,
"Did merge message: {}",
sending_msg.get_msg()
);
}
}
}

View File

@ -79,9 +79,9 @@ impl Client {
}
#[instrument(level = "debug", skip_all, err)]
pub fn restore_token(&self, token: &str) -> Result<(), AppError> {
pub fn restore_token(&self, token: &str) -> Result<(), AppResponseError> {
if token.is_empty() {
return Err(AppError::OAuthError("Empty token".to_string()));
return Err(AppError::OAuthError("Empty token".to_string()).into());
}
let token = serde_json::from_str::<AccessTokenResponse>(token)?;
self.token.write().set(token);
@ -95,12 +95,12 @@ impl Client {
/// string representation of the access token. If the lock cannot be acquired or
/// the token is not present, an error is returned.
#[instrument(level = "debug", skip_all, err)]
pub fn get_token(&self) -> Result<String, AppError> {
pub fn get_token(&self) -> Result<String, AppResponseError> {
let token_str = self
.token
.read()
.try_get()
.map_err(|err| AppError::OAuthError(err.to_string()))?;
.map_err(|err| AppResponseError::from(AppError::OAuthError(err.to_string())))?;
Ok(token_str)
}
@ -174,7 +174,7 @@ impl Client {
///
/// # Returns
/// - `Ok(String)`: A `String` containing the constructed authorization URL if the specified provider is available.
/// - `Err(AppError)`: An `AppError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings.
/// - `Err(AppResponseError)`: An `AppResponseError` indicating either the OAuth provider is invalid or other issues occurred while fetching settings.
///
#[instrument(level = "debug", skip_all, err)]
pub async fn generate_oauth_url_with_provider(
@ -340,13 +340,15 @@ impl Client {
/// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in.
///
#[inline]
pub fn token_expires_at(&self) -> Result<i64, AppError> {
pub fn token_expires_at(&self) -> Result<i64, AppResponseError> {
match &self.token.try_read() {
None => Err(AppError::Unhandled("Failed to read token".to_string())),
None => Err(AppError::Unhandled("Failed to read token".to_string()).into()),
Some(token) => Ok(
token
.as_ref()
.ok_or(AppError::NotLoggedIn("fail to get expires_at".to_string()))?
.ok_or(AppResponseError::from(AppError::NotLoggedIn(
"fail to get expires_at".to_string(),
)))?
.expires_at,
),
}
@ -358,17 +360,17 @@ impl Client {
///
/// # Returns
/// - `Ok(String)`: A `String` containing the access token.
/// - `Err(AppError)`: An `AppError` indicating either an inability to read the token or that the user is not logged in.
/// - `Err(AppResponseError)`: An `AppResponseError` indicating either an inability to read the token or that the user is not logged in.
///
pub fn access_token(&self) -> Result<String, AppError> {
pub fn access_token(&self) -> Result<String, AppResponseError> {
match &self.token.try_read_for(Duration::from_secs(2)) {
None => Err(AppError::Unhandled("Failed to read token".to_string())),
None => Err(AppError::Unhandled("Failed to read token".to_string()).into()),
Some(token) => Ok(
token
.as_ref()
.ok_or(AppError::NotLoggedIn(
.ok_or(AppResponseError::from(AppError::NotLoggedIn(
"fail to get access token. Token is empty".to_string(),
))?
)))?
.access_token
.clone(),
),
@ -562,7 +564,6 @@ impl Client {
},
Err(err) => {
event!(tracing::Level::ERROR, "refresh token failed: {}", err);
self.token.write().unset();
Err(AppResponseError::from(err))
},
}

View File

@ -10,6 +10,7 @@ pub use http::*;
pub mod error {
pub use shared_entity::response::AppResponseError;
pub use shared_entity::response::ErrorCode;
}
// Export all dto entities that will be used in the frontend application

View File

@ -234,6 +234,15 @@ impl WSClient {
}
}
pub fn send<M: Into<Message>>(&self, msg: M) -> Result<(), WSError> {
self.sender.send(msg.into()).unwrap();
Ok(())
}
pub fn sender(&self) -> Sender<Message> {
self.sender.clone()
}
async fn set_state(&self, state: ConnectState) {
self.state_notify.lock().set_state(state);
}
@ -246,7 +255,7 @@ struct RetryCondition {
impl Condition<WSError> for RetryCondition {
fn should_retry(&mut self, error: &WSError) -> bool {
if let WSError::AuthError(err) = error {
debug!("WSClient auth error: {}, stop retry connn", err);
debug!("WSClient auth error: {}, stop retry connect", err);
return false;
}

View File

@ -4,7 +4,8 @@ use std::pin::Pin;
use crate::ws::WSError;
use tokio::net::TcpStream;
use tokio_retry::Action;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
use tokio_tungstenite::{connect_async_with_config, MaybeTlsStream, WebSocketStream};
use tracing::{error, info};
pub(crate) struct ConnectAction {
@ -26,7 +27,17 @@ impl Action for ConnectAction {
let cloned_addr = self.addr.clone();
Box::pin(async move {
info!("🔵websocket start connecting: {}", cloned_addr);
match connect_async(&cloned_addr).await {
match connect_async_with_config(
&cloned_addr,
Some(WebSocketConfig {
max_message_size: Some(65_536), // 64KB
max_frame_size: Some(65_536), // 64KB
..WebSocketConfig::default()
}),
false,
)
.await
{
Ok((stream, _response)) => {
info!("🟢websocket connect success");
Ok(stream)

View File

@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use app_error::{AppError, ErrorCode};
use app_error::AppError;
pub use app_error::ErrorCode;
use std::fmt::{Debug, Display};
#[cfg(feature = "cloud")]

View File

@ -51,7 +51,7 @@ pub async fn establish_ws_connection(
);
match ws::WsResponseBuilder::new(client, &request, payload)
.frame_size(MAX_FRAME_SIZE)
.frame_size(MAX_FRAME_SIZE * 10)
.start()
{
Ok(response) => Ok(response),

View File

@ -282,7 +282,7 @@ where
oid: &str,
user_uuid: &Uuid,
method: Method,
_path: Path<Url>,
_path: &Path<Url>,
) -> Result<(), AppError> {
let can_access = self
.0

View File

@ -50,7 +50,7 @@ pub trait HttpAccessControlService: Send + Sync {
oid: &str,
user_uuid: &Uuid,
method: Method,
path: Path<Url>,
path: &Path<Url>,
) -> Result<(), AppError> {
Ok(())
}
@ -82,7 +82,7 @@ where
oid: &str,
user_uuid: &Uuid,
method: Method,
path: Path<Url>,
path: &Path<Url>,
) -> Result<(), AppError> {
self
.as_ref()
@ -211,7 +211,11 @@ where
.check_workspace_permission(&workspace_id, &user_uuid, method.clone())
.await
{
error!("workspace access control: {:?}", err);
error!(
"workspace access control: {}, with path:{}",
err,
path.as_str()
);
return Err(Error::from(err));
}
};
@ -221,10 +225,14 @@ where
if let Some(collab_object_id) = collab_object_id {
if let Some(acs) = services.get(&AccessResource::Collab) {
if let Err(err) = acs
.check_collab_permission(&collab_object_id, &user_uuid, method, path)
.check_collab_permission(&collab_object_id, &user_uuid, method, &path)
.await
{
error!("collab access control: {:?}", err);
error!(
"collab access control: {:?}, with path:{}",
err,
path.as_str()
);
return Err(Error::from(err));
}
};

View File

@ -1,6 +1,5 @@
use client_api::ws::{ConnectState, WSClient, WSClientConfig};
use crate::user::utils::generate_unique_registered_user_client;
use client_api::ws::{ConnectState, WSClient, WSClientConfig};
#[tokio::test]
async fn realtime_connect_test() {