chore: use env to use control multiple thread runtime or not (#760)

This commit is contained in:
Nathan.fooo 2024-08-28 00:13:24 +08:00 committed by GitHub
parent 5badffc97b
commit ab14568bbf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 31 additions and 11 deletions

View File

@ -127,3 +127,6 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre
# AppFlowy Indexer
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false

View File

@ -112,4 +112,7 @@ APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgre
# AppFlowy Indexer
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
# AppFlowy Collaborate
APPFLOWY_COLLABORATE_MULTI_THREAD=false

View File

@ -70,6 +70,3 @@ workspace-access.workspace = true
[dev-dependencies]
rand = "0.8.5"
workspace-template.workspace = true
[features]
collab-rt-multi-thread = []

View File

@ -1,11 +1,10 @@
use std::fmt::Display;
use std::str::FromStr;
use anyhow::Context;
use secrecy::Secret;
use semver::Version;
use serde::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use std::fmt::Display;
use std::str::FromStr;
#[derive(Clone, Debug)]
pub struct Config {

View File

@ -24,6 +24,8 @@ use crate::group::manager::GroupManager;
use crate::indexer::IndexerProvider;
use crate::metrics::CollabMetricsCalculate;
use crate::config::get_env_var;
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
use crate::state::RedisConnectionManager;
use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink};
@ -37,6 +39,7 @@ pub struct CollaborationServer<S, AC> {
#[allow(dead_code)]
metrics: Arc<CollabRealtimeMetrics>,
metrics_calculate: CollabMetricsCalculate,
enable_custom_runtime: bool,
}
impl<S, AC> CollaborationServer<S, AC>
@ -56,8 +59,14 @@ where
edit_state_max_secs: i64,
indexer_provider: Arc<IndexerProvider>,
) -> Result<Self, RealtimeError> {
if cfg!(feature = "collab-rt-multi-thread") {
info!("CollaborationServer with multi-thread feature enabled");
let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false")
.parse::<bool>()
.unwrap_or(false);
if enable_custom_runtime {
info!("CollaborationServer with custom runtime");
} else {
info!("CollaborationServer with actix-web runtime");
}
let metrics_calculate = CollabMetricsCalculate::default();
@ -95,6 +104,7 @@ where
group_sender_by_object_id,
metrics,
metrics_calculate,
enable_custom_runtime,
})
}
@ -178,6 +188,7 @@ where
let group_sender_by_object_id = self.group_sender_by_object_id.clone();
let client_msg_router_by_user = self.connect_state.client_message_routers.clone();
let group_manager = self.group_manager.clone();
let enable_custom_runtime = self.enable_custom_runtime;
Box::pin(async move {
for (object_id, collab_messages) in message_by_oid {
@ -200,7 +211,12 @@ where
let object_id = entry.key().clone();
let clone_notify = notify.clone();
tokio::spawn(runner.run(object_id, clone_notify));
if enable_custom_runtime {
COLLAB_RUNTIME.spawn(runner.run(object_id, clone_notify));
} else {
tokio::spawn(runner.run(object_id, clone_notify));
}
entry.insert(new_sender.clone());
// wait for the runner to be ready to handle the message.

View File

@ -197,7 +197,9 @@ fn apply_updates(messages: &[StreamMessage], collab: &mut Collab) -> Result<(),
let CollabUpdateEvent::UpdateV1 { encode_update } = CollabUpdateEvent::decode(&message.data)?;
let update = Update::decode_v1(&encode_update)
.map_err(|e| CollabError::YrsEncodeStateError(e.to_string()))?;
txn.apply_update(update);
txn
.apply_update(update)
.map_err(|err| HistoryError::Internal(err.into()))?;
}
Ok(())
}