From 06b7ce0a29de9b653e32802ca8c0670d3abd8335 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Sat, 16 Nov 2024 07:33:18 +0100 Subject: [PATCH] chore: post merge fixes --- .../src/group/group_init.rs | 1 + .../appflowy-collaborate/src/group/manager.rs | 29 ++++--------------- .../appflowy-collaborate/src/group/state.rs | 12 -------- .../appflowy-collaborate/src/rt_server.rs | 4 ++- src/application.rs | 4 ++- 5 files changed, 13 insertions(+), 37 deletions(-) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index a707409a..2d90ff0a 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -79,6 +79,7 @@ impl CollabGroup { metrics: Arc, storage: Arc, is_new_collab: bool, + collab_redis_stream: Arc, persistence_interval: Duration, prune_grace_period: Duration, indexer: Option>, diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index aaccec68..9effb7f2 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -12,7 +12,7 @@ use access_control::collab::RealtimeAccessControl; use app_error::AppError; use collab_rt_entity::user::RealtimeUser; use collab_rt_entity::CollabMessage; - +use collab_stream::client::CollabRedisStream; use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::QueryCollabParams; @@ -28,6 +28,7 @@ pub struct GroupManager { storage: Arc, access_control: Arc, metrics_calculate: Arc, + collab_redis_stream: Arc, persistence_interval: Duration, prune_grace_period: Duration, indexer_provider: Arc, @@ -42,15 +43,18 @@ where storage: Arc, access_control: Arc, metrics_calculate: Arc, + collab_stream: CollabRedisStream, persistence_interval: Duration, prune_grace_period: Duration, indexer_provider: Arc, ) -> Result { + let collab_stream = Arc::new(collab_stream); Ok(Self { state: GroupManagementState::new(metrics_calculate.clone()), storage, access_control, metrics_calculate, + collab_redis_stream: collab_stream, persistence_interval, prune_grace_period, indexer_provider, @@ -143,28 +147,6 @@ where } } - let result = load_collab(user.uid, object_id, params, self.storage.clone()).await; - let (collab, _encode_collab) = { - let (mut collab, encode_collab) = match result { - Ok(value) => value, - Err(err) => { - if err.is_record_not_found() { - is_new_collab = true; - let collab = Collab::new_with_origin(CollabOrigin::Server, object_id, vec![], false); - let encode_collab = collab.encode_collab_v1(|_| Ok::<_, RealtimeError>(()))?; - (collab, encode_collab) - } else { - return Err(RealtimeError::CreateGroupFailed( - CreateGroupFailedReason::CannotGetCollabData, - )); - } - }, - }; - - collab.initialize(); - encode_collab - }; - trace!( "[realtime]: create group: uid:{},workspace_id:{},object_id:{}:{}", user.uid, @@ -193,6 +175,7 @@ where self.metrics_calculate.clone(), self.storage.clone(), is_new_collab, + self.collab_redis_stream.clone(), self.persistence_interval, self.prune_grace_period, indexer, diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 9d2ff5a0..7f74924a 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -128,18 +128,6 @@ impl GroupManagementState { error!("Group for object_id:{} not found", object_id); } - if let Err(err) = self - .control_event_stream - .lock() - .await - .insert_message(CollabControlEvent::Close { - object_id: object_id.to_string(), - }) - .await - { - error!("Failed to insert close event to control stream: {}", err); - } - self .metrics_calculate .opening_collab_count diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index ff2a7d28..d648854b 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::Result; use dashmap::mapref::entry::Entry; use dashmap::DashMap; +use redis::aio::ConnectionManager; use tokio::sync::Notify; use tokio::time::interval; use tracing::{error, info, trace}; @@ -52,7 +53,7 @@ where metrics: Arc, command_recv: CLCommandReceiver, redis_stream_router: Arc, - redis_connection_manager: RedisConnectionManager, + redis_connection_manager: ConnectionManager, group_persistence_interval: Duration, prune_grace_period: Duration, indexer_provider: Arc, @@ -75,6 +76,7 @@ where storage.clone(), access_control.clone(), metrics.clone(), + collab_stream, group_persistence_interval, prune_grace_period, indexer_provider.clone(), diff --git a/src/application.rs b/src/application.rs index 8e34cdda..5441d6d7 100644 --- a/src/application.rs +++ b/src/application.rs @@ -135,7 +135,9 @@ pub async fn run_actix_server( rt_cmd_recv, state.redis_stream_router.clone(), state.redis_connection_manager.clone(), - Duration::from_secs(config.collab.group_persistence_interval_secs) + Duration::from_secs(config.collab.group_persistence_interval_secs), + Duration::from_secs(config.collab.group_prune_grace_period_secs), + state.indexer_provider.clone(), ) .await .unwrap();