From 59b3d69c426394680eb1cdcdf00fe08681ba7a5e Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 1 May 2024 23:07:57 +0800 Subject: [PATCH] chore: fix local set warnings (#515) * chore: fix local set warnings * chore: clippy --- .../src/client/client_msg_router.rs | 5 +- services/appflowy-collaborate/src/command.rs | 3 +- .../src/group/broadcast.rs | 5 +- .../src/group/group_init.rs | 3 +- .../src/group/plugin/history_plugin.rs | 4 +- .../appflowy-collaborate/src/rt_server.rs | 61 +++++++++---------- 6 files changed, 37 insertions(+), 44 deletions(-) diff --git a/services/appflowy-collaborate/src/client/client_msg_router.rs b/services/appflowy-collaborate/src/client/client_msg_router.rs index d18da9aa..3494719b 100644 --- a/services/appflowy-collaborate/src/client/client_msg_router.rs +++ b/services/appflowy-collaborate/src/client/client_msg_router.rs @@ -1,4 +1,3 @@ -use crate::rt_server::rt_spawn; use crate::util::channel_ext::UnboundedSenderSink; use crate::RealtimeAccessControl; use async_trait::async_trait; @@ -73,7 +72,7 @@ impl ClientMessageRouter { let sink_workspace_id = workspace_id.to_string(); let uid = user.uid; let client_sink = UnboundedSenderSink::::new(client_sink_tx); - rt_spawn(async move { + tokio::spawn(async move { while let Some(msg) = client_sink_rx.recv().await { let result = sink_access_control .can_read_collab(&sink_workspace_id, &uid, &target_object_id) @@ -102,7 +101,7 @@ impl ClientMessageRouter { // forward the message to the subscriber which is the broadcast channel [CollabBroadcast]. let (client_msg_rx, rx) = tokio::sync::mpsc::channel(100); let client_stream = ReceiverStream::new(rx); - rt_spawn(async move { + tokio::spawn(async move { while let Some(Ok(realtime_msg)) = stream_rx.next().await { match realtime_msg.transform() { Ok(messages_by_oid) => { diff --git a/services/appflowy-collaborate/src/command.rs b/services/appflowy-collaborate/src/command.rs index b875ae4c..40761915 100644 --- a/services/appflowy-collaborate/src/command.rs +++ b/services/appflowy-collaborate/src/command.rs @@ -1,5 +1,4 @@ use crate::group::cmd::{GroupCommand, GroupCommandSender}; -use crate::rt_server::rt_spawn; use collab::entity::EncodedCollab; use dashmap::DashMap; use std::sync::Arc; @@ -21,7 +20,7 @@ pub(crate) fn spawn_collaboration_command( group_sender_by_object_id: &Arc>, ) { let group_sender_by_object_id = group_sender_by_object_id.clone(); - rt_spawn(async move { + tokio::spawn(async move { while let Some(cmd) = command_recv.recv().await { match cmd { CollaborationCommand::GetEncodeCollab { object_id, ret } => { diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index 094391d4..718672e8 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -2,7 +2,6 @@ use crate::error::RealtimeError; use crate::group::group_init::EditState; use crate::group::protocol::ServerSyncProtocol; use crate::metrics::CollabMetricsCalculate; -use crate::rt_server::rt_spawn; use anyhow::anyhow; use bytes::Bytes; use collab::core::awareness::{gen_awareness_update_message, AwarenessUpdateSubscription}; @@ -190,7 +189,7 @@ impl CollabBroadcast { // connected subscriber using its Sink. The loop will break if the stop_rx receives a message. let mut receiver = self.broadcast_sender.subscribe(); let cloned_user = user.clone(); - rt_spawn(async move { + tokio::spawn(async move { loop { select! { _ = stop_rx.recv() => break, @@ -226,7 +225,7 @@ impl CollabBroadcast { // the stream will continue to receive messages from the client and it will stop if the stop_rx // receives a message. If the client's message alter the document state, it will trigger the // document observer and broadcast the update to all connected subscribers. Check out the [observe_update_v1] and [sink_task] above. - rt_spawn(async move { + tokio::spawn(async move { loop { select! { _ = stop_rx.recv() => { diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 266f432f..ea544e15 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -13,7 +13,6 @@ use collab_rt_entity::CollabMessage; use collab_rt_entity::MessageByObjectId; use database::collab::CollabStorage; -use crate::rt_server::rt_spawn; use collab::core::collab::MutexCollab; use futures_util::{SinkExt, StreamExt}; @@ -63,7 +62,7 @@ impl CollabGroup { let broadcast = CollabBroadcast::new(&object_id, 10, edit_state.clone(), &collab).await; let (destroy_group_tx, rx) = mpsc::channel(1); - rt_spawn( + tokio::spawn( GroupPersistence::new( workspace_id.clone(), object_id.clone(), diff --git a/services/appflowy-collaborate/src/group/plugin/history_plugin.rs b/services/appflowy-collaborate/src/group/plugin/history_plugin.rs index 31211c68..9cb64cdb 100644 --- a/services/appflowy-collaborate/src/group/plugin/history_plugin.rs +++ b/services/appflowy-collaborate/src/group/plugin/history_plugin.rs @@ -1,5 +1,3 @@ -use crate::rt_server::rt_spawn; - use collab::core::collab::WeakMutexCollab; use collab::preclude::CollabPlugin; use collab_entity::CollabType; @@ -73,7 +71,7 @@ where let object_id = self.object_id.clone(); let workspace_id = self.workspace_id.clone(); - rt_spawn(async move { + tokio::spawn(async move { sleep(std::time::Duration::from_secs(2)).await; match storage.should_create_snapshot(&object_id).await { Ok(should_do) => { diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 61669fd6..4076d35a 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -14,22 +14,17 @@ use collab_rt_entity::MessageByObjectId; use dashmap::mapref::entry::Entry; use dashmap::DashMap; use database::collab::CollabStorage; -use lazy_static::lazy_static; + use std::future::Future; -use std::io; + use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; -use tokio::runtime; -use tokio::runtime::Runtime; + use tokio::sync::Notify; use tokio::time::interval; use tracing::{error, info, trace}; -lazy_static! { - pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap(); -} - #[derive(Clone)] pub struct CollaborationServer { /// Keep track of all collab groups @@ -178,7 +173,7 @@ where let object_id = entry.key().clone(); let clone_notify = notify.clone(); - rt_spawn(runner.run(object_id, clone_notify)); + tokio::spawn(runner.run(object_id, clone_notify)); entry.insert(new_sender.clone()); // wait for the runner to be ready to handle the message. @@ -239,32 +234,36 @@ fn spawn_period_check_inactive_group( }); } -pub fn default_tokio_runtime() -> io::Result { - runtime::Builder::new_multi_thread() - .thread_name("collab-rt") - .enable_io() - .enable_time() - .build() -} - /// When the CollaborationServer operates within an actix-web actor, utilizing tokio::spawn for /// task execution confines all tasks to the same thread, attributable to the actor's reliance on a /// single-threaded Tokio runtime. To circumvent this limitation and enable task execution across /// multiple threads, we've incorporated a multi-thread feature. +/// +/// When appflowy-collaborate is deployed as a standalone service, we can use tokio multi-thread. #[cfg(feature = "collab-rt-multi-thread")] -pub(crate) fn rt_spawn(future: T) -> tokio::task::JoinHandle -where - T: Future + Send + 'static, - T::Output: Send + 'static, -{ - COLLAB_RUNTIME.spawn(future) -} +mod collaboration_runtime { + use lazy_static::lazy_static; + use std::future::Future; + use std::io; + use tokio::runtime; + use tokio::runtime::Runtime; + lazy_static! { + pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap(); + } -#[cfg(not(feature = "collab-rt-multi-thread"))] -pub(crate) fn rt_spawn(future: T) -> tokio::task::JoinHandle -where - T: Future + 'static, - T::Output: Send + 'static, -{ - tokio::task::spawn_local(future) + pub fn default_tokio_runtime() -> io::Result { + runtime::Builder::new_multi_thread() + .thread_name("collab-rt") + .enable_io() + .enable_time() + .build() + } + + pub(crate) fn spawn(future: T) -> tokio::task::JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + COLLAB_RUNTIME.spawn(future) + } }