From 8b9e6584d235d0b2c876bcad68af4d601d29b3bc Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 10 May 2023 20:54:10 +0800 Subject: [PATCH] feat: ws reconnect (#5) * feat: ws reconnect * chore: update collab rev --- Cargo.lock | 5 ----- Cargo.toml | 10 +++++----- configuration/base.yaml | 3 +++ crates/websocket/src/client.rs | 20 ++++++++++++-------- crates/websocket/src/server.rs | 2 ++ src/api/ws.rs | 8 +++++++- src/application.rs | 5 +++-- src/config/config.rs | 14 ++++++++++---- tests/api/ws.rs | 4 ++-- tests/util/test_server.rs | 2 +- tests/ws/client.rs | 16 ++++++++++++---- tests/ws/mod.rs | 1 + tests/ws/test.rs | 11 +++-------- tests/ws/ws_reconnect.rs | 22 ++++++++++++++++++++++ 14 files changed, 83 insertions(+), 40 deletions(-) create mode 100644 tests/ws/ws_reconnect.rs diff --git a/Cargo.lock b/Cargo.lock index dc63905d..fb41d68a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -824,7 +824,6 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3" dependencies = [ "anyhow", "bytes", @@ -841,7 +840,6 @@ dependencies = [ [[package]] name = "collab-client-ws" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3" dependencies = [ "bytes", "collab-sync", @@ -859,7 +857,6 @@ dependencies = [ [[package]] name = "collab-persistence" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3" dependencies = [ "bincode", "chrono", @@ -879,7 +876,6 @@ dependencies = [ [[package]] name = "collab-plugins" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3" dependencies = [ "collab", "collab-client-ws", @@ -895,7 +891,6 @@ dependencies = [ [[package]] name = "collab-sync" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3" dependencies = [ "bytes", "collab", diff --git a/Cargo.toml b/Cargo.toml index bf0d5494..4b83ef43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,11 +88,11 @@ members = [ ] [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" } -collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" } -collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" } -collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" } -collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" } +collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" } +collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" } +collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" } +collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ad656c" } #collab = { path = "./crates/AppFlowy-Collab/collab" } #collab-client-ws = { path = "./crates/AppFlowy-Collab/collab-client-ws" } diff --git a/configuration/base.yaml b/configuration/base.yaml index 422d327b..fc5ac2ec 100644 --- a/configuration/base.yaml +++ b/configuration/base.yaml @@ -10,4 +10,7 @@ database: username: "postgres" password: "password" database_name: "appflowy" +websocket: + heartbeat_interval: 8 + client_timeout: 10 redis_uri: "redis://127.0.0.1:6379" diff --git a/crates/websocket/src/client.rs b/crates/websocket/src/client.rs index 2b84b2ef..3ce8d17b 100644 --- a/crates/websocket/src/client.rs +++ b/crates/websocket/src/client.rs @@ -10,32 +10,36 @@ use bytes::Bytes; use std::ops::Deref; use collab_plugins::sync::msg::CollabMessage; - use std::sync::Arc; - use std::time::{Duration, Instant}; -const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5); -const CLIENT_TIMEOUT: Duration = Duration::from_secs(10); - pub struct CollabSession { user: Arc, hb: Instant, pub server: Addr, + heartbeat_interval: Duration, + client_timeout: Duration, } impl CollabSession { - pub fn new(user: WSUser, server: Addr) -> Self { + pub fn new( + user: WSUser, + server: Addr, + heartbeat_interval: Duration, + client_timeout: Duration, + ) -> Self { Self { user: Arc::new(user), hb: Instant::now(), server, + heartbeat_interval, + client_timeout, } } fn hb(&self, ctx: &mut ws::WebsocketContext) { - ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| { - if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT { + ctx.run_interval(self.heartbeat_interval, |act, ctx| { + if Instant::now().duration_since(act.hb) > act.client_timeout { act.server.do_send(Disconnect { user: act.user.clone(), }); diff --git a/crates/websocket/src/server.rs b/crates/websocket/src/server.rs index 8d057a6f..a34dcd2a 100644 --- a/crates/websocket/src/server.rs +++ b/crates/websocket/src/server.rs @@ -30,7 +30,9 @@ pub struct CollabServer { collab_id_gen: Arc>, /// Memory cache for fast lookup of collab_id from object_id collab_id_by_object_id: Arc>, + /// Keep track of all collab groups collab_groups: Arc>>, + /// Keep track of all client streams client_streams: Arc, WSClientStream>>>, } diff --git a/src/api/ws.rs b/src/api/ws.rs index 462cc105..cb23ccb9 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -5,6 +5,7 @@ use actix_web::web::{Data, Path, Payload}; use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope}; use actix_web_actors::ws; use secrecy::Secret; +use std::time::Duration; use websocket::entities::WSUser; use websocket::{CollabServer, CollabSession}; @@ -23,7 +24,12 @@ pub async fn establish_ws_connection( ) -> Result { tracing::trace!("{:?}", request); let user = LoggedUser::from_token(&state.config.application.server_key, token.as_str())?; - let client = CollabSession::new(user.into(), server.get_ref().clone()); + let client = CollabSession::new( + user.into(), + server.get_ref().clone(), + Duration::from_secs(state.config.websocket.heartbeat_interval as u64), + Duration::from_secs(state.config.websocket.client_timeout as u64), + ); match ws::start(client, &request, payload) { Ok(response) => Ok(response), Err(e) => { diff --git a/src/application.rs b/src/application.rs index 37e059e0..67e242e7 100644 --- a/src/application.rs +++ b/src/application.rs @@ -20,6 +20,7 @@ use snowflake::Snowflake; use sqlx::{postgres::PgPoolOptions, PgPool}; use std::net::TcpListener; use std::sync::Arc; + use tokio::sync::RwLock; use tracing_actix_web::TracingLogger; @@ -69,7 +70,7 @@ pub async fn run( .map(|(_, server_key)| Key::from(server_key.expose_secret().as_bytes())) .unwrap_or_else(Key::generate); - let collab_server = CollabServer::new(state.rocksdb.clone()).unwrap().start(); + let collab_server_addr = CollabServer::new(state.rocksdb.clone()).unwrap().start(); let mut server = HttpServer::new(move || { App::new() .wrap( @@ -84,7 +85,7 @@ pub async fn run( .app_data(web::JsonConfig::default().limit(4096)) .service(user_scope()) .service(ws_scope()) - .app_data(Data::new(collab_server.clone())) + .app_data(Data::new(collab_server_addr.clone())) .app_data(Data::new(state.clone())) }); diff --git a/src/config/config.rs b/src/config/config.rs index d0da1972..c73ceb52 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -8,7 +8,8 @@ use std::path::PathBuf; #[derive(serde::Deserialize, Clone, Debug)] pub struct Config { pub database: DatabaseSetting, - pub application: ApplicationSettings, + pub application: ApplicationSetting, + pub websocket: WebsocketSetting, pub redis_uri: Secret, } @@ -21,7 +22,7 @@ pub struct Config { // it to 0.0.0.0 in our Docker images. // #[derive(serde::Deserialize, Clone, Debug)] -pub struct ApplicationSettings { +pub struct ApplicationSetting { #[serde(deserialize_with = "deserialize_number_from_string")] pub port: u16, pub host: String, @@ -30,7 +31,7 @@ pub struct ApplicationSettings { pub tls_config: Option, } -impl ApplicationSettings { +impl ApplicationSetting { pub fn use_https(&self) -> bool { match &self.tls_config { None => false, @@ -89,7 +90,7 @@ pub fn get_configuration() -> Result { let configuration_dir = base_path.join("configuration"); let environment: Environment = std::env::var("APP_ENVIRONMENT") - .unwrap_or_else(|_| "local".into()) + .unwrap_or_else(|_| "local".to_string()) .try_into() .expect("Failed to parse APP_ENVIRONMENT."); @@ -143,3 +144,8 @@ impl TryFrom for Environment { } } } +#[derive(serde::Deserialize, Clone, Debug)] +pub struct WebsocketSetting { + pub heartbeat_interval: u8, + pub client_timeout: u8, +} diff --git a/tests/api/ws.rs b/tests/api/ws.rs index aa823df7..c85d4da8 100644 --- a/tests/api/ws.rs +++ b/tests/api/ws.rs @@ -1,5 +1,5 @@ use crate::util::{spawn_server, TestUser}; -use collab_client_ws::WSClient; +use collab_client_ws::{WSClient, WSClientConfig}; #[actix_rt::test] async fn ws_conn_test() { @@ -8,6 +8,6 @@ async fn ws_conn_test() { let token = test_user.register(&server).await; let address = format!("{}/{}", server.ws_addr, token); - let client = WSClient::new(address, 100); + let client = WSClient::new(address, WSClientConfig::default()); let _ = client.connect().await; } diff --git a/tests/util/test_server.rs b/tests/util/test_server.rs index 1c952e42..69a67a95 100644 --- a/tests/util/test_server.rs +++ b/tests/util/test_server.rs @@ -20,7 +20,7 @@ use sqlx::{Connection, Executor, PgConnection, PgPool}; // Ensure that the `tracing` stack is only initialised once using `once_cell` static TRACING: Lazy<()> = Lazy::new(|| { - let level = "debug".to_string(); + let level = "trace".to_string(); let mut filters = vec![]; filters.push(format!("appflowy_server={}", level)); filters.push(format!("collab_client_ws={}", level)); diff --git a/tests/ws/client.rs b/tests/ws/client.rs index 9c9998b1..039bed41 100644 --- a/tests/ws/client.rs +++ b/tests/ws/client.rs @@ -1,7 +1,7 @@ use collab::core::collab::MutexCollab; use collab::core::origin::{CollabClient, CollabOrigin}; -use collab_client_ws::{WSClient, WSMessageHandler}; +use collab_client_ws::{WSBusinessHandler, WSClient, WSClientConfig}; use collab_plugins::disk::kv::rocks_kv::RocksCollabDB; use collab_plugins::disk::rocksdb::RocksdbDiskPlugin; use collab_plugins::sync::SyncPlugin; @@ -9,6 +9,7 @@ use std::net::SocketAddr; use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; +use std::time::Duration; use tempfile::TempDir; pub async fn spawn_client( @@ -16,10 +17,13 @@ pub async fn spawn_client( object_id: &str, address: String, ) -> std::io::Result { - let ws_client = WSClient::new(address, 100); + let ws_client = WSClient::new(address, WSClientConfig::default()); let addr = ws_client.connect().await.unwrap().unwrap(); let origin = origin_from_tcp_stream(&addr); - let handler = ws_client.subscribe("collab".to_string()).await.unwrap(); + let handler = ws_client + .subscribe_business("collab".to_string()) + .await + .unwrap(); // let (sink, stream) = (handler.sink(), handler.stream()); @@ -60,7 +64,7 @@ pub struct TestClient { cleaner: Cleaner, #[allow(dead_code)] - handlers: Vec>, + handlers: Vec>, } struct Cleaner(PathBuf); @@ -88,3 +92,7 @@ impl Deref for TestClient { &self.collab } } + +pub async fn wait(secs: u64) { + tokio::time::sleep(Duration::from_secs(secs)).await; +} diff --git a/tests/ws/mod.rs b/tests/ws/mod.rs index 0f3cbef3..44a68920 100644 --- a/tests/ws/mod.rs +++ b/tests/ws/mod.rs @@ -1,2 +1,3 @@ mod client; mod test; +mod ws_reconnect; diff --git a/tests/ws/test.rs b/tests/ws/test.rs index 16c17380..9b3fb6f6 100644 --- a/tests/ws/test.rs +++ b/tests/ws/test.rs @@ -1,7 +1,6 @@ use crate::util::{spawn_server, TestUser}; -use crate::ws::client::spawn_client; +use crate::ws::client::{spawn_client, wait}; use serde_json::json; -use std::time::Duration; #[actix_rt::test] async fn ws_conn_test() { @@ -11,12 +10,12 @@ async fn ws_conn_test() { let address = format!("{}/{}", server.ws_addr, token); let client = spawn_client(1, "1", address).await.unwrap(); - wait_a_sec().await; + wait(1).await; { let collab = client.lock(); collab.insert("1", "a"); } - wait_a_sec().await; + wait(1).await; let value = server.get_doc("1"); assert_json_diff::assert_json_eq!( @@ -26,7 +25,3 @@ async fn ws_conn_test() { }) ); } - -async fn wait_a_sec() { - tokio::time::sleep(Duration::from_secs(2)).await; -} diff --git a/tests/ws/ws_reconnect.rs b/tests/ws/ws_reconnect.rs new file mode 100644 index 00000000..1cdf351a --- /dev/null +++ b/tests/ws/ws_reconnect.rs @@ -0,0 +1,22 @@ +use crate::util::{spawn_server, TestUser}; + +use collab_client_ws::{WSClient, WSClientConfig}; + +#[actix_rt::test] +async fn ws_retry_connect() { + let server = spawn_server().await; + let test_user = TestUser::generate(); + let token = test_user.register(&server).await; + let address = format!("{}/{}", server.ws_addr, token); + + let ws_client = WSClient::new( + address, + WSClientConfig { + buffer_capacity: 100, + ping_per_secs: 2, + retry_connect_per_pings: 5, + }, + ); + let _addr = ws_client.connect().await.unwrap().unwrap(); + // wait(20).await; +}