feat: websocket config (#99)

* chore: enable tls feature

* chore: update ws client

* chore: update ws client

* chore: expost database entities

* chore: update ws nginx config

* chore: rename error file

* chore: fix clippy
This commit is contained in:
Nathan.fooo 2023-10-05 17:43:50 +08:00 committed by GitHub
parent 4a279bc108
commit d3186cc07a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 54 additions and 40 deletions

3
Cargo.lock generated
View File

@ -4018,7 +4018,9 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite",
]
@ -4166,6 +4168,7 @@ dependencies = [
"http",
"httparse",
"log",
"native-tls",
"rand 0.8.5",
"sha1",
"thiserror",

View File

@ -23,7 +23,7 @@ mime = "0.3.17"
tracing = { version = "0.1" }
thiserror = "1.0.39"
serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = { version = "0.20.1" }
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
futures-core = "0.3.26"

View File

@ -27,7 +27,7 @@ use crate::notify::{ClientToken, TokenStateReceiver};
use database_entity::{AFUserProfileView, AFWorkspaceMember, InsertCollabParams};
use database_entity::{AFWorkspaces, QueryCollabParams};
use database_entity::{DeleteCollabParams, RawData};
use shared_entity::error::AppError;
use shared_entity::app_error::AppError;
use shared_entity::error_code::url_missing_param;
use shared_entity::error_code::ErrorCode;

View File

@ -9,11 +9,13 @@ pub mod ws;
pub use http::*;
pub mod error {
pub use shared_entity::error::AppError;
pub use shared_entity::app_error::AppError;
pub use shared_entity::error_code::ErrorCode;
}
// Export all entities that will be used in the frontend application
pub mod entity {
pub use database_entity::*;
pub use gotrue_entity::*;
pub use shared_entity::*;
}

View File

@ -1,6 +1,7 @@
use futures_util::{SinkExt, StreamExt};
use std::borrow::Cow;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
@ -12,7 +13,7 @@ use crate::ws::state::{ConnectState, ConnectStateNotify};
use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex, RwLock};
use tokio::sync::{oneshot, Mutex};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
@ -43,10 +44,11 @@ impl Default for WSClientConfig {
type ChannelByObjectId = HashMap<String, Weak<WebSocketChannel>>;
pub type WSConnectStateReceiver = Receiver<ConnectState>;
pub struct WSClient {
addr: Arc<parking_lot::Mutex<Option<String>>>,
config: WSClientConfig,
state_notify: Arc<Mutex<ConnectStateNotify>>,
state_notify: Arc<parking_lot::Mutex<ConnectStateNotify>>,
/// Sender used to send messages to the websocket.
sender: Sender<Message>,
channels: Arc<RwLock<HashMap<BusinessID, ChannelByObjectId>>>,
@ -57,13 +59,13 @@ pub struct WSClient {
impl WSClient {
pub fn new(config: WSClientConfig) -> Self {
let (sender, _) = channel(config.buffer_capacity);
let state = Arc::new(Mutex::new(ConnectStateNotify::new()));
let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new()));
let channels = Arc::new(RwLock::new(HashMap::new()));
let ping = Arc::new(Mutex::new(None));
WSClient {
addr: Arc::new(parking_lot::Mutex::new(None)),
config,
state_notify: state,
state_notify,
sender,
channels,
ping,
@ -118,7 +120,6 @@ impl WSClient {
if let Some(channels) = weak_channels.upgrade() {
if let Some(channel) = channels
.read()
.await
.get(&msg.business_id)
.and_then(|map| map.get(&msg.object_id))
{
@ -181,7 +182,7 @@ impl WSClient {
/// Return a [WebSocketChannel] that can be used to send messages to the websocket. Caller should
/// keep the channel alive as long as it wants to receive messages from the websocket.
pub async fn subscribe(
pub fn subscribe(
&self,
business_id: BusinessID,
object_id: String,
@ -190,19 +191,18 @@ impl WSClient {
self
.channels
.write()
.await
.entry(business_id)
.or_insert_with(HashMap::new)
.insert(object_id, Arc::downgrade(&channel));
Ok(channel)
}
pub async fn subscribe_connect_state(&self) -> WSConnectStateReceiver {
self.state_notify.lock().await.subscribe()
pub fn subscribe_connect_state(&self) -> WSConnectStateReceiver {
self.state_notify.lock().subscribe()
}
pub async fn is_connected(&self) -> bool {
self.state_notify.lock().await.state.is_connected()
pub fn is_connected(&self) -> bool {
self.state_notify.lock().state.is_connected()
}
pub async fn disconnect(&self) {
@ -222,7 +222,7 @@ impl WSClient {
async fn set_state(&self, state: ConnectState) {
trace!("websocket state: {:?}", state);
self.state_notify.lock().await.set_state(state);
self.state_notify.lock().set_state(state);
}
}

View File

@ -11,7 +11,7 @@ pub(crate) struct ServerFixIntervalPing {
#[allow(dead_code)]
stop_tx: tokio::sync::mpsc::Sender<()>,
stop_rx: Option<tokio::sync::mpsc::Receiver<()>>,
state: Arc<Mutex<ConnectStateNotify>>,
state: Arc<parking_lot::Mutex<ConnectStateNotify>>,
ping_count: Arc<Mutex<u32>>,
maximum_ping_count: u32,
}
@ -19,7 +19,7 @@ pub(crate) struct ServerFixIntervalPing {
impl ServerFixIntervalPing {
pub(crate) fn new(
duration: Duration,
state: Arc<Mutex<ConnectStateNotify>>,
state: Arc<parking_lot::Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
maximum_ping_count: u32,
) -> Self {
@ -58,7 +58,7 @@ impl ServerFixIntervalPing {
let mut lock = ping_count.lock().await;
if *lock >= reconnect_per_ping {
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::PingTimeout);
state.lock().set_state(ConnectState::PingTimeout);
}
} else {
*lock +=1;
@ -73,7 +73,7 @@ impl ServerFixIntervalPing {
*lock = 0;
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Connected);
state.lock().set_state(ConnectState::Connected);
}
}
}

View File

@ -6,7 +6,7 @@ use std::ops::Deref;
use validator::{Validate, ValidationError};
pub type RawData = Vec<u8>;
pub mod error;
pub mod database_error;
#[derive(Debug, Clone, Validate, Serialize, Deserialize)]
pub struct InsertCollabParams {

View File

@ -3,7 +3,7 @@ use anyhow::Context;
use async_trait::async_trait;
use collab::core::collab::MutexCollab;
use collab_define::CollabType;
use database_entity::error::DatabaseError;
use database_entity::database_error::DatabaseError;
use database_entity::{
AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams,
QueryObjectSnapshotParams, QuerySnapshotParams, RawData,

View File

@ -3,7 +3,7 @@ use std::{ops::DerefMut, str::FromStr};
use anyhow::Context;
use database_entity::{
error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams,
database_error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams,
};
use sqlx::{PgPool, Transaction};
use tracing::trace;

View File

@ -4,7 +4,7 @@ use bytes::Bytes;
use collab::core::collab::TransactionMutExt;
use collab::core::origin::CollabOrigin;
use collab::preclude::{CollabPlugin, Doc, TransactionMut};
use database_entity::error::DatabaseError;
use database_entity::database_error::DatabaseError;
use collab::sync_protocol::awareness::Awareness;
use collab_define::CollabType;

View File

@ -1,5 +1,5 @@
use collab::error::CollabError;
use database_entity::error::DatabaseError;
use database_entity::database_error::DatabaseError;
#[derive(Debug, thiserror::Error)]
pub enum RealtimeError {

View File

@ -1,4 +1,4 @@
use database_entity::error::DatabaseError;
use database_entity::database_error::DatabaseError;
use std::fmt::Display;
use std::num::ParseIntError;
use std::time::SystemTimeError;

View File

@ -1,4 +1,4 @@
use crate::error::AppError;
use crate::app_error::AppError;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

View File

@ -1,6 +1,6 @@
use serde_repr::{Deserialize_repr, Serialize_repr};
use crate::error::AppError;
use crate::app_error::AppError;
use thiserror::Error;
#[derive(Clone, Copy, Debug, Error, Default, PartialEq, Eq, Serialize_repr, Deserialize_repr)]

View File

@ -1,6 +1,6 @@
pub mod app_error;
pub mod data;
pub mod dto;
pub mod error;
pub mod error_code;
#[cfg(feature = "cloud")]

View File

@ -18,6 +18,16 @@ http {
proxy_pass http://gotrue:9999;
}
# WebSocket
location /ws {
proxy_pass http://appflowy_cloud;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
proxy_read_timeout 86400;
}
# AppFlowy-Cloud
location / {
proxy_pass http://appflowy_cloud:8000;

View File

@ -6,13 +6,13 @@ use actix_web::web::{Data, Json};
use actix_web::Result;
use actix_web::{web, Scope};
use database::collab::CollabStorage;
use database_entity::error::DatabaseError;
use database_entity::database_error::DatabaseError;
use database_entity::{
AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryCollabParams,
QueryObjectSnapshotParams, QuerySnapshotParams, RawData,
};
use shared_entity::app_error::AppError;
use shared_entity::data::AppResponse;
use shared_entity::error::AppError;
use shared_entity::error_code::ErrorCode;
use tracing::{debug, instrument};

View File

@ -1,9 +1,9 @@
use crate::biz;
use crate::state::AppState;
use database_entity::{AFWorkspaceMember, AFWorkspaces};
use shared_entity::app_error::AppError;
use shared_entity::data::{AppResponse, JsonAppResponse};
use shared_entity::dto::WorkspaceMembersParams;
use shared_entity::error::AppError;
use sqlx::types::uuid;
use crate::component::auth::jwt::UserUuid;

View File

@ -6,7 +6,7 @@ use database_entity::{
AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryObjectSnapshotParams,
QuerySnapshotParams,
};
use shared_entity::{error::AppError, error_code::ErrorCode};
use shared_entity::{app_error::AppError, error_code::ErrorCode};
use sqlx::{types::Uuid, PgPool};
use validator::Validate;

View File

@ -5,7 +5,7 @@ use futures_util::Stream;
use bytes::Bytes;
use database::{file_storage, user::uid_from_uuid};
use s3::request::{ResponseData, ResponseDataStream};
use shared_entity::{error::AppError, error_code::ErrorCode};
use shared_entity::{app_error::AppError, error_code::ErrorCode};
use sqlx::types::uuid;
use tokio_stream::StreamExt;

View File

@ -5,7 +5,7 @@ use database::{
};
use database_entity::AFUserProfileView;
use gotrue::api::Client;
use shared_entity::error::AppError;
use shared_entity::app_error::AppError;
use sqlx::{types::uuid, PgPool};

View File

@ -3,7 +3,7 @@ use database::workspace::{
select_user_is_workspace_owner, select_workspace_members,
};
use database_entity::{AFRole, AFWorkspaceMember, AFWorkspaces};
use shared_entity::{error::AppError, error_code::ErrorCode};
use shared_entity::{app_error::AppError, error_code::ErrorCode};
use sqlx::{types::uuid, PgPool};
pub async fn get_workspaces(

View File

@ -10,7 +10,7 @@ async fn realtime_connect_test() {
ping_per_secs: 6,
retry_connect_per_pings: 5,
});
let mut state = ws_client.subscribe_connect_state().await;
let mut state = ws_client.subscribe_connect_state();
loop {
tokio::select! {
@ -38,7 +38,7 @@ async fn realtime_disconnect_test() {
.await
.unwrap();
let mut state = ws_client.subscribe_connect_state().await;
let mut state = ws_client.subscribe_connect_state();
loop {
tokio::select! {
_ = ws_client.disconnect() => {},

View File

@ -90,7 +90,7 @@ impl TestClient {
#[allow(dead_code)]
pub(crate) async fn wait_ws_connected(&self) {
let mut connect_state = self.ws_client.subscribe_connect_state().await;
let mut connect_state = self.ws_client.subscribe_connect_state();
const TIMEOUT_DURATION: Duration = Duration::from_secs(10);
while let Ok(Ok(state)) = timeout(TIMEOUT_DURATION, connect_state.recv()).await {
@ -124,13 +124,12 @@ impl TestClient {
let handler = self
.ws_client
.subscribe(BusinessID::CollabId, object_id.to_string())
.await
.unwrap();
let (sink, stream) = (handler.sink(), handler.stream());
let origin = CollabOrigin::Client(CollabClient::new(uid, self.device_id.clone()));
let collab = Arc::new(MutexCollab::new(origin.clone(), object_id, vec![]));
let ws_connect_state = self.ws_client.subscribe_connect_state().await;
let ws_connect_state = self.ws_client.subscribe_connect_state();
let object = SyncObject::new(object_id, workspace_id, collab_type, &self.device_id);
let sync_plugin = SyncPlugin::new(
origin.clone(),