From 5b2b717fe747cb0e923c3a94f384cbbeb4ad4351 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 30 Aug 2024 05:14:44 +0200 Subject: [PATCH] chore: remove redundant metrics collector --- .../src/group/broadcast.rs | 25 ++-- .../src/group/group_init.rs | 6 +- .../appflowy-collaborate/src/group/manager.rs | 6 +- .../appflowy-collaborate/src/group/state.rs | 45 +++----- services/appflowy-collaborate/src/metrics.rs | 108 ++---------------- .../appflowy-collaborate/src/rt_server.rs | 33 +++--- 6 files changed, 57 insertions(+), 166 deletions(-) diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index a63ac510..dbcb3af4 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -1,5 +1,4 @@ use std::borrow::BorrowMut; -use std::sync::atomic::Ordering; use std::sync::{Arc, Weak}; use anyhow::anyhow; @@ -29,7 +28,7 @@ use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use crate::error::RealtimeError; use crate::group::group_init::EditState; use crate::group::protocol::ServerSyncProtocol; -use crate::metrics::CollabMetricsCalculate; +use crate::metrics::CollabRealtimeMetrics; pub trait CollabUpdateStreaming: 'static + Send + Sync { fn send_update(&self, update: Vec) -> Result<(), RealtimeError>; @@ -186,7 +185,7 @@ impl CollabBroadcast { mut sink: Sink, mut stream: Stream, collab: Weak>, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, ) -> Subscription where Sink: SinkExt + Clone + Send + Sync + Unpin + 'static, @@ -280,7 +279,7 @@ async fn handle_client_messages( message_map: MessageByObjectId, sink: &mut Sink, collab: Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabMetricsCalculate, + metrics_calculate: &CollabRealtimeMetrics, edit_state: &Arc, ) where Sink: SinkExt + Unpin + 'static, @@ -338,7 +337,7 @@ async fn handle_one_client_message( object_id: &str, collab_msg: &ClientCollabMessage, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabMetricsCalculate, + metrics_calculate: &CollabRealtimeMetrics, edit_state: &Arc, ) -> Result { let msg_id = collab_msg.msg_id(); @@ -383,13 +382,11 @@ async fn handle_one_message_payload( msg_id: MsgId, payload: &Bytes, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabMetricsCalculate, + metrics_calculate: &CollabRealtimeMetrics, edit_state: &Arc, ) -> Result { let payload = payload.clone(); - metrics_calculate - .acquire_collab_lock_count - .fetch_add(1, Ordering::Relaxed); + metrics_calculate.acquire_collab_lock_count.inc(); // Spawn a blocking task to handle the message let result = handle_message( @@ -419,7 +416,7 @@ async fn handle_message( payload: &Bytes, message_origin: &CollabOrigin, collab: &Arc + Send + Sync + 'static>>, - metrics_calculate: &CollabMetricsCalculate, + metrics_calculate: &CollabRealtimeMetrics, object_id: &str, msg_id: MsgId, edit_state: &Arc, @@ -436,9 +433,7 @@ async fn handle_message( match handle_message_follow_protocol(message_origin, &ServerSyncProtocol, collab, msg).await { Ok(payload) => { - metrics_calculate - .apply_update_count - .fetch_add(1, Ordering::Relaxed); + metrics_calculate.apply_update_count.inc(); // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to // send one ack back to the client. if ack_response.is_none() { @@ -454,9 +449,7 @@ async fn handle_message( } }, Err(err) => { - metrics_calculate - .apply_update_failed_count - .fetch_add(1, Ordering::Relaxed); + metrics_calculate.apply_update_failed_count.inc(); let code = ack_code_from_error(&err); let payload = match err { RTProtocolError::MissUpdates { diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index d5b669d5..2ce6d12f 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -30,7 +30,7 @@ use crate::error::RealtimeError; use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, Subscription}; use crate::group::persistence::GroupPersistence; use crate::indexer::Indexer; -use crate::metrics::CollabMetricsCalculate; +use crate::metrics::CollabRealtimeMetrics; /// A group used to manage a single [Collab] object pub struct CollabGroup { @@ -44,7 +44,7 @@ pub struct CollabGroup { /// A list of subscribers to this group. Each subscriber will receive updates from the /// broadcast. subscribers: DashMap, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, destroy_group_tx: mpsc::Sender>>, } @@ -62,7 +62,7 @@ impl CollabGroup { object_id: String, collab_type: CollabType, collab: Arc>, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, storage: Arc, is_new_collab: bool, collab_redis_stream: Arc, diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index a2a81577..4de9c9ad 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -24,13 +24,13 @@ use crate::error::{CreateGroupFailedReason, RealtimeError}; use crate::group::group_init::CollabGroup; use crate::group::state::GroupManagementState; use crate::indexer::IndexerProvider; -use crate::metrics::CollabMetricsCalculate; +use crate::metrics::CollabRealtimeMetrics; pub struct GroupManager { state: GroupManagementState, storage: Arc, access_control: Arc, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, collab_redis_stream: Arc, control_event_stream: Arc>, persistence_interval: Duration, @@ -48,7 +48,7 @@ where pub async fn new( storage: Arc, access_control: Arc, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, collab_stream: CollabRedisStream, persistence_interval: Duration, edit_state_max_count: u32, diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 887026b3..70eae270 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -1,29 +1,29 @@ -use crate::error::RealtimeError; -use crate::group::group_init::CollabGroup; +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Duration; -use crate::metrics::CollabMetricsCalculate; -use collab_rt_entity::user::RealtimeUser; use dashmap::mapref::one::RefMut; use dashmap::try_result::TryResult; use dashmap::DashMap; - -use std::collections::HashSet; - -use std::sync::Arc; -use std::time::Duration; use tokio::time::sleep; use tracing::{error, event, warn}; +use collab_rt_entity::user::RealtimeUser; + +use crate::error::RealtimeError; +use crate::group::group_init::CollabGroup; +use crate::metrics::CollabRealtimeMetrics; + #[derive(Default, Clone)] pub(crate) struct GroupManagementState { group_by_object_id: Arc>>, /// Keep track of all [Collab] objects that a user is subscribed to. editing_by_user: Arc>>, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, } impl GroupManagementState { - pub(crate) fn new(metrics_calculate: CollabMetricsCalculate) -> Self { + pub(crate) fn new(metrics_calculate: CollabRealtimeMetrics) -> Self { Self { group_by_object_id: Arc::new(DashMap::new()), editing_by_user: Arc::new(DashMap::new()), @@ -102,10 +102,7 @@ impl GroupManagementState { pub(crate) async fn insert_group(&self, object_id: &str, group: Arc) { self.group_by_object_id.insert(object_id.to_string(), group); - self - .metrics_calculate - .num_of_active_collab - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.metrics_calculate.opening_collab_count.inc(); } pub(crate) async fn contains_group(&self, object_id: &str) -> bool { @@ -121,10 +118,10 @@ impl GroupManagementState { // Log error if the group doesn't exist error!("Group for object_id:{} not found", object_id); } - self.metrics_calculate.num_of_active_collab.store( - self.group_by_object_id.len() as i64, - std::sync::atomic::Ordering::Relaxed, - ); + self + .metrics_calculate + .opening_collab_count + .set(self.group_by_object_id.len() as i64); } pub(crate) async fn insert_user( &self, @@ -139,10 +136,7 @@ impl GroupManagementState { match entry { dashmap::mapref::entry::Entry::Occupied(_) => {}, dashmap::mapref::entry::Entry::Vacant(_) => { - self - .metrics_calculate - .num_of_editing_users - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + self.metrics_calculate.num_of_editing_users.inc(); }, } @@ -153,10 +147,7 @@ impl GroupManagementState { pub(crate) async fn remove_user(&self, user: &RealtimeUser) { let entry = self.editing_by_user.remove(user); if entry.is_some() { - self - .metrics_calculate - .num_of_editing_users - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + self.metrics_calculate.num_of_editing_users.dec(); } if let Some(editing_objects) = entry.map(|(_, e)| e) { for editing in editing_objects { diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index a1b73c48..1891d2eb 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -1,24 +1,19 @@ -use database::collab::CollabStorage; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; -use std::sync::atomic::AtomicI64; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::interval; -#[derive(Clone)] +#[derive(Clone, Default)] pub struct CollabRealtimeMetrics { - connected_users: Gauge, - total_success_get_encode_collab_from_redis: Gauge, - total_attempt_get_encode_collab_from_redis: Gauge, - opening_collab_count: Gauge, - num_of_editing_users: Gauge, + pub(crate) connected_users: Gauge, + pub(crate) total_success_get_encode_collab_from_redis: Gauge, + pub(crate) total_attempt_get_encode_collab_from_redis: Gauge, + pub(crate) opening_collab_count: Gauge, + pub(crate) num_of_editing_users: Gauge, /// The number of apply update - apply_update_count: Gauge, + pub(crate) apply_update_count: Gauge, /// The number of apply update failed - apply_update_failed_count: Gauge, - acquire_collab_lock_count: Gauge, - acquire_collab_lock_fail_count: Gauge, + pub(crate) apply_update_failed_count: Gauge, + pub(crate) acquire_collab_lock_count: Gauge, + pub(crate) acquire_collab_lock_fail_count: Gauge, } impl CollabRealtimeMetrics { @@ -90,89 +85,6 @@ impl CollabRealtimeMetrics { } } -#[derive(Clone, Default)] -pub(crate) struct CollabMetricsCalculate { - pub(crate) connected_users: Arc, - pub(crate) acquire_collab_lock_count: Arc, - pub(crate) acquire_collab_lock_fail_count: Arc, - pub(crate) apply_update_count: Arc, - pub(crate) apply_update_failed_count: Arc, - pub(crate) num_of_active_collab: Arc, - pub(crate) num_of_editing_users: Arc, -} - -pub(crate) fn spawn_metrics( - metrics: &Arc, - metrics_calculation: &CollabMetricsCalculate, - storage: &Arc, -) where - S: CollabStorage, -{ - let metrics = metrics.clone(); - let metrics_calculation = metrics_calculation.clone(); - let storage = storage.clone(); - tokio::task::spawn_local(async move { - let mut interval = interval(Duration::from_secs(120)); - loop { - interval.tick().await; - - // active collab - metrics.opening_collab_count.set( - metrics_calculation - .num_of_active_collab - .load(std::sync::atomic::Ordering::Relaxed), - ); - - // editing users - metrics.num_of_editing_users.set( - metrics_calculation - .num_of_editing_users - .load(std::sync::atomic::Ordering::Relaxed), - ); - - // connect user - metrics.connected_users.set( - metrics_calculation - .connected_users - .load(std::sync::atomic::Ordering::Relaxed), - ); - - // lock - metrics.acquire_collab_lock_count.set( - metrics_calculation - .acquire_collab_lock_count - .load(std::sync::atomic::Ordering::Relaxed), - ); - metrics.acquire_collab_lock_fail_count.set( - metrics_calculation - .acquire_collab_lock_fail_count - .load(std::sync::atomic::Ordering::Relaxed), - ); - - // update count - metrics.apply_update_count.set( - metrics_calculation - .apply_update_count - .load(std::sync::atomic::Ordering::Relaxed), - ); - metrics.apply_update_failed_count.set( - metrics_calculation - .apply_update_failed_count - .load(std::sync::atomic::Ordering::Relaxed), - ); - - // cache hit rate - let (total, success) = storage.encode_collab_redis_query_state(); - metrics - .total_attempt_get_encode_collab_from_redis - .set(total as i64); - metrics - .total_success_get_encode_collab_from_redis - .set(success as i64); - } - }); -} - #[derive(Clone)] pub struct CollabMetrics { success_write_snapshot_count: Gauge, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 3d2465b8..c7615e34 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -1,10 +1,11 @@ -use anyhow::Result; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; + +use anyhow::Result; +use dashmap::mapref::entry::Entry; +use dashmap::DashMap; use tokio::sync::Notify; use tokio::time::interval; use tracing::{error, info, trace}; @@ -17,17 +18,15 @@ use database::collab::CollabStorage; use crate::client::client_msg_router::ClientMessageRouter; use crate::command::{spawn_collaboration_command, CLCommandReceiver}; +use crate::config::get_env_var; use crate::connect_state::ConnectState; use crate::error::{CreateGroupFailedReason, RealtimeError}; use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender}; use crate::group::manager::GroupManager; use crate::indexer::IndexerProvider; -use crate::metrics::CollabMetricsCalculate; - -use crate::config::get_env_var; use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME; use crate::state::RedisConnectionManager; -use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink}; +use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink}; #[derive(Clone)] pub struct CollaborationServer { @@ -38,7 +37,7 @@ pub struct CollaborationServer { storage: Arc, #[allow(dead_code)] metrics: Arc, - metrics_calculate: CollabMetricsCalculate, + metrics_calculate: CollabRealtimeMetrics, enable_custom_runtime: bool, } @@ -69,7 +68,7 @@ where info!("CollaborationServer with actix-web runtime"); } - let metrics_calculate = CollabMetricsCalculate::default(); + let metrics_calculate = CollabRealtimeMetrics::default(); let connect_state = ConnectState::new(); let access_control = Arc::new(access_control); let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager); @@ -93,8 +92,6 @@ where spawn_collaboration_command(command_recv, &group_sender_by_object_id); - spawn_metrics(&metrics, &metrics_calculate, &storage); - spawn_handle_unindexed_collabs(indexer_provider); Ok(Self { @@ -135,10 +132,9 @@ where // Remove the old user from all collaboration groups. group_manager.remove_user(&old_user).await; } - metrics_calculate.connected_users.store( - connect_state.number_of_connected_users() as i64, - std::sync::atomic::Ordering::Relaxed, - ); + metrics_calculate + .connected_users + .set(connect_state.number_of_connected_users() as i64); Ok(()) }) } @@ -167,10 +163,9 @@ where .remove_connected_user(disconnect_user.uid, &disconnect_user.device_id) .await; - metrics_calculate.connected_users.store( - connect_state.number_of_connected_users() as i64, - std::sync::atomic::Ordering::Relaxed, - ); + metrics_calculate + .connected_users + .set(connect_state.number_of_connected_users() as i64); group_manager.remove_user(&disconnect_user).await; }