diff --git a/Cargo.lock b/Cargo.lock index 6a664473..aafa05c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2080,7 +2080,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362" dependencies = [ "anyhow", "arc-swap", @@ -2105,7 +2105,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362" dependencies = [ "anyhow", "arc-swap", @@ -2125,7 +2125,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362" dependencies = [ "anyhow", "bytes", @@ -2144,7 +2144,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362" dependencies = [ "anyhow", "arc-swap", @@ -2228,7 +2228,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362" dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index 21925e37..ca3bfc75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -283,11 +283,11 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" } [features] history = [] diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs index a59b1b0d..e2a47176 100644 --- a/libs/app-error/src/lib.rs +++ b/libs/app-error/src/lib.rs @@ -316,6 +316,7 @@ pub enum ErrorCode { #[cfg(feature = "sqlx_error")] SqlxArgEncodingError = 1035, InvalidContentType = 1036, + SingleUploadLimitExceeded = 1037, } impl ErrorCode { diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 18b4ea44..8ab165b0 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -25,10 +25,11 @@ use std::future::Future; use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use std::pin::Pin; use std::sync::atomic::Ordering; + use std::task::{Context, Poll}; use std::time::Duration; use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; -use tokio_retry::{Retry, RetryIf}; +use tokio_retry::{Condition, RetryIf}; use tracing::{event, info, instrument, trace}; pub use infra::file_util::ChunkedBytes; @@ -138,7 +139,7 @@ impl Client { // 2 seconds, 4 seconds, 8 seconds let retry_strategy = ExponentialBackoff::from_millis(2).factor(1000).take(3); let action = GetCollabAction::new(self.clone(), params); - Retry::spawn(retry_strategy, action).await + RetryIf::spawn(retry_strategy, action, RetryGetCollabCondition).await } #[instrument(level = "debug", skip_all, err)] @@ -391,3 +392,10 @@ where Ok(Bytes::from(chunk)) } + +struct RetryGetCollabCondition; +impl Condition for RetryGetCollabCondition { + fn should_retry(&mut self, error: &AppResponseError) -> bool { + !error.is_record_not_found() + } +} diff --git a/services/appflowy-collaborate/src/api.rs b/services/appflowy-collaborate/src/api.rs index 45cd94bf..36b6ba5c 100644 --- a/services/appflowy-collaborate/src/api.rs +++ b/services/appflowy-collaborate/src/api.rs @@ -18,6 +18,7 @@ use tokio_stream::StreamExt; use tracing::{debug, error, event, instrument, trace}; use app_error::AppError; +use authentication::jwt::{authorization_from_token, UserUuid}; use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage}; use collab_rt_entity::{HttpRealtimeMessage, RealtimeMessage}; use shared_entity::response::{AppResponse, AppResponseError}; @@ -31,7 +32,6 @@ use crate::compression::{ decompress, CompressionType, X_COMPRESSION_BUFFER_SIZE, X_COMPRESSION_TYPE, }; use crate::state::AppState; -use authentication::jwt::{authorization_from_token, UserUuid}; pub fn ws_scope() -> Scope { web::scope("/ws").service(web::resource("/v1").route(web::get().to(establish_ws_connection_v1))) @@ -76,6 +76,10 @@ pub async fn establish_ws_connection_v1( }, }; + if client_version < state.config.websocket.min_client_version { + return Err(AppError::Connect("Client version is too low".to_string()).into()); + } + start_connect( &request, payload, diff --git a/services/appflowy-collaborate/src/collab/disk_cache.rs b/services/appflowy-collaborate/src/collab/disk_cache.rs index a3d21d88..916d9da0 100644 --- a/services/appflowy-collaborate/src/collab/disk_cache.rs +++ b/services/appflowy-collaborate/src/collab/disk_cache.rs @@ -4,7 +4,7 @@ use std::time::Duration; use anyhow::anyhow; use collab::entity::EncodedCollab; use collab_entity::CollabType; -use sqlx::{PgPool, Transaction}; +use sqlx::{Error, PgPool, Transaction}; use tokio::time::sleep; use tracing::{event, instrument, Level}; use uuid::Uuid; @@ -100,19 +100,21 @@ impl CollabDiskCache { .await?; }, Err(e) => { - // Handle non-retryable errors immediately - if matches!(e, sqlx::Error::RowNotFound) { - let msg = format!("Can't find the row for query: {:?}", query); - return Err(AppError::RecordNotFound(msg)); - } - - // Increment attempts and retry if below MAX_ATTEMPTS and the error is retryable - if attempts < MAX_ATTEMPTS - 1 && matches!(e, sqlx::Error::PoolTimedOut) { - attempts += 1; - sleep(Duration::from_millis(500 * attempts as u64)).await; - continue; - } else { - return Err(e.into()); + match e { + Error::RowNotFound => { + let msg = format!("Can't find the row for query: {:?}", query); + return Err(AppError::RecordNotFound(msg)); + }, + _ => { + // Increment attempts and retry if below MAX_ATTEMPTS and the error is retryable + if attempts < MAX_ATTEMPTS - 1 && matches!(e, sqlx::Error::PoolTimedOut) { + attempts += 1; + sleep(Duration::from_millis(500 * attempts as u64)).await; + continue; + } else { + return Err(e.into()); + } + }, } }, } diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index fe807f62..a595f887 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use anyhow::Context; use secrecy::Secret; +use semver::Version; use serde::Deserialize; use sqlx::postgres::{PgConnectOptions, PgSslMode}; @@ -70,6 +71,7 @@ impl AISettings { pub struct WebsocketSetting { pub heartbeat_interval: u8, pub client_timeout: u8, + pub min_client_version: Version, } #[derive(Clone, Debug)] @@ -140,6 +142,7 @@ pub fn get_configuration() -> Result { websocket: WebsocketSetting { heartbeat_interval: get_env_var("APPFLOWY_WEBSOCKET_HEARTBEAT_INTERVAL", "6").parse()?, client_timeout: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_TIMEOUT", "60").parse()?, + min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?, }, db_settings: DatabaseSetting { pg_conn_opts: PgConnectOptions::from_str(&get_env_var( diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 5bfc8480..32d0bdf6 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -8,7 +8,7 @@ use tracing::{instrument, trace, warn}; use access_control::collab::RealtimeAccessControl; use collab_rt_entity::user::RealtimeUser; -use collab_rt_entity::{ClientCollabMessage, ServerCollabMessage, SinkMessage}; +use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, SinkMessage}; use collab_rt_entity::{CollabAck, RealtimeMessage}; use database::collab::CollabStorage; @@ -155,7 +155,11 @@ where let origin = first_message.origin().clone(); let msg_id = first_message.msg_id(); let object_id = first_message.object_id().to_string(); - let ack = CollabAck::new(origin, object_id, msg_id, 0); + + // when the group with given id is not found and the the first message is not init sync. + // Return AckCode::CannotApplyUpdate to the client and then client will send the init sync message. + let ack = + CollabAck::new(origin, object_id, msg_id, 0).with_code(AckCode::CannotApplyUpdate); entry .value() .send_message(ServerCollabMessage::ClientAck(ack).into()) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 2bc58c98..d759e234 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -11,7 +11,7 @@ use collab_entity::CollabType; use dashmap::DashMap; use futures_util::{SinkExt, StreamExt}; use tokio::sync::{mpsc, RwLock}; -use tracing::{error, event, trace}; +use tracing::{error, event, info, trace}; use yrs::updates::decoder::Decode; use yrs::updates::encoder::Encode; use yrs::Update; @@ -200,7 +200,26 @@ impl CollabGroup { modified_at.elapsed().as_secs() > 60 * 60 } else { let elapsed_secs = modified_at.elapsed().as_secs(); - elapsed_secs > self.timeout_secs() && self.subscribers.is_empty() + if elapsed_secs > self.timeout_secs() { + // Mark the group as inactive if it has been inactive for more than 3 hours, regardless of the number of subscribers. + // Otherwise, return `true` only if there are no subscribers remaining in the group. + // If a client modifies a group that has already been marked as inactive (removed), + // the client will automatically send an initialization sync to reinitialize the group. + const MAXIMUM_SECS: u64 = 3 * 60 * 60; + if elapsed_secs > MAXIMUM_SECS { + info!( + "Group:{} is inactive for {} seconds, subscribers: {}", + self.object_id, + modified_at.elapsed().as_secs(), + self.subscribers.len() + ); + true + } else { + self.subscribers.is_empty() + } + } else { + false + } } } diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index 06ea8c67..4288720a 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -64,7 +64,7 @@ impl IndexerProvider { fn get_unindexed_collabs( &self, - ) -> Pin>>> { + ) -> Pin> + Send>> { let db = self.db.clone(); Box::pin(try_stream! { let collabs = get_collabs_without_embeddings(&db).await?; diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index a4fd4a23..06d81c69 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -1,11 +1,10 @@ +use anyhow::Result; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; - -use anyhow::Result; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use tokio::sync::Notify; use tokio::time::interval; use tracing::{error, info, trace}; @@ -254,7 +253,7 @@ where } fn spawn_handle_unindexed_collabs(indexer_provider: Arc) { - tokio::task::spawn_local(IndexerProvider::handle_unindexed_collabs(indexer_provider)); + tokio::spawn(IndexerProvider::handle_unindexed_collabs(indexer_provider)); } fn spawn_period_check_inactive_group( diff --git a/src/api/ws.rs b/src/api/ws.rs index 3372d7aa..d23b4e40 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -26,7 +26,7 @@ use crate::state::AppState; pub fn ws_scope() -> Scope { web::scope("/ws") - .service(establish_ws_connection) + //.service(establish_ws_connection) .service(web::resource("/v1").route(web::get().to(establish_ws_connection_v1))) } const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB @@ -86,6 +86,10 @@ pub async fn establish_ws_connection_v1( }, }; + if client_version < state.config.websocket.min_client_version { + return Err(AppError::Connect("Client version is too low".to_string()).into()); + } + start_connect( &request, payload, diff --git a/src/config/config.rs b/src/config/config.rs index c0ad3fc1..349c8ec1 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -1,11 +1,14 @@ -use anyhow::Context; -use infra::env_util::get_env_var; -use secrecy::{ExposeSecret, Secret}; -use serde::Deserialize; -use sqlx::postgres::{PgConnectOptions, PgSslMode}; use std::fmt::Display; use std::str::FromStr; +use anyhow::Context; +use secrecy::{ExposeSecret, Secret}; +use semver::Version; +use serde::Deserialize; +use sqlx::postgres::{PgConnectOptions, PgSslMode}; + +use infra::env_util::get_env_var; + #[derive(Clone, Debug)] pub struct Config { pub app_env: Environment, @@ -166,6 +169,7 @@ pub fn get_configuration() -> Result { websocket: WebsocketSetting { heartbeat_interval: get_env_var("APPFLOWY_WEBSOCKET_HEARTBEAT_INTERVAL", "6").parse()?, client_timeout: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_TIMEOUT", "60").parse()?, + min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?, }, redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(), s3: S3Setting { @@ -239,4 +243,5 @@ impl FromStr for Environment { pub struct WebsocketSetting { pub heartbeat_interval: u8, pub client_timeout: u8, + pub min_client_version: Version, }