From f9397eaaf23061aad0e73376a9beadd8039ffaf9 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 4 Nov 2024 07:58:16 +0100 Subject: [PATCH 1/4] chore: remove realtime shared state --- libs/database/src/collab/collab_storage.rs | 11 --- services/appflowy-collaborate/src/api.rs | 9 --- .../appflowy-collaborate/src/application.rs | 6 -- .../src/collab/storage.rs | 20 ----- services/appflowy-collaborate/src/lib.rs | 1 - .../appflowy-collaborate/src/rt_server.rs | 8 -- .../appflowy-collaborate/src/shared_state.rs | 74 ------------------- services/appflowy-collaborate/src/state.rs | 2 - src/api/workspace.rs | 9 --- src/application.rs | 6 -- src/state.rs | 2 - 11 files changed, 148 deletions(-) delete mode 100644 services/appflowy-collaborate/src/shared_state.rs diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 2da6cddc..a4ef1298 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -167,9 +167,6 @@ pub trait CollabStorage: Send + Sync + 'static { /// Returns list of snapshots for given object_id in descending order of creation time. async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult; - - async fn add_connected_user(&self, uid: i64, device_id: &str); - async fn remove_connected_user(&self, uid: i64, device_id: &str); } #[async_trait] @@ -306,14 +303,6 @@ where async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult { self.as_ref().get_collab_snapshot_list(oid).await } - - async fn add_connected_user(&self, uid: i64, device_id: &str) { - self.as_ref().add_connected_user(uid, device_id).await - } - - async fn remove_connected_user(&self, uid: i64, device_id: &str) { - self.as_ref().remove_connected_user(uid, device_id).await - } } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/services/appflowy-collaborate/src/api.rs b/services/appflowy-collaborate/src/api.rs index 4c49d3ff..c4de10a9 100644 --- a/services/appflowy-collaborate/src/api.rs +++ b/services/appflowy-collaborate/src/api.rs @@ -115,15 +115,6 @@ async fn post_realtime_message_stream_handler( event!(tracing::Level::INFO, "message len: {}", bytes.len()); let device_id = device_id.to_string(); - // Only send message to websocket server when the user is connected - if !state - .realtime_shared_state - .is_user_connected(&uid, &device_id) - .await - .unwrap_or(false) - { - return Ok(Json(AppResponse::Ok())); - } let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?; let stream_message = ClientStreamMessage { diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index f5466983..aad42797 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -28,7 +28,6 @@ use crate::command::{CLCommandReceiver, CLCommandSender}; use crate::config::{Config, DatabaseSetting}; use crate::indexer::IndexerProvider; use crate::pg_listener::PgListeners; -use crate::shared_state::RealtimeSharedState; use crate::snapshot::SnapshotControl; use crate::state::{AppMetrics, AppState, UserCache}; use crate::CollaborationServer; @@ -108,10 +107,6 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result { snapshot_control: SnapshotControl, rt_cmd_sender: CLCommandSender, queue: Arc, - shared_state: RealtimeSharedState, } impl CollabStorageImpl @@ -62,7 +60,6 @@ where redis_conn_manager: RedisConnectionManager, metrics: Arc, ) -> Self { - let shared_state = RealtimeSharedState::new(redis_conn_manager.clone()); let queue = Arc::new(StorageQueue::new_with_metrics( cache.clone(), redis_conn_manager, @@ -75,7 +72,6 @@ where snapshot_control, rt_cmd_sender, queue, - shared_state, } } @@ -488,22 +484,6 @@ where self.snapshot_control.get_collab_snapshot_list(oid).await } - async fn add_connected_user(&self, uid: i64, device_id: &str) { - if let Err(err) = self.shared_state.add_connected_user(uid, device_id).await { - error!("Failed to add connected user: {}", err); - } - } - - async fn remove_connected_user(&self, uid: i64, device_id: &str) { - if let Err(err) = self - .shared_state - .remove_connected_user(uid, device_id) - .await - { - error!("Failed to remove connected user: {}", err); - } - } - async fn broadcast_encode_collab( &self, object_id: String, diff --git a/services/appflowy-collaborate/src/lib.rs b/services/appflowy-collaborate/src/lib.rs index 6d446e71..2fa28cf5 100644 --- a/services/appflowy-collaborate/src/lib.rs +++ b/services/appflowy-collaborate/src/lib.rs @@ -14,7 +14,6 @@ pub mod metrics; mod permission; mod pg_listener; mod rt_server; -pub mod shared_state; pub mod snapshot; mod state; pub mod telemetry; diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index c9640d41..1cd75b0d 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -126,10 +126,6 @@ where let storage = self.storage.clone(); Box::pin(async move { - storage - .add_connected_user(connected_user.uid, &connected_user.device_id) - .await; - if let Some(old_user) = connect_state.handle_user_connect(connected_user, new_client_router) { // Remove the old user from all collaboration groups. group_manager.remove_user(&old_user).await; @@ -161,10 +157,6 @@ where trace!("[realtime]: disconnect => {}", disconnect_user); let was_removed = connect_state.handle_user_disconnect(&disconnect_user); if was_removed.is_some() { - storage - .remove_connected_user(disconnect_user.uid, &disconnect_user.device_id) - .await; - metrics_calculate .connected_users .set(connect_state.number_of_connected_users() as i64); diff --git a/services/appflowy-collaborate/src/shared_state.rs b/services/appflowy-collaborate/src/shared_state.rs deleted file mode 100644 index f57b2d72..00000000 --- a/services/appflowy-collaborate/src/shared_state.rs +++ /dev/null @@ -1,74 +0,0 @@ -use crate::error::RealtimeError; -use futures_util::StreamExt; -use redis::{pipe, AsyncCommands, AsyncIter}; - -#[derive(Clone)] -pub struct RealtimeSharedState { - redis_conn_manager: redis::aio::ConnectionManager, -} - -impl RealtimeSharedState { - pub fn new(redis_conn_manager: redis::aio::ConnectionManager) -> Self { - Self { redis_conn_manager } - } - pub async fn add_connected_user(&self, uid: i64, device_id: &str) -> Result<(), RealtimeError> { - let mut conn = self.redis_conn_manager.clone(); - let key = realtime_shared_state_cache_key(&uid, device_id); - conn - .set_ex(key, "1", 60 * 60 * 3) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - Ok(()) - } - - pub async fn remove_connected_user( - &self, - uid: i64, - device_id: &str, - ) -> Result<(), RealtimeError> { - let mut conn = self.redis_conn_manager.clone(); - let key = realtime_shared_state_cache_key(&uid, device_id); - conn - .del(key) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - Ok(()) - } - - pub async fn is_user_connected(&self, uid: &i64, device_id: &str) -> Result { - let mut conn = self.redis_conn_manager.clone(); - let key = realtime_shared_state_cache_key(uid, device_id); - let result: Option = conn - .get(key) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - Ok(result.is_some()) - } - - pub async fn remove_all_connected_users(&self) -> Result<(), RealtimeError> { - let mut conn = self.redis_conn_manager.clone(); - let iter: AsyncIter = conn - .scan_match(format!("{}:*", REALTIME_SHARE_STATE_PREFIX)) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - let keys_to_delete: Vec<_> = iter.collect().await; - if !keys_to_delete.is_empty() { - let mut pipeline = pipe(); - for key in keys_to_delete.iter() { - pipeline.del(key); - } - pipeline - .query_async(&mut conn) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; - } - Ok(()) - } -} - -pub(crate) const REALTIME_SHARE_STATE_PREFIX: &str = "realtime_shared_state_v0"; - -#[inline] -pub(crate) fn realtime_shared_state_cache_key(uid: &i64, device_id: &str) -> String { - format!("{}:{}:{}", REALTIME_SHARE_STATE_PREFIX, uid, device_id) -} diff --git a/services/appflowy-collaborate/src/state.rs b/services/appflowy-collaborate/src/state.rs index b7800307..a324f46a 100644 --- a/services/appflowy-collaborate/src/state.rs +++ b/services/appflowy-collaborate/src/state.rs @@ -15,7 +15,6 @@ use crate::config::Config; use crate::indexer::IndexerProvider; use crate::metrics::CollabMetrics; use crate::pg_listener::PgListeners; -use crate::shared_state::RealtimeSharedState; use crate::CollabRealtimeMetrics; pub type RedisConnectionManager = redis::aio::ConnectionManager; @@ -29,7 +28,6 @@ pub struct AppState { pub access_control: AccessControl, pub collab_access_control_storage: Arc, pub metrics: AppMetrics, - pub realtime_shared_state: RealtimeSharedState, pub indexer_provider: Arc, } diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 470fcc84..aa059ce9 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -1517,15 +1517,6 @@ async fn post_realtime_message_stream_handler( event!(tracing::Level::INFO, "message len: {}", bytes.len()); let device_id = device_id.to_string(); - // Only send message to websocket server when the user is connected - if !state - .realtime_shared_state - .is_user_connected(&uid, &device_id) - .await - .unwrap_or(false) - { - return Ok(Json(AppResponse::Ok())); - } let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?; let stream_message = ClientStreamMessage { diff --git a/src/application.rs b/src/application.rs index 9b1fba54..652a4275 100644 --- a/src/application.rs +++ b/src/application.rs @@ -40,7 +40,6 @@ use appflowy_collaborate::actix_ws::server::RealtimeServerActor; use appflowy_collaborate::collab::storage::CollabStorageImpl; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::indexer::IndexerProvider; -use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; @@ -316,10 +315,6 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result>>, pub indexer_provider: Arc, } From f105c2dbc278f0094bdd33f4ed931d33aecb9b95 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 4 Nov 2024 08:20:56 +0100 Subject: [PATCH 2/4] chore: fix clippy errors --- services/appflowy-collaborate/src/application.rs | 2 +- services/appflowy-collaborate/src/rt_server.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index aad42797..fd670fe8 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -14,7 +14,7 @@ use database::collab::cache::CollabCache; use secrecy::ExposeSecret; use sqlx::postgres::PgPoolOptions; use sqlx::PgPool; -use tracing::{info, warn}; +use tracing::info; use crate::actix_ws::server::RealtimeServerActor; use crate::collab::access_control::CollabStorageAccessControlImpl; diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 1cd75b0d..789d8c0a 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -123,7 +123,6 @@ where let group_manager = self.group_manager.clone(); let connect_state = self.connect_state.clone(); let metrics_calculate = self.metrics.clone(); - let storage = self.storage.clone(); Box::pin(async move { if let Some(old_user) = connect_state.handle_user_connect(connected_user, new_client_router) { @@ -151,7 +150,6 @@ where let group_manager = self.group_manager.clone(); let connect_state = self.connect_state.clone(); let metrics_calculate = self.metrics.clone(); - let storage = self.storage.clone(); Box::pin(async move { trace!("[realtime]: disconnect => {}", disconnect_user); From 356cf5d1320217080d8996810fb9a9d27ba3359f Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 4 Nov 2024 08:29:32 +0100 Subject: [PATCH 3/4] chore: fix clippy errors --- services/appflowy-collaborate/src/rt_server.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 789d8c0a..c8b762a0 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -35,7 +35,6 @@ pub struct CollaborationServer { group_manager: Arc>, connect_state: ConnectState, group_sender_by_object_id: Arc>, - storage: Arc, #[allow(dead_code)] metrics: Arc, enable_custom_runtime: bool, @@ -95,10 +94,9 @@ where spawn_metrics(metrics.clone(), storage.clone()); - spawn_handle_unindexed_collabs(indexer_provider, storage.clone()); + spawn_handle_unindexed_collabs(indexer_provider, storage); Ok(Self { - storage, group_manager, connect_state, group_sender_by_object_id, From 6aed9609b9568e2f838860cf9d1e91fb9caeafe6 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 4 Nov 2024 09:21:10 +0100 Subject: [PATCH 4/4] chore: removed realtime shared state tests --- .../tests/shared_state_test.rs | 63 ------------------- 1 file changed, 63 deletions(-) delete mode 100644 services/appflowy-collaborate/tests/shared_state_test.rs diff --git a/services/appflowy-collaborate/tests/shared_state_test.rs b/services/appflowy-collaborate/tests/shared_state_test.rs deleted file mode 100644 index 0485b613..00000000 --- a/services/appflowy-collaborate/tests/shared_state_test.rs +++ /dev/null @@ -1,63 +0,0 @@ -use anyhow::Context; -use appflowy_collaborate::shared_state::RealtimeSharedState; - -async fn redis_client() -> redis::Client { - let redis_uri = "redis://localhost:6379"; - redis::Client::open(redis_uri) - .context("failed to connect to redis") - .unwrap() -} - -#[tokio::test] -async fn connected_user_test() { - let redis_client = redis_client().await; - let shared_state = RealtimeSharedState::new(redis_client.get_connection_manager().await.unwrap()); - - let device_id = uuid::Uuid::new_v4().to_string(); - let is_connected = shared_state - .is_user_connected(&1, &device_id) - .await - .unwrap(); - assert!(!is_connected); - - shared_state - .add_connected_user(1, &device_id) - .await - .unwrap(); - - let is_connected = shared_state - .is_user_connected(&1, &device_id) - .await - .unwrap(); - assert!(is_connected); - - shared_state - .remove_connected_user(1, &device_id) - .await - .unwrap(); - - let is_connected = shared_state - .is_user_connected(&1, &device_id) - .await - .unwrap(); - assert!(!is_connected); -} - -#[tokio::test] -async fn remove_all_connected_user_test() { - let redis_client = redis_client().await; - let shared_state = RealtimeSharedState::new(redis_client.get_connection_manager().await.unwrap()); - - let device_id = uuid::Uuid::new_v4().to_string(); - shared_state - .add_connected_user(1, &device_id) - .await - .unwrap(); - shared_state.remove_all_connected_users().await.unwrap(); - - let is_connected = shared_state - .is_user_connected(&1, &device_id) - .await - .unwrap(); - assert!(!is_connected); -}