From 225887dbee7b246cd35b45dd8085a62eac4899d3 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 20 Feb 2024 14:26:26 +0800 Subject: [PATCH] chore: remove sink lock (#334) * chore: remove sink lock * chore: clippy --- libs/realtime/src/collaborate/broadcast.rs | 49 +++++++------------ .../realtime/src/collaborate/group_control.rs | 2 +- libs/realtime/src/util/channel_ext.rs | 1 + src/biz/casbin/access_control.rs | 10 ++-- 4 files changed, 23 insertions(+), 39 deletions(-) diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 78e17d33..82e06ffe 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, Error}; +use anyhow::anyhow; use collab::core::awareness; use std::future::Future; use std::iter::Take; @@ -130,19 +130,18 @@ impl CollabBroadcast { pub fn subscribe( &self, subscriber_origin: CollabOrigin, - sink: Sink, + mut sink: Sink, mut stream: Stream, modified_at: Arc>, ) -> Subscription where - Sink: SinkExt + Send + Sync + Unpin + 'static, + Sink: SinkExt + Clone + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, >::Error: std::error::Error + Send + Sync, E: Into + Send + Sync + 'static, { let cloned_origin = subscriber_origin.clone(); trace!("[realtime]: new subscriber: {}", subscriber_origin); - let sink = Arc::new(Mutex::new(sink)); // Receive a update from the document observer and forward the update to all // connected subscribers using its Sink. let sink_stop_tx = { @@ -199,7 +198,7 @@ impl CollabBroadcast { match result { Some(Ok(collab_msg)) => { if object_id == collab_msg.object_id() && collab_msg.payload().is_some() { - handle_user_collab_message(&object_id, &sink, &collab_msg, &collab).await; + handle_client_collab_message(&object_id, &mut sink, &collab_msg, &collab).await; if let Ok(mut modified_at) = modified_at.try_lock() { *modified_at = Instant::now(); } @@ -226,9 +225,10 @@ impl CollabBroadcast { } } -async fn handle_user_collab_message( +/// Handle the message sent from the client +async fn handle_client_collab_message( object_id: &str, - sink: &Arc>, + sink: &mut Sink, collab_msg: &CollabMessage, collab: &MutexCollab, ) where @@ -247,13 +247,11 @@ async fn handle_user_collab_message( Ok(msg) => { let cloned_collab = collab.clone(); let cloned_origin = origin.clone(); - let result = tokio::task::spawn_blocking(move || { - handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg) - }) - .await; + let result = + handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg); match result { - Ok(Ok(payload)) => match origin.as_ref() { + Ok(payload) => match origin.as_ref() { None => warn!("Client message does not have a origin"), Some(origin) => { if let Some(msg_id) = collab_msg.msg_id() { @@ -266,22 +264,14 @@ async fn handle_user_collab_message( ); trace!("Send response to client: {}", resp); - match sink.try_lock() { - Ok(mut sink) => { - if let Err(err) = sink.send(resp.into()).await { - trace!("fail to send response to client: {}", err); - } - }, - Err(err) => error!("Requires sink lock failed: {:?}", err), + if let Err(err) = sink.send(resp.into()).await { + trace!("fail to send response to client: {}", err); } } }, }, - Ok(Err(err)) => { - error!("object id:{} =>{}", object_id, err); - }, Err(err) => { - error!("internal error when handle user ws message: {}", err); + error!("object id:{} =>{}", object_id, err); }, } }, @@ -356,14 +346,14 @@ fn gen_awareness_update_message( Ok(update) } -pub struct SinkCollabMessageAction<'a, Sink> { - pub sink: &'a Arc>, +pub struct SinkCollabMessageAction<'a, Sink: Clone> { + pub sink: &'a Sink, pub message: CollabMessage, } impl<'a, Sink> SinkCollabMessageAction<'a, Sink> where - Sink: SinkExt + Send + Sync + Unpin + 'a, + Sink: SinkExt + Clone + Send + Sync + Unpin + 'a, { pub fn run(self) -> Retry, SinkCollabMessageAction<'a, Sink>> { let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(5); @@ -373,19 +363,16 @@ where impl<'a, Sink> Action for SinkCollabMessageAction<'a, Sink> where - Sink: SinkExt + Send + Sync + Unpin + 'a, + Sink: SinkExt + Clone + Send + Sync + Unpin + 'a, { type Future = Pin> + Send + Sync + 'a>>; type Item = (); type Error = RealtimeError; fn run(&mut self) -> Self::Future { - let sink = self.sink.clone(); + let mut sink = self.sink.clone(); let message = self.message.clone(); Box::pin(async move { - let mut sink = sink - .try_lock() - .map_err(|err| RealtimeError::Internal(Error::from(err)))?; sink .send(message) .await diff --git a/libs/realtime/src/collaborate/group_control.rs b/libs/realtime/src/collaborate/group_control.rs index 9cc50752..846887d0 100644 --- a/libs/realtime/src/collaborate/group_control.rs +++ b/libs/realtime/src/collaborate/group_control.rs @@ -228,7 +228,7 @@ where sink: Sink, stream: Stream, ) where - Sink: SinkExt + Send + Sync + Unpin + 'static, + Sink: SinkExt + Clone + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, >::Error: std::error::Error + Send + Sync, E: Into + Send + Sync + 'static, diff --git a/libs/realtime/src/util/channel_ext.rs b/libs/realtime/src/util/channel_ext.rs index c23d55bd..887312cb 100644 --- a/libs/realtime/src/util/channel_ext.rs +++ b/libs/realtime/src/util/channel_ext.rs @@ -4,6 +4,7 @@ use std::fmt::Debug; use std::pin::Pin; use std::task::{Context, Poll}; +#[derive(Clone)] pub struct UnboundedSenderSink(pub tokio::sync::mpsc::UnboundedSender); impl UnboundedSenderSink { diff --git a/src/biz/casbin/access_control.rs b/src/biz/casbin/access_control.rs index 6478ade2..ccafabf4 100644 --- a/src/biz/casbin/access_control.rs +++ b/src/biz/casbin/access_control.rs @@ -14,7 +14,7 @@ use anyhow::anyhow; use dashmap::DashMap; use sqlx::PgPool; use std::sync::Arc; -use std::time::Instant; + use tokio::sync::broadcast; /// Manages access control. @@ -32,6 +32,7 @@ use tokio::sync::broadcast; #[derive(Clone)] pub struct AccessControl { enforcer: Arc, + #[allow(dead_code)] access_control_metrics: Arc, } @@ -97,12 +98,7 @@ impl AccessControl { where A: ToCasbinAction, { - let start = Instant::now(); - let result = self.enforcer.enforce(uid, obj, act).await; - self - .access_control_metrics - .record_enforce_duration(start.elapsed().as_millis() as u64); - result + self.enforcer.enforce(uid, obj, act).await } pub async fn get_access_level(&self, uid: &i64, oid: &str) -> Option {