chore: remove redundant metrics collector

This commit is contained in:
Bartosz Sypytkowski 2024-08-30 05:14:44 +02:00
parent 773efcd91a
commit 5b2b717fe7
6 changed files with 57 additions and 166 deletions

View File

@ -1,5 +1,4 @@
use std::borrow::BorrowMut;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use anyhow::anyhow;
@ -29,7 +28,7 @@ use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use crate::error::RealtimeError;
use crate::group::group_init::EditState;
use crate::group::protocol::ServerSyncProtocol;
use crate::metrics::CollabMetricsCalculate;
use crate::metrics::CollabRealtimeMetrics;
pub trait CollabUpdateStreaming: 'static + Send + Sync {
fn send_update(&self, update: Vec<u8>) -> Result<(), RealtimeError>;
@ -186,7 +185,7 @@ impl CollabBroadcast {
mut sink: Sink,
mut stream: Stream,
collab: Weak<RwLock<Collab>>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
) -> Subscription
where
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
@ -280,7 +279,7 @@ async fn handle_client_messages<Sink>(
message_map: MessageByObjectId,
sink: &mut Sink,
collab: Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabMetricsCalculate,
metrics_calculate: &CollabRealtimeMetrics,
edit_state: &Arc<EditState>,
) where
Sink: SinkExt<CollabMessage> + Unpin + 'static,
@ -338,7 +337,7 @@ async fn handle_one_client_message(
object_id: &str,
collab_msg: &ClientCollabMessage,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabMetricsCalculate,
metrics_calculate: &CollabRealtimeMetrics,
edit_state: &Arc<EditState>,
) -> Result<CollabAck, RealtimeError> {
let msg_id = collab_msg.msg_id();
@ -383,13 +382,11 @@ async fn handle_one_message_payload(
msg_id: MsgId,
payload: &Bytes,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabMetricsCalculate,
metrics_calculate: &CollabRealtimeMetrics,
edit_state: &Arc<EditState>,
) -> Result<CollabAck, RealtimeError> {
let payload = payload.clone();
metrics_calculate
.acquire_collab_lock_count
.fetch_add(1, Ordering::Relaxed);
metrics_calculate.acquire_collab_lock_count.inc();
// Spawn a blocking task to handle the message
let result = handle_message(
@ -419,7 +416,7 @@ async fn handle_message(
payload: &Bytes,
message_origin: &CollabOrigin,
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
metrics_calculate: &CollabMetricsCalculate,
metrics_calculate: &CollabRealtimeMetrics,
object_id: &str,
msg_id: MsgId,
edit_state: &Arc<EditState>,
@ -436,9 +433,7 @@ async fn handle_message(
match handle_message_follow_protocol(message_origin, &ServerSyncProtocol, collab, msg).await
{
Ok(payload) => {
metrics_calculate
.apply_update_count
.fetch_add(1, Ordering::Relaxed);
metrics_calculate.apply_update_count.inc();
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
// send one ack back to the client.
if ack_response.is_none() {
@ -454,9 +449,7 @@ async fn handle_message(
}
},
Err(err) => {
metrics_calculate
.apply_update_failed_count
.fetch_add(1, Ordering::Relaxed);
metrics_calculate.apply_update_failed_count.inc();
let code = ack_code_from_error(&err);
let payload = match err {
RTProtocolError::MissUpdates {

View File

@ -30,7 +30,7 @@ use crate::error::RealtimeError;
use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, Subscription};
use crate::group::persistence::GroupPersistence;
use crate::indexer::Indexer;
use crate::metrics::CollabMetricsCalculate;
use crate::metrics::CollabRealtimeMetrics;
/// A group used to manage a single [Collab] object
pub struct CollabGroup {
@ -44,7 +44,7 @@ pub struct CollabGroup {
/// A list of subscribers to this group. Each subscriber will receive updates from the
/// broadcast.
subscribers: DashMap<RealtimeUser, Subscription>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
destroy_group_tx: mpsc::Sender<Arc<RwLock<Collab>>>,
}
@ -62,7 +62,7 @@ impl CollabGroup {
object_id: String,
collab_type: CollabType,
collab: Arc<RwLock<Collab>>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
storage: Arc<S>,
is_new_collab: bool,
collab_redis_stream: Arc<CollabRedisStream>,

View File

@ -24,13 +24,13 @@ use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::group::group_init::CollabGroup;
use crate::group::state::GroupManagementState;
use crate::indexer::IndexerProvider;
use crate::metrics::CollabMetricsCalculate;
use crate::metrics::CollabRealtimeMetrics;
pub struct GroupManager<S, AC> {
state: GroupManagementState,
storage: Arc<S>,
access_control: Arc<AC>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
collab_redis_stream: Arc<CollabRedisStream>,
control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration,
@ -48,7 +48,7 @@ where
pub async fn new(
storage: Arc<S>,
access_control: Arc<AC>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
collab_stream: CollabRedisStream,
persistence_interval: Duration,
edit_state_max_count: u32,

View File

@ -1,29 +1,29 @@
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use crate::metrics::CollabMetricsCalculate;
use collab_rt_entity::user::RealtimeUser;
use dashmap::mapref::one::RefMut;
use dashmap::try_result::TryResult;
use dashmap::DashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::{error, event, warn};
use collab_rt_entity::user::RealtimeUser;
use crate::error::RealtimeError;
use crate::group::group_init::CollabGroup;
use crate::metrics::CollabRealtimeMetrics;
#[derive(Default, Clone)]
pub(crate) struct GroupManagementState {
group_by_object_id: Arc<DashMap<String, Arc<CollabGroup>>>,
/// Keep track of all [Collab] objects that a user is subscribed to.
editing_by_user: Arc<DashMap<RealtimeUser, HashSet<Editing>>>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
}
impl GroupManagementState {
pub(crate) fn new(metrics_calculate: CollabMetricsCalculate) -> Self {
pub(crate) fn new(metrics_calculate: CollabRealtimeMetrics) -> Self {
Self {
group_by_object_id: Arc::new(DashMap::new()),
editing_by_user: Arc::new(DashMap::new()),
@ -102,10 +102,7 @@ impl GroupManagementState {
pub(crate) async fn insert_group(&self, object_id: &str, group: Arc<CollabGroup>) {
self.group_by_object_id.insert(object_id.to_string(), group);
self
.metrics_calculate
.num_of_active_collab
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.metrics_calculate.opening_collab_count.inc();
}
pub(crate) async fn contains_group(&self, object_id: &str) -> bool {
@ -121,10 +118,10 @@ impl GroupManagementState {
// Log error if the group doesn't exist
error!("Group for object_id:{} not found", object_id);
}
self.metrics_calculate.num_of_active_collab.store(
self.group_by_object_id.len() as i64,
std::sync::atomic::Ordering::Relaxed,
);
self
.metrics_calculate
.opening_collab_count
.set(self.group_by_object_id.len() as i64);
}
pub(crate) async fn insert_user(
&self,
@ -139,10 +136,7 @@ impl GroupManagementState {
match entry {
dashmap::mapref::entry::Entry::Occupied(_) => {},
dashmap::mapref::entry::Entry::Vacant(_) => {
self
.metrics_calculate
.num_of_editing_users
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.metrics_calculate.num_of_editing_users.inc();
},
}
@ -153,10 +147,7 @@ impl GroupManagementState {
pub(crate) async fn remove_user(&self, user: &RealtimeUser) {
let entry = self.editing_by_user.remove(user);
if entry.is_some() {
self
.metrics_calculate
.num_of_editing_users
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
self.metrics_calculate.num_of_editing_users.dec();
}
if let Some(editing_objects) = entry.map(|(_, e)| e) {
for editing in editing_objects {

View File

@ -1,24 +1,19 @@
use database::collab::CollabStorage;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use std::sync::atomic::AtomicI64;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::interval;
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct CollabRealtimeMetrics {
connected_users: Gauge,
total_success_get_encode_collab_from_redis: Gauge,
total_attempt_get_encode_collab_from_redis: Gauge,
opening_collab_count: Gauge,
num_of_editing_users: Gauge,
pub(crate) connected_users: Gauge,
pub(crate) total_success_get_encode_collab_from_redis: Gauge,
pub(crate) total_attempt_get_encode_collab_from_redis: Gauge,
pub(crate) opening_collab_count: Gauge,
pub(crate) num_of_editing_users: Gauge,
/// The number of apply update
apply_update_count: Gauge,
pub(crate) apply_update_count: Gauge,
/// The number of apply update failed
apply_update_failed_count: Gauge,
acquire_collab_lock_count: Gauge,
acquire_collab_lock_fail_count: Gauge,
pub(crate) apply_update_failed_count: Gauge,
pub(crate) acquire_collab_lock_count: Gauge,
pub(crate) acquire_collab_lock_fail_count: Gauge,
}
impl CollabRealtimeMetrics {
@ -90,89 +85,6 @@ impl CollabRealtimeMetrics {
}
}
#[derive(Clone, Default)]
pub(crate) struct CollabMetricsCalculate {
pub(crate) connected_users: Arc<AtomicI64>,
pub(crate) acquire_collab_lock_count: Arc<AtomicI64>,
pub(crate) acquire_collab_lock_fail_count: Arc<AtomicI64>,
pub(crate) apply_update_count: Arc<AtomicI64>,
pub(crate) apply_update_failed_count: Arc<AtomicI64>,
pub(crate) num_of_active_collab: Arc<AtomicI64>,
pub(crate) num_of_editing_users: Arc<AtomicI64>,
}
pub(crate) fn spawn_metrics<S>(
metrics: &Arc<CollabRealtimeMetrics>,
metrics_calculation: &CollabMetricsCalculate,
storage: &Arc<S>,
) where
S: CollabStorage,
{
let metrics = metrics.clone();
let metrics_calculation = metrics_calculation.clone();
let storage = storage.clone();
tokio::task::spawn_local(async move {
let mut interval = interval(Duration::from_secs(120));
loop {
interval.tick().await;
// active collab
metrics.opening_collab_count.set(
metrics_calculation
.num_of_active_collab
.load(std::sync::atomic::Ordering::Relaxed),
);
// editing users
metrics.num_of_editing_users.set(
metrics_calculation
.num_of_editing_users
.load(std::sync::atomic::Ordering::Relaxed),
);
// connect user
metrics.connected_users.set(
metrics_calculation
.connected_users
.load(std::sync::atomic::Ordering::Relaxed),
);
// lock
metrics.acquire_collab_lock_count.set(
metrics_calculation
.acquire_collab_lock_count
.load(std::sync::atomic::Ordering::Relaxed),
);
metrics.acquire_collab_lock_fail_count.set(
metrics_calculation
.acquire_collab_lock_fail_count
.load(std::sync::atomic::Ordering::Relaxed),
);
// update count
metrics.apply_update_count.set(
metrics_calculation
.apply_update_count
.load(std::sync::atomic::Ordering::Relaxed),
);
metrics.apply_update_failed_count.set(
metrics_calculation
.apply_update_failed_count
.load(std::sync::atomic::Ordering::Relaxed),
);
// cache hit rate
let (total, success) = storage.encode_collab_redis_query_state();
metrics
.total_attempt_get_encode_collab_from_redis
.set(total as i64);
metrics
.total_success_get_encode_collab_from_redis
.set(success as i64);
}
});
}
#[derive(Clone)]
pub struct CollabMetrics {
success_write_snapshot_count: Gauge,

View File

@ -1,10 +1,11 @@
use anyhow::Result;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Result;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use tokio::sync::Notify;
use tokio::time::interval;
use tracing::{error, info, trace};
@ -17,17 +18,15 @@ use database::collab::CollabStorage;
use crate::client::client_msg_router::ClientMessageRouter;
use crate::command::{spawn_collaboration_command, CLCommandReceiver};
use crate::config::get_env_var;
use crate::connect_state::ConnectState;
use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender};
use crate::group::manager::GroupManager;
use crate::indexer::IndexerProvider;
use crate::metrics::CollabMetricsCalculate;
use crate::config::get_env_var;
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
use crate::state::RedisConnectionManager;
use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink};
use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink};
#[derive(Clone)]
pub struct CollaborationServer<S, AC> {
@ -38,7 +37,7 @@ pub struct CollaborationServer<S, AC> {
storage: Arc<S>,
#[allow(dead_code)]
metrics: Arc<CollabRealtimeMetrics>,
metrics_calculate: CollabMetricsCalculate,
metrics_calculate: CollabRealtimeMetrics,
enable_custom_runtime: bool,
}
@ -69,7 +68,7 @@ where
info!("CollaborationServer with actix-web runtime");
}
let metrics_calculate = CollabMetricsCalculate::default();
let metrics_calculate = CollabRealtimeMetrics::default();
let connect_state = ConnectState::new();
let access_control = Arc::new(access_control);
let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager);
@ -93,8 +92,6 @@ where
spawn_collaboration_command(command_recv, &group_sender_by_object_id);
spawn_metrics(&metrics, &metrics_calculate, &storage);
spawn_handle_unindexed_collabs(indexer_provider);
Ok(Self {
@ -135,10 +132,9 @@ where
// Remove the old user from all collaboration groups.
group_manager.remove_user(&old_user).await;
}
metrics_calculate.connected_users.store(
connect_state.number_of_connected_users() as i64,
std::sync::atomic::Ordering::Relaxed,
);
metrics_calculate
.connected_users
.set(connect_state.number_of_connected_users() as i64);
Ok(())
})
}
@ -167,10 +163,9 @@ where
.remove_connected_user(disconnect_user.uid, &disconnect_user.device_id)
.await;
metrics_calculate.connected_users.store(
connect_state.number_of_connected_users() as i64,
std::sync::atomic::Ordering::Relaxed,
);
metrics_calculate
.connected_users
.set(connect_state.number_of_connected_users() as i64);
group_manager.remove_user(&disconnect_user).await;
}