chore: fix local set warnings (#515)

* chore: fix local set warnings

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-05-01 23:07:57 +08:00 committed by GitHub
parent ca1623fcc2
commit 59b3d69c42
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 37 additions and 44 deletions

View File

@ -1,4 +1,3 @@
use crate::rt_server::rt_spawn;
use crate::util::channel_ext::UnboundedSenderSink;
use crate::RealtimeAccessControl;
use async_trait::async_trait;
@ -73,7 +72,7 @@ impl ClientMessageRouter {
let sink_workspace_id = workspace_id.to_string();
let uid = user.uid;
let client_sink = UnboundedSenderSink::<T>::new(client_sink_tx);
rt_spawn(async move {
tokio::spawn(async move {
while let Some(msg) = client_sink_rx.recv().await {
let result = sink_access_control
.can_read_collab(&sink_workspace_id, &uid, &target_object_id)
@ -102,7 +101,7 @@ impl ClientMessageRouter {
// forward the message to the subscriber which is the broadcast channel [CollabBroadcast].
let (client_msg_rx, rx) = tokio::sync::mpsc::channel(100);
let client_stream = ReceiverStream::new(rx);
rt_spawn(async move {
tokio::spawn(async move {
while let Some(Ok(realtime_msg)) = stream_rx.next().await {
match realtime_msg.transform() {
Ok(messages_by_oid) => {

View File

@ -1,5 +1,4 @@
use crate::group::cmd::{GroupCommand, GroupCommandSender};
use crate::rt_server::rt_spawn;
use collab::entity::EncodedCollab;
use dashmap::DashMap;
use std::sync::Arc;
@ -21,7 +20,7 @@ pub(crate) fn spawn_collaboration_command(
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
) {
let group_sender_by_object_id = group_sender_by_object_id.clone();
rt_spawn(async move {
tokio::spawn(async move {
while let Some(cmd) = command_recv.recv().await {
match cmd {
CollaborationCommand::GetEncodeCollab { object_id, ret } => {

View File

@ -2,7 +2,6 @@ use crate::error::RealtimeError;
use crate::group::group_init::EditState;
use crate::group::protocol::ServerSyncProtocol;
use crate::metrics::CollabMetricsCalculate;
use crate::rt_server::rt_spawn;
use anyhow::anyhow;
use bytes::Bytes;
use collab::core::awareness::{gen_awareness_update_message, AwarenessUpdateSubscription};
@ -190,7 +189,7 @@ impl CollabBroadcast {
// connected subscriber using its Sink. The loop will break if the stop_rx receives a message.
let mut receiver = self.broadcast_sender.subscribe();
let cloned_user = user.clone();
rt_spawn(async move {
tokio::spawn(async move {
loop {
select! {
_ = stop_rx.recv() => break,
@ -226,7 +225,7 @@ impl CollabBroadcast {
// the stream will continue to receive messages from the client and it will stop if the stop_rx
// receives a message. If the client's message alter the document state, it will trigger the
// document observer and broadcast the update to all connected subscribers. Check out the [observe_update_v1] and [sink_task] above.
rt_spawn(async move {
tokio::spawn(async move {
loop {
select! {
_ = stop_rx.recv() => {

View File

@ -13,7 +13,6 @@ use collab_rt_entity::CollabMessage;
use collab_rt_entity::MessageByObjectId;
use database::collab::CollabStorage;
use crate::rt_server::rt_spawn;
use collab::core::collab::MutexCollab;
use futures_util::{SinkExt, StreamExt};
@ -63,7 +62,7 @@ impl CollabGroup {
let broadcast = CollabBroadcast::new(&object_id, 10, edit_state.clone(), &collab).await;
let (destroy_group_tx, rx) = mpsc::channel(1);
rt_spawn(
tokio::spawn(
GroupPersistence::new(
workspace_id.clone(),
object_id.clone(),

View File

@ -1,5 +1,3 @@
use crate::rt_server::rt_spawn;
use collab::core::collab::WeakMutexCollab;
use collab::preclude::CollabPlugin;
use collab_entity::CollabType;
@ -73,7 +71,7 @@ where
let object_id = self.object_id.clone();
let workspace_id = self.workspace_id.clone();
rt_spawn(async move {
tokio::spawn(async move {
sleep(std::time::Duration::from_secs(2)).await;
match storage.should_create_snapshot(&object_id).await {
Ok(should_do) => {

View File

@ -14,22 +14,17 @@ use collab_rt_entity::MessageByObjectId;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use database::collab::CollabStorage;
use lazy_static::lazy_static;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::sync::Notify;
use tokio::time::interval;
use tracing::{error, info, trace};
lazy_static! {
pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap();
}
#[derive(Clone)]
pub struct CollaborationServer<S, AC> {
/// Keep track of all collab groups
@ -178,7 +173,7 @@ where
let object_id = entry.key().clone();
let clone_notify = notify.clone();
rt_spawn(runner.run(object_id, clone_notify));
tokio::spawn(runner.run(object_id, clone_notify));
entry.insert(new_sender.clone());
// wait for the runner to be ready to handle the message.
@ -239,32 +234,36 @@ fn spawn_period_check_inactive_group<S, AC>(
});
}
pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_multi_thread()
.thread_name("collab-rt")
.enable_io()
.enable_time()
.build()
}
/// When the CollaborationServer operates within an actix-web actor, utilizing tokio::spawn for
/// task execution confines all tasks to the same thread, attributable to the actor's reliance on a
/// single-threaded Tokio runtime. To circumvent this limitation and enable task execution across
/// multiple threads, we've incorporated a multi-thread feature.
///
/// When appflowy-collaborate is deployed as a standalone service, we can use tokio multi-thread.
#[cfg(feature = "collab-rt-multi-thread")]
pub(crate) fn rt_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
COLLAB_RUNTIME.spawn(future)
}
mod collaboration_runtime {
use lazy_static::lazy_static;
use std::future::Future;
use std::io;
use tokio::runtime;
use tokio::runtime::Runtime;
lazy_static! {
pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap();
}
#[cfg(not(feature = "collab-rt-multi-thread"))]
pub(crate) fn rt_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + 'static,
T::Output: Send + 'static,
{
tokio::task::spawn_local(future)
pub fn default_tokio_runtime() -> io::Result<Runtime> {
runtime::Builder::new_multi_thread()
.thread_name("collab-rt")
.enable_io()
.enable_time()
.build()
}
pub(crate) fn spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
COLLAB_RUNTIME.spawn(future)
}
}