diff --git a/.github/workflows/client_api_check.yml b/.github/workflows/client_api_check.yml new file mode 100644 index 00000000..8f1c5baa --- /dev/null +++ b/.github/workflows/client_api_check.yml @@ -0,0 +1,25 @@ +name: ClientAPI Check + +on: + push: + branches: [ main ] + pull_request: + types: [ opened, synchronize, reopened ] + branches: [ main ] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - uses: dtolnay/rust-toolchain@stable + - uses: Swatinem/rust-cache@v2 + with: + workspaces: | + AppFlowy-Cloud + + - name: Check + working-directory: ./libs/client-api + run: cargo check + diff --git a/Cargo.lock b/Cargo.lock index 6f2c5635..8765c229 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -719,6 +719,9 @@ name = "bytes" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +dependencies = [ + "serde", +] [[package]] name = "bytestring" @@ -2915,6 +2918,7 @@ dependencies = [ "sqlx", "thiserror", "url", + "uuid", "validator", ] @@ -3514,9 +3518,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.18.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", @@ -3658,13 +3662,13 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tungstenite" -version = "0.18.0" +version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649" dependencies = [ - "base64 0.13.1", "byteorder", "bytes", + "data-encoding", "http", "httparse", "log", diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 84a5edfa..10f6cdc6 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -21,7 +21,7 @@ parking_lot = "0.12.1" tracing = { version = "0.1" } thiserror = "1.0.39" serde = { version = "1.0", features = ["derive"] } -tokio-tungstenite = { version = "0.18" } +tokio-tungstenite = { version = "0.20" } tokio = { version = "1.26", features = ["full"] } futures-util = "0.3.26" tokio-retry = "0.3" diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index f2fae51b..ec557593 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -5,11 +5,13 @@ use std::net::SocketAddr; use std::sync::{Arc, Weak}; use std::time::Duration; +use crate::ws::ping::ServerFixIntervalPing; use crate::ws::retry::ConnectAction; +use crate::ws::state::{ConnectState, ConnectStateNotify}; use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel}; use tokio::sync::broadcast::{channel, Receiver, Sender}; use tokio::sync::{Mutex, RwLock}; -use tokio_retry::strategy::FixedInterval; +use tokio_retry::strategy::FibonacciBackoff; use tokio_retry::{Condition, RetryIf}; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::MaybeTlsStream; @@ -29,7 +31,7 @@ impl Default for WSClientConfig { Self { buffer_capacity: 1000, ping_per_secs: 8, - retry_connect_per_pings: 10, + retry_connect_per_pings: 20, } } } @@ -39,7 +41,7 @@ type ChannelByObjectId = HashMap>; pub struct WSClient { addr: Arc>>, config: WSClientConfig, - state: Arc>, + state_notify: Arc>, sender: Sender, channels: Arc>>, ping: Arc>>, @@ -54,7 +56,7 @@ impl WSClient { WSClient { addr: Arc::new(parking_lot::Mutex::new(None)), config, - state, + state_notify: state, sender, channels, ping, @@ -63,9 +65,12 @@ impl WSClient { pub async fn connect(&self, addr: String) -> Result, WSError> { *self.addr.lock() = Some(addr.clone()); - + if let Some(old_ping) = self.ping.lock().await.as_ref() { + old_ping.stop().await; + } self.set_state(ConnectState::Connecting).await; - let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(3); + + let retry_strategy = FibonacciBackoff::from_millis(2000).max_delay(Duration::from_secs(5 * 60)); let action = ConnectAction::new(addr.clone()); let cond = RetryCondition { connecting_addr: addr, @@ -84,7 +89,7 @@ impl WSClient { let mut ping = ServerFixIntervalPing::new( Duration::from_secs(self.config.ping_per_secs), - self.state.clone(), + self.state_notify.clone(), sender.clone(), self.config.retry_connect_per_pings, ); @@ -160,11 +165,11 @@ impl WSClient { } pub async fn subscribe_connect_state(&self) -> Receiver { - self.state.lock().await.subscribe() + self.state_notify.lock().await.subscribe() } pub async fn is_connected(&self) -> bool { - self.state.lock().await.state.is_connected() + self.state_notify.lock().await.state.is_connected() } pub async fn disconnect(&self) { @@ -174,137 +179,7 @@ impl WSClient { } async fn set_state(&self, state: ConnectState) { - self.state.lock().await.set_state(state); - } -} - -struct ServerFixIntervalPing { - duration: Duration, - sender: Option>, - #[allow(dead_code)] - stop_tx: tokio::sync::mpsc::Sender<()>, - stop_rx: Option>, - state: Arc>, - ping_count: Arc>, - retry_connect_per_pings: u32, -} - -impl ServerFixIntervalPing { - fn new( - duration: Duration, - state: Arc>, - sender: Sender, - retry_connect_per_pings: u32, - ) -> Self { - let (tx, rx) = tokio::sync::mpsc::channel(1000); - Self { - duration, - stop_tx: tx, - stop_rx: Some(rx), - state, - sender: Some(sender), - ping_count: Arc::new(Mutex::new(0)), - retry_connect_per_pings, - } - } - - fn run(&mut self) { - let mut stop_rx = self.stop_rx.take().expect("Only take once"); - let mut interval = tokio::time::interval(self.duration); - let sender = self.sender.take().expect("Only take once"); - let mut receiver = sender.subscribe(); - let weak_ping_count = Arc::downgrade(&self.ping_count); - let weak_state = Arc::downgrade(&self.state); - let reconnect_per_ping = self.retry_connect_per_pings; - tokio::spawn(async move { - loop { - tokio::select! { - _ = interval.tick() => { - // Send the ping - tracing::trace!("🙂ping"); - let _ = sender.send(Message::Ping(vec![])); - if let Some(ping_count) = weak_ping_count.upgrade() { - let mut lock = ping_count.lock().await; - // After ten ping were sent, mark the connection as disconnected - if *lock >= reconnect_per_ping { - if let Some(state) =weak_state.upgrade() { - state.lock().await.set_state(ConnectState::Disconnected); - } - } else { - *lock +=1; - } - } - }, - msg = receiver.recv() => { - if let Ok(Message::Pong(_)) = msg { - tracing::trace!("🟢Receive pong from server"); - if let Some(ping_count) = weak_ping_count.upgrade() { - let mut lock = ping_count.lock().await; - *lock = 0; - - if let Some(state) =weak_state.upgrade() { - state.lock().await.set_state(ConnectState::Connected); - } - } - } - }, - _ = stop_rx.recv() => { - break; - } - } - } - }); - } -} - -pub struct ConnectStateNotify { - state: ConnectState, - sender: Sender, -} - -impl ConnectStateNotify { - fn new() -> Self { - let (sender, _) = channel(100); - Self { - state: ConnectState::Disconnected, - sender, - } - } - - fn set_state(&mut self, state: ConnectState) { - if self.state != state { - tracing::trace!("[🙂Client]: connect state changed to {:?}", state); - self.state = state.clone(); - let _ = self.sender.send(state); - } - } - - fn subscribe(&self) -> Receiver { - self.sender.subscribe() - } -} - -#[derive(Clone, Eq, PartialEq, Debug)] -pub enum ConnectState { - Connecting, - Connected, - Disconnected, -} - -impl ConnectState { - #[allow(dead_code)] - fn is_connecting(&self) -> bool { - matches!(self, ConnectState::Connecting) - } - - #[allow(dead_code)] - fn is_connected(&self) -> bool { - matches!(self, ConnectState::Connected) - } - - #[allow(dead_code)] - fn is_disconnected(&self) -> bool { - matches!(self, ConnectState::Disconnected) + self.state_notify.lock().await.set_state(state); } } diff --git a/libs/client-api/src/ws/error.rs b/libs/client-api/src/ws/error.rs index d5d2e063..2951b403 100644 --- a/libs/client-api/src/ws/error.rs +++ b/libs/client-api/src/ws/error.rs @@ -1,5 +1,4 @@ use crate::ws::ClientRealtimeMessage; -use tokio_stream::wrappers::errors::BroadcastStreamRecvError; #[derive(Debug, thiserror::Error)] pub enum WSError { @@ -15,9 +14,6 @@ pub enum WSError { #[error(transparent)] SenderError(#[from] tokio::sync::broadcast::error::SendError), - #[error(transparent)] - BroadcastStreamRecvError(#[from] BroadcastStreamRecvError), - #[error("Internal failure: {0}")] Internal(#[from] Box), } diff --git a/libs/client-api/src/ws/mod.rs b/libs/client-api/src/ws/mod.rs index d1938fe8..c3e64ca1 100644 --- a/libs/client-api/src/ws/mod.rs +++ b/libs/client-api/src/ws/mod.rs @@ -2,9 +2,12 @@ mod client; mod error; mod handler; mod msg; +pub(crate) mod ping; mod retry; +mod state; pub use client::*; pub use error::*; pub use handler::*; pub use msg::*; +pub use state::*; diff --git a/libs/client-api/src/ws/ping.rs b/libs/client-api/src/ws/ping.rs new file mode 100644 index 00000000..dc2741d3 --- /dev/null +++ b/libs/client-api/src/ws/ping.rs @@ -0,0 +1,88 @@ +use crate::ws::state::{ConnectState, ConnectStateNotify}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast::Sender; +use tokio::sync::Mutex; +use tokio_tungstenite::tungstenite::Message; + +pub(crate) struct ServerFixIntervalPing { + duration: Duration, + sender: Option>, + #[allow(dead_code)] + stop_tx: tokio::sync::mpsc::Sender<()>, + stop_rx: Option>, + state: Arc>, + ping_count: Arc>, + maximum_ping_count: u32, +} + +impl ServerFixIntervalPing { + pub(crate) fn new( + duration: Duration, + state: Arc>, + sender: Sender, + maximum_ping_count: u32, + ) -> Self { + let (tx, rx) = tokio::sync::mpsc::channel(1000); + Self { + duration, + stop_tx: tx, + stop_rx: Some(rx), + state, + sender: Some(sender), + ping_count: Arc::new(Mutex::new(0)), + maximum_ping_count, + } + } + + pub(crate) async fn stop(&self) { + let _ = self.stop_tx.send(()).await; + } + + pub(crate) fn run(&mut self) { + let mut stop_rx = self.stop_rx.take().expect("Only take once"); + let mut interval = tokio::time::interval(self.duration); + let sender = self.sender.take().expect("Only take once"); + let mut receiver = sender.subscribe(); + let weak_ping_count = Arc::downgrade(&self.ping_count); + let weak_state = Arc::downgrade(&self.state); + let reconnect_per_ping = self.maximum_ping_count; + tokio::spawn(async move { + loop { + tokio::select! { + _ = interval.tick() => { + // Send the ping + tracing::trace!("🙂ping"); + let _ = sender.send(Message::Ping(vec![])); + if let Some(ping_count) = weak_ping_count.upgrade() { + let mut lock = ping_count.lock().await; + if *lock >= reconnect_per_ping { + if let Some(state) =weak_state.upgrade() { + state.lock().await.set_state(ConnectState::PingTimeout); + } + } else { + *lock +=1; + } + } + }, + msg = receiver.recv() => { + if let Ok(Message::Pong(_)) = msg { + tracing::trace!("🟢Receive pong from server"); + if let Some(ping_count) = weak_ping_count.upgrade() { + let mut lock = ping_count.lock().await; + *lock = 0; + + if let Some(state) =weak_state.upgrade() { + state.lock().await.set_state(ConnectState::Connected); + } + } + } + }, + _ = stop_rx.recv() => { + break; + } + } + } + }); + } +} diff --git a/libs/client-api/src/ws/state.rs b/libs/client-api/src/ws/state.rs new file mode 100644 index 00000000..dc4057d4 --- /dev/null +++ b/libs/client-api/src/ws/state.rs @@ -0,0 +1,57 @@ +use tokio::sync::broadcast::{channel, Receiver, Sender}; + +pub struct ConnectStateNotify { + pub(crate) state: ConnectState, + sender: Sender, +} + +impl ConnectStateNotify { + pub(crate) fn new() -> Self { + let (sender, _) = channel(100); + Self { + state: ConnectState::Disconnected, + sender, + } + } + + pub(crate) fn set_state(&mut self, state: ConnectState) { + if self.state != state { + tracing::trace!("[🙂Client]: connect state changed to {:?}", state); + self.state = state.clone(); + let _ = self.sender.send(state); + } + } + + pub(crate) fn subscribe(&self) -> Receiver { + self.sender.subscribe() + } +} + +#[derive(Clone, Eq, PartialEq, Debug)] +pub enum ConnectState { + PingTimeout, + Connecting, + Connected, + Disconnected, +} + +impl ConnectState { + #[allow(dead_code)] + pub fn is_connecting(&self) -> bool { + matches!(self, ConnectState::Connecting) + } + + pub fn is_connected(&self) -> bool { + matches!(self, ConnectState::Connected) + } + + #[allow(dead_code)] + pub fn is_timeout(&self) -> bool { + matches!(self, ConnectState::PingTimeout) + } + + #[allow(dead_code)] + pub fn is_disconnected(&self) -> bool { + matches!(self, ConnectState::Disconnected) + } +} diff --git a/libs/realtime/Cargo.toml b/libs/realtime/Cargo.toml index 0c473f61..2643ea3c 100644 --- a/libs/realtime/Cargo.toml +++ b/libs/realtime/Cargo.toml @@ -11,7 +11,7 @@ actix-web-actors = { version = "4.2.0" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0.30" -bytes = "1.0" +bytes = { version = "1.0", features = ["serde"] } parking_lot = "0.12.1" tracing = "0.1.25" futures-util = "0.3.26" diff --git a/libs/realtime/src/entities.rs b/libs/realtime/src/entities.rs index 2235673e..f096858b 100644 --- a/libs/realtime/src/entities.rs +++ b/libs/realtime/src/entities.rs @@ -49,7 +49,7 @@ pub struct ClientMessage { pub struct RealtimeMessage { pub business_id: BusinessID, pub object_id: String, - pub payload: Vec, + pub payload: Bytes, } impl RealtimeMessage { @@ -70,7 +70,7 @@ impl From for RealtimeMessage { Self { business_id: BusinessID::CollabId, object_id: msg.object_id().to_string(), - payload: msg.to_vec(), + payload: Bytes::from(msg.to_vec()), } } } @@ -83,7 +83,7 @@ where Self { business_id: client_msg.business_id, object_id: client_msg.content.object_id().to_string(), - payload: client_msg.content.to_vec(), + payload: Bytes::from(client_msg.content.to_vec()), } } } diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml index 8f360154..3844a492 100644 --- a/libs/shared-entity/Cargo.toml +++ b/libs/shared-entity/Cargo.toml @@ -12,6 +12,7 @@ serde_json = "1.0.105" serde_repr = "0.1.16" thiserror = "1.0.47" reqwest = "0.11.18" +uuid = { version = "1.3.3", features = ["v4"] } actix-web = { version = "4.4.0", default-features = false, features = ["http2"], optional = true } sqlx = { version = "0.7", default-features = false, features = ["postgres"], optional = true } diff --git a/libs/shared-entity/src/dto.rs b/libs/shared-entity/src/dto.rs index febcbb6c..cea5cf33 100644 --- a/libs/shared-entity/src/dto.rs +++ b/libs/shared-entity/src/dto.rs @@ -1,7 +1,5 @@ // Data Transfer Objects (DTO) -use sqlx::types::uuid; - #[derive(serde::Deserialize, serde::Serialize)] pub struct WorkspaceMembersParams { pub workspace_uuid: uuid::Uuid, diff --git a/tests/realtime/edit_collab_test.rs b/tests/realtime/edit_collab_test.rs index 5a6cfd99..db5c6e08 100644 --- a/tests/realtime/edit_collab_test.rs +++ b/tests/realtime/edit_collab_test.rs @@ -85,14 +85,14 @@ async fn same_user_with_same_device_id_test() { let device_id = Uuid::new_v4().to_string(); let client_1_1 = TestClient::new_with_device_id(&object_id, &device_id, collab_type.clone()).await; + client_1_1.collab.lock().insert("1", "a"); + client_1_1.collab.lock().insert("3", "c"); + tokio::time::sleep(Duration::from_millis(500)).await; + let mut client_1_2 = TestClient::new_with_device_id(&object_id, &device_id, collab_type.clone()).await; - - client_1_1.collab.lock().insert("1", "a"); client_1_2.collab.lock().insert("2", "b"); - client_1_1.collab.lock().insert("3", "c"); - - tokio::time::sleep(Duration::from_millis(200)).await; + tokio::time::sleep(Duration::from_millis(500)).await; let json_1 = client_1_1.collab.lock().to_json_value(); let json_2 = client_1_2.collab.lock().to_json_value(); @@ -106,6 +106,8 @@ async fn same_user_with_same_device_id_test() { assert_json_eq!( json_2, json!({ + "1": "a", + "3": "c", "2": "b" }) ); @@ -115,7 +117,9 @@ async fn same_user_with_same_device_id_test() { &collab_type, 5, json!({ - "2": "b" + "1": "a", + "2": "b", + "3": "c" }), ) .await;