diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index dbcb3af4..eced8274 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -185,7 +185,7 @@ impl CollabBroadcast { mut sink: Sink, mut stream: Stream, collab: Weak>, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, ) -> Subscription where Sink: SinkExt + Clone + Send + Sync + Unpin + 'static, diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 2ce6d12f..66a6822b 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -44,7 +44,7 @@ pub struct CollabGroup { /// A list of subscribers to this group. Each subscriber will receive updates from the /// broadcast. subscribers: DashMap, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, destroy_group_tx: mpsc::Sender>>, } @@ -62,7 +62,7 @@ impl CollabGroup { object_id: String, collab_type: CollabType, collab: Arc>, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, storage: Arc, is_new_collab: bool, collab_redis_stream: Arc, diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 4de9c9ad..338ae944 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -30,7 +30,7 @@ pub struct GroupManager { state: GroupManagementState, storage: Arc, access_control: Arc, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, collab_redis_stream: Arc, control_event_stream: Arc>, persistence_interval: Duration, @@ -48,7 +48,7 @@ where pub async fn new( storage: Arc, access_control: Arc, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, collab_stream: CollabRedisStream, persistence_interval: Duration, edit_state_max_count: u32, diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 6a79a275..ab1f15e0 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -19,11 +19,11 @@ pub(crate) struct GroupManagementState { group_by_object_id: Arc>>, /// Keep track of all [Collab] objects that a user is subscribed to. editing_by_user: Arc>>, - metrics_calculate: CollabRealtimeMetrics, + metrics_calculate: Arc, } impl GroupManagementState { - pub(crate) fn new(metrics_calculate: CollabRealtimeMetrics) -> Self { + pub(crate) fn new(metrics_calculate: Arc) -> Self { Self { group_by_object_id: Arc::new(DashMap::new()), editing_by_user: Arc::new(DashMap::new()), diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index 1891d2eb..2e8e6f22 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -1,5 +1,11 @@ +use std::sync::Arc; +use std::time::Duration; + use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; +use tokio::time::interval; + +use database::collab::CollabStorage; #[derive(Clone, Default)] pub struct CollabRealtimeMetrics { @@ -85,6 +91,27 @@ impl CollabRealtimeMetrics { } } +pub(crate) fn spawn_metrics(metrics: Arc, storage: Arc) +where + S: CollabStorage, +{ + tokio::task::spawn_local(async move { + let mut interval = interval(Duration::from_secs(120)); + loop { + interval.tick().await; + + // cache hit rate + let (total, success) = storage.encode_collab_redis_query_state(); + metrics + .total_attempt_get_encode_collab_from_redis + .set(total as i64); + metrics + .total_success_get_encode_collab_from_redis + .set(success as i64); + } + }); +} + #[derive(Clone)] pub struct CollabMetrics { success_write_snapshot_count: Gauge, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index c7615e34..d43eda26 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -24,6 +24,7 @@ use crate::error::{CreateGroupFailedReason, RealtimeError}; use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender}; use crate::group::manager::GroupManager; use crate::indexer::IndexerProvider; +use crate::metrics::spawn_metrics; use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME; use crate::state::RedisConnectionManager; use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink}; @@ -37,7 +38,6 @@ pub struct CollaborationServer { storage: Arc, #[allow(dead_code)] metrics: Arc, - metrics_calculate: CollabRealtimeMetrics, enable_custom_runtime: bool, } @@ -68,7 +68,6 @@ where info!("CollaborationServer with actix-web runtime"); } - let metrics_calculate = CollabRealtimeMetrics::default(); let connect_state = ConnectState::new(); let access_control = Arc::new(access_control); let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager); @@ -76,7 +75,7 @@ where GroupManager::new( storage.clone(), access_control.clone(), - metrics_calculate.clone(), + metrics.clone(), collab_stream, group_persistence_interval, edit_state_max_count, @@ -92,6 +91,8 @@ where spawn_collaboration_command(command_recv, &group_sender_by_object_id); + spawn_metrics(metrics.clone(), storage.clone()); + spawn_handle_unindexed_collabs(indexer_provider); Ok(Self { @@ -100,7 +101,6 @@ where connect_state, group_sender_by_object_id, metrics, - metrics_calculate, enable_custom_runtime, }) } @@ -120,7 +120,7 @@ where let new_client_router = ClientMessageRouter::new(conn_sink); let group_manager = self.group_manager.clone(); let connect_state = self.connect_state.clone(); - let metrics_calculate = self.metrics_calculate.clone(); + let metrics_calculate = self.metrics.clone(); let storage = self.storage.clone(); Box::pin(async move { @@ -152,7 +152,7 @@ where ) -> Pin>>> { let group_manager = self.group_manager.clone(); let connect_state = self.connect_state.clone(); - let metrics_calculate = self.metrics_calculate.clone(); + let metrics_calculate = self.metrics.clone(); let storage = self.storage.clone(); Box::pin(async move {