diff --git a/deploy.env b/deploy.env index 1f2788fc..c1b6dae1 100644 --- a/deploy.env +++ b/deploy.env @@ -127,3 +127,6 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre # AppFlowy Indexer APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 + +# AppFlowy Collaborate +APPFLOWY_COLLABORATE_MULTI_THREAD=false diff --git a/dev.env b/dev.env index 421bfb84..95e783dd 100644 --- a/dev.env +++ b/dev.env @@ -112,4 +112,7 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre # AppFlowy Indexer APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres -APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 \ No newline at end of file +APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 + +# AppFlowy Collaborate +APPFLOWY_COLLABORATE_MULTI_THREAD=false diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index ed15690b..f9aac2df 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -70,6 +70,3 @@ workspace-access.workspace = true [dev-dependencies] rand = "0.8.5" workspace-template.workspace = true - -[features] -collab-rt-multi-thread = [] diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index a595f887..da0eb219 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -1,11 +1,10 @@ -use std::fmt::Display; -use std::str::FromStr; - use anyhow::Context; use secrecy::Secret; use semver::Version; use serde::Deserialize; use sqlx::postgres::{PgConnectOptions, PgSslMode}; +use std::fmt::Display; +use std::str::FromStr; #[derive(Clone, Debug)] pub struct Config { diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index d1e9cb0e..3d2465b8 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -24,6 +24,8 @@ use crate::group::manager::GroupManager; use crate::indexer::IndexerProvider; use crate::metrics::CollabMetricsCalculate; +use crate::config::get_env_var; +use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME; use crate::state::RedisConnectionManager; use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink}; @@ -37,6 +39,7 @@ pub struct CollaborationServer { #[allow(dead_code)] metrics: Arc, metrics_calculate: CollabMetricsCalculate, + enable_custom_runtime: bool, } impl CollaborationServer @@ -56,8 +59,14 @@ where edit_state_max_secs: i64, indexer_provider: Arc, ) -> Result { - if cfg!(feature = "collab-rt-multi-thread") { - info!("CollaborationServer with multi-thread feature enabled"); + let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") + .parse::() + .unwrap_or(false); + + if enable_custom_runtime { + info!("CollaborationServer with custom runtime"); + } else { + info!("CollaborationServer with actix-web runtime"); } let metrics_calculate = CollabMetricsCalculate::default(); @@ -95,6 +104,7 @@ where group_sender_by_object_id, metrics, metrics_calculate, + enable_custom_runtime, }) } @@ -178,6 +188,7 @@ where let group_sender_by_object_id = self.group_sender_by_object_id.clone(); let client_msg_router_by_user = self.connect_state.client_message_routers.clone(); let group_manager = self.group_manager.clone(); + let enable_custom_runtime = self.enable_custom_runtime; Box::pin(async move { for (object_id, collab_messages) in message_by_oid { @@ -200,7 +211,12 @@ where let object_id = entry.key().clone(); let clone_notify = notify.clone(); - tokio::spawn(runner.run(object_id, clone_notify)); + if enable_custom_runtime { + COLLAB_RUNTIME.spawn(runner.run(object_id, clone_notify)); + } else { + tokio::spawn(runner.run(object_id, clone_notify)); + } + entry.insert(new_sender.clone()); // wait for the runner to be ready to handle the message. diff --git a/services/appflowy-history/src/core/open_handle.rs b/services/appflowy-history/src/core/open_handle.rs index 8a9888fb..7f0fa7bd 100644 --- a/services/appflowy-history/src/core/open_handle.rs +++ b/services/appflowy-history/src/core/open_handle.rs @@ -197,7 +197,9 @@ fn apply_updates(messages: &[StreamMessage], collab: &mut Collab) -> Result<(), let CollabUpdateEvent::UpdateV1 { encode_update } = CollabUpdateEvent::decode(&message.data)?; let update = Update::decode_v1(&encode_update) .map_err(|e| CollabError::YrsEncodeStateError(e.to_string()))?; - txn.apply_update(update); + txn + .apply_update(update) + .map_err(|err| HistoryError::Internal(err.into()))?; } Ok(()) }