refactor: ws ping (#66)

* chore: ws client

* chore: update ws ping

* chore: fix test

* chore: public funcs

* chore: fix test

* chore: check clent-api compile
This commit is contained in:
Nathan.fooo 2023-09-20 20:53:39 +08:00 committed by GitHub
parent 0676db7ed6
commit ca0813e265
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 213 additions and 162 deletions

25
.github/workflows/client_api_check.yml vendored Normal file
View File

@ -0,0 +1,25 @@
name: ClientAPI Check
on:
push:
branches: [ main ]
pull_request:
types: [ opened, synchronize, reopened ]
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
with:
workspaces: |
AppFlowy-Cloud
- name: Check
working-directory: ./libs/client-api
run: cargo check

14
Cargo.lock generated
View File

@ -719,6 +719,9 @@ name = "bytes"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
dependencies = [
"serde",
]
[[package]]
name = "bytestring"
@ -2915,6 +2918,7 @@ dependencies = [
"sqlx",
"thiserror",
"url",
"uuid",
"validator",
]
@ -3514,9 +3518,9 @@ dependencies = [
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2"
dependencies = [
"futures-util",
"log",
@ -3658,13 +3662,13 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.18.0"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",

View File

@ -21,7 +21,7 @@ parking_lot = "0.12.1"
tracing = { version = "0.1" }
thiserror = "1.0.39"
serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = { version = "0.18" }
tokio-tungstenite = { version = "0.20" }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
tokio-retry = "0.3"

View File

@ -5,11 +5,13 @@ use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use std::time::Duration;
use crate::ws::ping::ServerFixIntervalPing;
use crate::ws::retry::ConnectAction;
use crate::ws::state::{ConnectState, ConnectStateNotify};
use crate::ws::{BusinessID, ClientRealtimeMessage, WSError, WebSocketChannel};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::{Mutex, RwLock};
use tokio_retry::strategy::FixedInterval;
use tokio_retry::strategy::FibonacciBackoff;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::MaybeTlsStream;
@ -29,7 +31,7 @@ impl Default for WSClientConfig {
Self {
buffer_capacity: 1000,
ping_per_secs: 8,
retry_connect_per_pings: 10,
retry_connect_per_pings: 20,
}
}
}
@ -39,7 +41,7 @@ type ChannelByObjectId = HashMap<String, Weak<WebSocketChannel>>;
pub struct WSClient {
addr: Arc<parking_lot::Mutex<Option<String>>>,
config: WSClientConfig,
state: Arc<Mutex<ConnectStateNotify>>,
state_notify: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
channels: Arc<RwLock<HashMap<BusinessID, ChannelByObjectId>>>,
ping: Arc<Mutex<Option<ServerFixIntervalPing>>>,
@ -54,7 +56,7 @@ impl WSClient {
WSClient {
addr: Arc::new(parking_lot::Mutex::new(None)),
config,
state,
state_notify: state,
sender,
channels,
ping,
@ -63,9 +65,12 @@ impl WSClient {
pub async fn connect(&self, addr: String) -> Result<Option<SocketAddr>, WSError> {
*self.addr.lock() = Some(addr.clone());
if let Some(old_ping) = self.ping.lock().await.as_ref() {
old_ping.stop().await;
}
self.set_state(ConnectState::Connecting).await;
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(3);
let retry_strategy = FibonacciBackoff::from_millis(2000).max_delay(Duration::from_secs(5 * 60));
let action = ConnectAction::new(addr.clone());
let cond = RetryCondition {
connecting_addr: addr,
@ -84,7 +89,7 @@ impl WSClient {
let mut ping = ServerFixIntervalPing::new(
Duration::from_secs(self.config.ping_per_secs),
self.state.clone(),
self.state_notify.clone(),
sender.clone(),
self.config.retry_connect_per_pings,
);
@ -160,11 +165,11 @@ impl WSClient {
}
pub async fn subscribe_connect_state(&self) -> Receiver<ConnectState> {
self.state.lock().await.subscribe()
self.state_notify.lock().await.subscribe()
}
pub async fn is_connected(&self) -> bool {
self.state.lock().await.state.is_connected()
self.state_notify.lock().await.state.is_connected()
}
pub async fn disconnect(&self) {
@ -174,137 +179,7 @@ impl WSClient {
}
async fn set_state(&self, state: ConnectState) {
self.state.lock().await.set_state(state);
}
}
struct ServerFixIntervalPing {
duration: Duration,
sender: Option<Sender<Message>>,
#[allow(dead_code)]
stop_tx: tokio::sync::mpsc::Sender<()>,
stop_rx: Option<tokio::sync::mpsc::Receiver<()>>,
state: Arc<Mutex<ConnectStateNotify>>,
ping_count: Arc<Mutex<u32>>,
retry_connect_per_pings: u32,
}
impl ServerFixIntervalPing {
fn new(
duration: Duration,
state: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
retry_connect_per_pings: u32,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
duration,
stop_tx: tx,
stop_rx: Some(rx),
state,
sender: Some(sender),
ping_count: Arc::new(Mutex::new(0)),
retry_connect_per_pings,
}
}
fn run(&mut self) {
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let mut interval = tokio::time::interval(self.duration);
let sender = self.sender.take().expect("Only take once");
let mut receiver = sender.subscribe();
let weak_ping_count = Arc::downgrade(&self.ping_count);
let weak_state = Arc::downgrade(&self.state);
let reconnect_per_ping = self.retry_connect_per_pings;
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
// Send the ping
tracing::trace!("🙂ping");
let _ = sender.send(Message::Ping(vec![]));
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
// After ten ping were sent, mark the connection as disconnected
if *lock >= reconnect_per_ping {
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Disconnected);
}
} else {
*lock +=1;
}
}
},
msg = receiver.recv() => {
if let Ok(Message::Pong(_)) = msg {
tracing::trace!("🟢Receive pong from server");
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
*lock = 0;
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Connected);
}
}
}
},
_ = stop_rx.recv() => {
break;
}
}
}
});
}
}
pub struct ConnectStateNotify {
state: ConnectState,
sender: Sender<ConnectState>,
}
impl ConnectStateNotify {
fn new() -> Self {
let (sender, _) = channel(100);
Self {
state: ConnectState::Disconnected,
sender,
}
}
fn set_state(&mut self, state: ConnectState) {
if self.state != state {
tracing::trace!("[🙂Client]: connect state changed to {:?}", state);
self.state = state.clone();
let _ = self.sender.send(state);
}
}
fn subscribe(&self) -> Receiver<ConnectState> {
self.sender.subscribe()
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ConnectState {
Connecting,
Connected,
Disconnected,
}
impl ConnectState {
#[allow(dead_code)]
fn is_connecting(&self) -> bool {
matches!(self, ConnectState::Connecting)
}
#[allow(dead_code)]
fn is_connected(&self) -> bool {
matches!(self, ConnectState::Connected)
}
#[allow(dead_code)]
fn is_disconnected(&self) -> bool {
matches!(self, ConnectState::Disconnected)
self.state_notify.lock().await.set_state(state);
}
}

View File

@ -1,5 +1,4 @@
use crate::ws::ClientRealtimeMessage;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
#[derive(Debug, thiserror::Error)]
pub enum WSError {
@ -15,9 +14,6 @@ pub enum WSError {
#[error(transparent)]
SenderError(#[from] tokio::sync::broadcast::error::SendError<ClientRealtimeMessage>),
#[error(transparent)]
BroadcastStreamRecvError(#[from] BroadcastStreamRecvError),
#[error("Internal failure: {0}")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),
}

View File

@ -2,9 +2,12 @@ mod client;
mod error;
mod handler;
mod msg;
pub(crate) mod ping;
mod retry;
mod state;
pub use client::*;
pub use error::*;
pub use handler::*;
pub use msg::*;
pub use state::*;

View File

@ -0,0 +1,88 @@
use crate::ws::state::{ConnectState, ConnectStateNotify};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Sender;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
pub(crate) struct ServerFixIntervalPing {
duration: Duration,
sender: Option<Sender<Message>>,
#[allow(dead_code)]
stop_tx: tokio::sync::mpsc::Sender<()>,
stop_rx: Option<tokio::sync::mpsc::Receiver<()>>,
state: Arc<Mutex<ConnectStateNotify>>,
ping_count: Arc<Mutex<u32>>,
maximum_ping_count: u32,
}
impl ServerFixIntervalPing {
pub(crate) fn new(
duration: Duration,
state: Arc<Mutex<ConnectStateNotify>>,
sender: Sender<Message>,
maximum_ping_count: u32,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1000);
Self {
duration,
stop_tx: tx,
stop_rx: Some(rx),
state,
sender: Some(sender),
ping_count: Arc::new(Mutex::new(0)),
maximum_ping_count,
}
}
pub(crate) async fn stop(&self) {
let _ = self.stop_tx.send(()).await;
}
pub(crate) fn run(&mut self) {
let mut stop_rx = self.stop_rx.take().expect("Only take once");
let mut interval = tokio::time::interval(self.duration);
let sender = self.sender.take().expect("Only take once");
let mut receiver = sender.subscribe();
let weak_ping_count = Arc::downgrade(&self.ping_count);
let weak_state = Arc::downgrade(&self.state);
let reconnect_per_ping = self.maximum_ping_count;
tokio::spawn(async move {
loop {
tokio::select! {
_ = interval.tick() => {
// Send the ping
tracing::trace!("🙂ping");
let _ = sender.send(Message::Ping(vec![]));
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
if *lock >= reconnect_per_ping {
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::PingTimeout);
}
} else {
*lock +=1;
}
}
},
msg = receiver.recv() => {
if let Ok(Message::Pong(_)) = msg {
tracing::trace!("🟢Receive pong from server");
if let Some(ping_count) = weak_ping_count.upgrade() {
let mut lock = ping_count.lock().await;
*lock = 0;
if let Some(state) =weak_state.upgrade() {
state.lock().await.set_state(ConnectState::Connected);
}
}
}
},
_ = stop_rx.recv() => {
break;
}
}
}
});
}
}

View File

@ -0,0 +1,57 @@
use tokio::sync::broadcast::{channel, Receiver, Sender};
pub struct ConnectStateNotify {
pub(crate) state: ConnectState,
sender: Sender<ConnectState>,
}
impl ConnectStateNotify {
pub(crate) fn new() -> Self {
let (sender, _) = channel(100);
Self {
state: ConnectState::Disconnected,
sender,
}
}
pub(crate) fn set_state(&mut self, state: ConnectState) {
if self.state != state {
tracing::trace!("[🙂Client]: connect state changed to {:?}", state);
self.state = state.clone();
let _ = self.sender.send(state);
}
}
pub(crate) fn subscribe(&self) -> Receiver<ConnectState> {
self.sender.subscribe()
}
}
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum ConnectState {
PingTimeout,
Connecting,
Connected,
Disconnected,
}
impl ConnectState {
#[allow(dead_code)]
pub fn is_connecting(&self) -> bool {
matches!(self, ConnectState::Connecting)
}
pub fn is_connected(&self) -> bool {
matches!(self, ConnectState::Connected)
}
#[allow(dead_code)]
pub fn is_timeout(&self) -> bool {
matches!(self, ConnectState::PingTimeout)
}
#[allow(dead_code)]
pub fn is_disconnected(&self) -> bool {
matches!(self, ConnectState::Disconnected)
}
}

View File

@ -11,7 +11,7 @@ actix-web-actors = { version = "4.2.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0.30"
bytes = "1.0"
bytes = { version = "1.0", features = ["serde"] }
parking_lot = "0.12.1"
tracing = "0.1.25"
futures-util = "0.3.26"

View File

@ -49,7 +49,7 @@ pub struct ClientMessage<U> {
pub struct RealtimeMessage {
pub business_id: BusinessID,
pub object_id: String,
pub payload: Vec<u8>,
pub payload: Bytes,
}
impl RealtimeMessage {
@ -70,7 +70,7 @@ impl From<CollabMessage> for RealtimeMessage {
Self {
business_id: BusinessID::CollabId,
object_id: msg.object_id().to_string(),
payload: msg.to_vec(),
payload: Bytes::from(msg.to_vec()),
}
}
}
@ -83,7 +83,7 @@ where
Self {
business_id: client_msg.business_id,
object_id: client_msg.content.object_id().to_string(),
payload: client_msg.content.to_vec(),
payload: Bytes::from(client_msg.content.to_vec()),
}
}
}

View File

@ -12,6 +12,7 @@ serde_json = "1.0.105"
serde_repr = "0.1.16"
thiserror = "1.0.47"
reqwest = "0.11.18"
uuid = { version = "1.3.3", features = ["v4"] }
actix-web = { version = "4.4.0", default-features = false, features = ["http2"], optional = true }
sqlx = { version = "0.7", default-features = false, features = ["postgres"], optional = true }

View File

@ -1,7 +1,5 @@
// Data Transfer Objects (DTO)
use sqlx::types::uuid;
#[derive(serde::Deserialize, serde::Serialize)]
pub struct WorkspaceMembersParams {
pub workspace_uuid: uuid::Uuid,

View File

@ -85,14 +85,14 @@ async fn same_user_with_same_device_id_test() {
let device_id = Uuid::new_v4().to_string();
let client_1_1 =
TestClient::new_with_device_id(&object_id, &device_id, collab_type.clone()).await;
client_1_1.collab.lock().insert("1", "a");
client_1_1.collab.lock().insert("3", "c");
tokio::time::sleep(Duration::from_millis(500)).await;
let mut client_1_2 =
TestClient::new_with_device_id(&object_id, &device_id, collab_type.clone()).await;
client_1_1.collab.lock().insert("1", "a");
client_1_2.collab.lock().insert("2", "b");
client_1_1.collab.lock().insert("3", "c");
tokio::time::sleep(Duration::from_millis(200)).await;
tokio::time::sleep(Duration::from_millis(500)).await;
let json_1 = client_1_1.collab.lock().to_json_value();
let json_2 = client_1_2.collab.lock().to_json_value();
@ -106,6 +106,8 @@ async fn same_user_with_same_device_id_test() {
assert_json_eq!(
json_2,
json!({
"1": "a",
"3": "c",
"2": "b"
})
);
@ -115,7 +117,9 @@ async fn same_user_with_same_device_id_test() {
&collab_type,
5,
json!({
"2": "b"
"1": "a",
"2": "b",
"3": "c"
}),
)
.await;