From c7e90eb24bb02b44b21cea6dbb86708b56fee606 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sat, 11 Nov 2023 12:04:52 +0800 Subject: [PATCH] feat: stop recv messages when subscribe drop (#159) * feat: stop recv messages when subscribe drop * chore: update collab rev * chore: fmt --- Cargo.lock | 4 +- Cargo.toml | 10 ++-- admin_frontend/Cargo.toml | 4 +- libs/app_error/Cargo.toml | 4 +- libs/client-api/Cargo.toml | 4 +- libs/client-api/src/collab_sync/sync.rs | 66 +++++++++++----------- libs/client-api/src/http.rs | 6 +- libs/client-api/src/ws/handler.rs | 1 + libs/database-entity/Cargo.toml | 4 +- libs/database/Cargo.toml | 4 +- libs/gotrue-entity/Cargo.toml | 4 +- libs/gotrue-entity/src/dto.rs | 2 +- libs/gotrue/Cargo.toml | 4 +- libs/infra/Cargo.toml | 4 +- libs/realtime-entity/Cargo.toml | 4 +- libs/realtime-entity/src/collab_msg.rs | 12 ---- libs/realtime/Cargo.toml | 4 +- libs/realtime/src/collaborate/broadcast.rs | 2 +- libs/shared-entity/Cargo.toml | 2 +- libs/token/Cargo.toml | 2 +- 20 files changed, 68 insertions(+), 79 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 042a999d..3d1c3413 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1182,7 +1182,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=508dd160a#508dd160a252228f0556ce594bae63af35eaa0f2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2644fba6#2644fba621c515e7d62fa4d21350b49dbcc14671" dependencies = [ "anyhow", "async-trait", @@ -1202,7 +1202,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=508dd160a#508dd160a252228f0556ce594bae63af35eaa0f2" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=2644fba6#2644fba621c515e7d62fa4d21350b49dbcc14671" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 6ccdc395..811ae45d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,8 @@ actix-session = { version = "0.8", features = ["redis-rs-tls-session"] } openssl = "0.10.45" # serde -serde_json = "1.0.108" -serde = { version = "1.0", features = ["derive"] } +serde_json.workspace = true +serde.workspace = true serde-aux = "4.1.2" tokio = { version = "1.26.0", features = [ @@ -124,6 +124,8 @@ members = [ [workspace.dependencies] realtime-entity = { path = "libs/realtime-entity" } app-error = { path = "libs/app_error" } +serde_json = "1.0.108" +serde = { version = "1.0.108", features = ["derive"] } [profile.release] lto = true @@ -140,8 +142,8 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "508dd160a" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "508dd160a" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2644fba6" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2644fba6" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/admin_frontend/Cargo.toml b/admin_frontend/Cargo.toml index 1874b58a..3e5ef362 100644 --- a/admin_frontend/Cargo.toml +++ b/admin_frontend/Cargo.toml @@ -15,8 +15,8 @@ axum = {version = "0.6.20", features = ["json"]} tokio = { version = "1.32.0", features = ["rt-multi-thread", "macros"] } askama = "0.12.1" axum-extra = { version = "0.8.0", features = ["cookie"] } -serde = { version = "1.0.188", features = ["derive"] } -serde_json = "1.0.108" +serde.workspace = true +serde_json.workspace = true redis = { version = "0.23.3", features = [ "aio", "tokio-comp", "connection-manager"] } uuid = { version = "1.4.1", features = ["v4"] } dotenv = "0.15.0" diff --git a/libs/app_error/Cargo.toml b/libs/app_error/Cargo.toml index 62ae05a1..e2260755 100644 --- a/libs/app_error/Cargo.toml +++ b/libs/app_error/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] thiserror = "1.0.47" serde_repr = "0.1.16" -serde = { version = "1.0"} +serde.workspace = true anyhow = "1.0.75" uuid = { version = "1.4.1", features = ["v4"] } sqlx = { version = "0.7", default-features = false, features = ["postgres", "json"], optional = true } @@ -17,7 +17,7 @@ rust-s3 = { version = "0.33.0", optional = true } url = { version = "2.4.1"} actix-web = { version = "4.3.1", optional = true } reqwest = { version = "0.11" } -serde_json = "1.0.108" +serde_json.workspace = true [features] default = [] diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 33df5823..d733b53f 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] reqwest = { version = "0.11.20", default-features = false, features = ["json","multipart"] } anyhow = "1.0.75" -serde_json = "1.0.108" +serde_json.workspace = true serde_repr = "0.1.16" gotrue = { path = "../gotrue" } gotrue-entity = { path = "../gotrue-entity" } @@ -23,7 +23,7 @@ app-error = { workspace = true } # ws tracing = { version = "0.1" } thiserror = "1.0.39" -serde = { version = "1.0", features = ["derive"] } +serde.workspace = true tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] } tokio = { version = "1.26", features = ["full"] } futures-util = "0.3.26" diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index aefa49f4..f6a7d0da 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -1,11 +1,7 @@ -use bytes::Bytes; -use std::marker::PhantomData; -use std::ops::Deref; -use std::sync::{Arc, Weak}; - use crate::collab_sync::{ CollabSink, CollabSinkRunner, DefaultMsgIdCounter, SinkConfig, SinkState, SyncError, SyncObject, }; +use bytes::Bytes; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; @@ -15,11 +11,13 @@ use collab::sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; use futures_util::{SinkExt, StreamExt}; use lib0::decoding::Cursor; use realtime_entity::collab_msg::{ClientCollabInit, CollabMessage, ServerCollabInit, UpdateSync}; +use std::marker::PhantomData; +use std::ops::Deref; +use std::sync::{Arc, Weak}; use tokio::spawn; use tokio::sync::watch; - use tokio_stream::wrappers::WatchStream; -use tracing::{error, trace, warn}; +use tracing::{error, trace, warn, Level}; use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encoder, EncoderV1}; @@ -238,33 +236,30 @@ where ) where P: CollabSyncProtocol + Send + Sync + 'static, { - loop { - while let Some(collab_message) = stream.next().await { - match collab_message { - Ok(msg) => match (weak_collab.upgrade(), weak_sink.upgrade()) { - (Some(awareness), Some(sink)) => { - if let Err(error) = SyncStream::::process_message::

( - &origin, &object_id, &protocol, &awareness, &sink, msg, - ) - .await - { - error!( - "Stop receive incoming changes. Failed to process message: {}", - error - ); - break; - } - }, - _ => { - warn!("Stop receive doc incoming changes."); - break; - }, + while let Some(collab_message) = stream.next().await { + match collab_message { + Ok(msg) => match (weak_collab.upgrade(), weak_sink.upgrade()) { + (Some(collab), Some(sink)) => { + let span = tracing::span!(Level::TRACE, "doc_stream", object_id = %msg.object_id()); + let _enter = span.enter(); + if let Err(error) = SyncStream::::process_message::

( + &origin, &object_id, &protocol, &collab, &sink, msg, + ) + .await + { + error!("Error while processing message: {}", error); + } }, - Err(e) => { - warn!("Stream error: {},stop receive incoming changes", e.into()); + _ => { + // The collab or sink is dropped, stop the stream. + warn!("Stop receive doc incoming changes."); break; }, - } + }, + Err(e) => { + warn!("Stream error: {},stop receive incoming changes", e.into()); + break; + }, } } } @@ -290,7 +285,7 @@ where if should_process { if let Some(payload) = msg.payload() { if !payload.is_empty() { - trace!("start process message: {:?}", msg.msg_id()); + trace!("start process message:{:?}", msg.msg_id()); SyncStream::::process_payload( origin, payload, object_id, protocol, collab, sink, ) @@ -319,10 +314,13 @@ where let msg = msg?; trace!(" {}", msg); let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_))); - if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg).await? { + if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg)? { if is_sync_step_1 { // flush - collab.lock().flush() + match collab.try_lock() { + None => warn!("Failed to acquire lock for flushing collab"), + Some(collab_guard) => collab_guard.flush(), + } } let object_id = object_id.to_string(); diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index f2c2629a..58f06434 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -369,7 +369,7 @@ impl Client { token .as_ref() .ok_or(AppResponseError::from(AppError::NotLoggedIn( - "fail to get expires_at".to_string(), + "token is empty".to_string(), )))? .expires_at, ), @@ -996,8 +996,8 @@ impl Client { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs() as i64; - if time_now_sec + 60 > expires_at { - // Add 60 seconds buffer + if time_now_sec + 10 > expires_at { + // Add 10 seconds buffer self.refresh_token().await?; } diff --git a/libs/client-api/src/ws/handler.rs b/libs/client-api/src/ws/handler.rs index 452558ca..fe5da1a7 100644 --- a/libs/client-api/src/ws/handler.rs +++ b/libs/client-api/src/ws/handler.rs @@ -65,6 +65,7 @@ where while let Ok(msg) = recv.recv().await { if let Err(err) = tx.send(Ok(msg)) { trace!("Failed to send message to channel stream: {}", err); + break; } } trace!("WebSocketChannel {} stream closed", object_id); diff --git a/libs/database-entity/Cargo.toml b/libs/database-entity/Cargo.toml index f3225d52..d631284c 100644 --- a/libs/database-entity/Cargo.toml +++ b/libs/database-entity/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0.130", features = ["derive"] } -serde_json = "1.0.108" +serde.workspace = true +serde_json.workspace = true sqlx = { version = "0.7", default-features = false, features = ["macros"] } collab-entity = { version = "0.1.0" } validator = { version = "0.16", features = ["validator_derive", "derive"] } diff --git a/libs/database/Cargo.toml b/libs/database/Cargo.toml index d6fa839d..3649ee95 100644 --- a/libs/database/Cargo.toml +++ b/libs/database/Cargo.toml @@ -15,8 +15,8 @@ app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s tokio = { version = "1.26", features = ["sync"] } async-trait = "0.1.73" anyhow = "1.0.75" -serde = { version = "1.0.130", features = ["derive"] } -serde_json = "1.0.68" +serde.workspace = true +serde_json.workspace = true sqlx = { version = "0.7", default-features = false, features = ["postgres", "chrono", "uuid", "macros", "runtime-tokio-rustls", "rust_decimal"] } tracing = { version = "0.1.37" } diff --git a/libs/gotrue-entity/Cargo.toml b/libs/gotrue-entity/Cargo.toml index 86620ac3..30ccfd59 100644 --- a/libs/gotrue-entity/Cargo.toml +++ b/libs/gotrue-entity/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.105" +serde.workspace = true +serde_json.workspace = true anyhow = "1.0.75" reqwest = "0.11.20" lazy_static = "1.4.0" diff --git a/libs/gotrue-entity/src/dto.rs b/libs/gotrue-entity/src/dto.rs index 89d9992f..422347bb 100644 --- a/libs/gotrue-entity/src/dto.rs +++ b/libs/gotrue-entity/src/dto.rs @@ -78,7 +78,7 @@ pub struct GotrueTokenResponse { pub token_type: String, /// the access_token will remain valid before it expires and needs to be refreshed. pub expires_in: i64, - /// a timestamp indicating the exact time at which the access_token will expire. + /// a timestamp in seconds indicating the exact time at which the access_token will expire. pub expires_at: i64, /// The refresh token is used to obtain a new access_token once the current access_token expires. /// Refresh tokens are usually long-lived and are stored securely by the client. diff --git a/libs/gotrue/Cargo.toml b/libs/gotrue/Cargo.toml index 80e95c3f..91be9dcb 100644 --- a/libs/gotrue/Cargo.toml +++ b/libs/gotrue/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.105" +serde.workspace = true +serde_json.workspace = true futures-util = "0.3.8" anyhow = "1.0.75" reqwest = { version = "0.11.20", default-features = false, features = ["json", "rustls-tls", "cookies"] } diff --git a/libs/infra/Cargo.toml b/libs/infra/Cargo.toml index f9b91a45..f1e35b8d 100644 --- a/libs/infra/Cargo.toml +++ b/libs/infra/Cargo.toml @@ -8,5 +8,5 @@ edition = "2021" [dependencies] reqwest = { version = "0.11.20", default-features = false } anyhow = "1.0.75" -serde = { version = "1.0.130" } -serde_json = "1.0.105" \ No newline at end of file +serde.workspace = true +serde_json ="1.0.105" \ No newline at end of file diff --git a/libs/realtime-entity/Cargo.toml b/libs/realtime-entity/Cargo.toml index df8885b4..2bf0c9d1 100644 --- a/libs/realtime-entity/Cargo.toml +++ b/libs/realtime-entity/Cargo.toml @@ -8,8 +8,8 @@ edition = "2021" [dependencies] collab = { version = "0.1.0" } collab-entity = { version = "0.1.0" } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde.workspace = true +serde_json.workspace = true bytes = { version = "1.0", features = ["serde"] } anyhow = "1.0.75" actix = { version = "0.13", optional = true } diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 4f8d939a..93372835 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -110,15 +110,9 @@ impl Ord for CollabMessage { } impl CollabMessage { - /// Currently, only have one business id. So just return 1. - pub fn business_id(&self) -> u8 { - 1 - } - pub fn is_init(&self) -> bool { matches!(self, CollabMessage::ClientInit(_)) } - pub fn msg_id(&self) -> Option { match self { CollabMessage::ClientInit(value) => Some(value.msg_id), @@ -130,22 +124,18 @@ impl CollabMessage { CollabMessage::CloseCollab(_) => None, } } - pub fn to_vec(&self) -> Vec { serde_json::to_vec(self).unwrap_or_default() } - pub fn from_vec(data: &[u8]) -> Result { serde_json::from_slice(data) } - pub fn len(&self) -> usize { self .payload() .map(|payload| payload.len()) .unwrap_or_default() } - pub fn payload(&self) -> Option<&Bytes> { match self { CollabMessage::ClientInit(value) => Some(&value.payload), @@ -157,11 +147,9 @@ impl CollabMessage { CollabMessage::CloseCollab(_) => None, } } - pub fn is_empty(&self) -> bool { self.len() == 0 } - pub fn origin(&self) -> Option<&CollabOrigin> { match self { CollabMessage::ClientInit(value) => Some(&value.origin), diff --git a/libs/realtime/Cargo.toml b/libs/realtime/Cargo.toml index 3c9f7bd0..f884cfa6 100644 --- a/libs/realtime/Cargo.toml +++ b/libs/realtime/Cargo.toml @@ -8,8 +8,8 @@ edition = "2021" [dependencies] actix = "0.13" actix-web-actors = { version = "4.2.0" } -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0.108" +serde.workspace = true +serde_json.workspace = true thiserror = "1.0.30" bytes = { version = "1.0", features = ["serde"] } parking_lot = { version = "0.12.1", features = ["arc_lock"] } diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 6ee29ad7..3771277a 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -200,7 +200,7 @@ impl CollabBroadcast { match msg { Ok(msg) => { if let Ok(payload) = - handle_msg(&collab_msg_origin, &ServerSyncProtocol, &collab, msg).await { + handle_msg(&collab_msg_origin, &ServerSyncProtocol, &collab, msg) { // Send the response to the corresponding client match collab_msg_origin { None => warn!("Client message does not have a origin"), diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml index 6ceed23b..63c82578 100644 --- a/libs/shared-entity/Cargo.toml +++ b/libs/shared-entity/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] anyhow = "1.0.75" serde = "1.0.188" -serde_json = "1.0.105" +serde_json.workspace = true serde_repr = "0.1.16" thiserror = "1.0.47" reqwest = "0.11.18" diff --git a/libs/token/Cargo.toml b/libs/token/Cargo.toml index 574be2f7..cbd60a92 100644 --- a/libs/token/Cargo.toml +++ b/libs/token/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -serde = { version = "1.0", features = ["derive"] } +serde.workspace = true chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false } jwt = "0.16.0" thiserror = "1.0.30"