Merge pull request #773 from AppFlowy-IO/metrics-connect

chore: reconnect realtime metrics
This commit is contained in:
Khor Shu Heng 2024-08-30 16:02:37 +08:00 committed by GitHub
commit 77a4af8a58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 40 additions and 13 deletions

View File

@ -185,7 +185,7 @@ impl CollabBroadcast {
mut sink: Sink,
mut stream: Stream,
collab: Weak<RwLock<Collab>>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
) -> Subscription
where
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,

View File

@ -44,7 +44,7 @@ pub struct CollabGroup {
/// A list of subscribers to this group. Each subscriber will receive updates from the
/// broadcast.
subscribers: DashMap<RealtimeUser, Subscription>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
destroy_group_tx: mpsc::Sender<Arc<RwLock<Collab>>>,
}
@ -62,7 +62,7 @@ impl CollabGroup {
object_id: String,
collab_type: CollabType,
collab: Arc<RwLock<Collab>>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
storage: Arc<S>,
is_new_collab: bool,
collab_redis_stream: Arc<CollabRedisStream>,

View File

@ -30,7 +30,7 @@ pub struct GroupManager<S, AC> {
state: GroupManagementState,
storage: Arc<S>,
access_control: Arc<AC>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
collab_redis_stream: Arc<CollabRedisStream>,
control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration,
@ -48,7 +48,7 @@ where
pub async fn new(
storage: Arc<S>,
access_control: Arc<AC>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
collab_stream: CollabRedisStream,
persistence_interval: Duration,
edit_state_max_count: u32,

View File

@ -19,11 +19,11 @@ pub(crate) struct GroupManagementState {
group_by_object_id: Arc<DashMap<String, Arc<CollabGroup>>>,
/// Keep track of all [Collab] objects that a user is subscribed to.
editing_by_user: Arc<DashMap<RealtimeUser, HashSet<Editing>>>,
metrics_calculate: CollabRealtimeMetrics,
metrics_calculate: Arc<CollabRealtimeMetrics>,
}
impl GroupManagementState {
pub(crate) fn new(metrics_calculate: CollabRealtimeMetrics) -> Self {
pub(crate) fn new(metrics_calculate: Arc<CollabRealtimeMetrics>) -> Self {
Self {
group_by_object_id: Arc::new(DashMap::new()),
editing_by_user: Arc::new(DashMap::new()),

View File

@ -1,5 +1,11 @@
use std::sync::Arc;
use std::time::Duration;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use tokio::time::interval;
use database::collab::CollabStorage;
#[derive(Clone, Default)]
pub struct CollabRealtimeMetrics {
@ -85,6 +91,27 @@ impl CollabRealtimeMetrics {
}
}
pub(crate) fn spawn_metrics<S>(metrics: Arc<CollabRealtimeMetrics>, storage: Arc<S>)
where
S: CollabStorage,
{
tokio::task::spawn_local(async move {
let mut interval = interval(Duration::from_secs(120));
loop {
interval.tick().await;
// cache hit rate
let (total, success) = storage.encode_collab_redis_query_state();
metrics
.total_attempt_get_encode_collab_from_redis
.set(total as i64);
metrics
.total_success_get_encode_collab_from_redis
.set(success as i64);
}
});
}
#[derive(Clone)]
pub struct CollabMetrics {
success_write_snapshot_count: Gauge,

View File

@ -24,6 +24,7 @@ use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender};
use crate::group::manager::GroupManager;
use crate::indexer::IndexerProvider;
use crate::metrics::spawn_metrics;
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
use crate::state::RedisConnectionManager;
use crate::{CollabRealtimeMetrics, RealtimeClientWebsocketSink};
@ -37,7 +38,6 @@ pub struct CollaborationServer<S, AC> {
storage: Arc<S>,
#[allow(dead_code)]
metrics: Arc<CollabRealtimeMetrics>,
metrics_calculate: CollabRealtimeMetrics,
enable_custom_runtime: bool,
}
@ -68,7 +68,6 @@ where
info!("CollaborationServer with actix-web runtime");
}
let metrics_calculate = CollabRealtimeMetrics::default();
let connect_state = ConnectState::new();
let access_control = Arc::new(access_control);
let collab_stream = CollabRedisStream::new_with_connection_manager(redis_connection_manager);
@ -76,7 +75,7 @@ where
GroupManager::new(
storage.clone(),
access_control.clone(),
metrics_calculate.clone(),
metrics.clone(),
collab_stream,
group_persistence_interval,
edit_state_max_count,
@ -92,6 +91,8 @@ where
spawn_collaboration_command(command_recv, &group_sender_by_object_id);
spawn_metrics(metrics.clone(), storage.clone());
spawn_handle_unindexed_collabs(indexer_provider);
Ok(Self {
@ -100,7 +101,6 @@ where
connect_state,
group_sender_by_object_id,
metrics,
metrics_calculate,
enable_custom_runtime,
})
}
@ -120,7 +120,7 @@ where
let new_client_router = ClientMessageRouter::new(conn_sink);
let group_manager = self.group_manager.clone();
let connect_state = self.connect_state.clone();
let metrics_calculate = self.metrics_calculate.clone();
let metrics_calculate = self.metrics.clone();
let storage = self.storage.clone();
Box::pin(async move {
@ -152,7 +152,7 @@ where
) -> Pin<Box<dyn Future<Output = Result<(), RealtimeError>>>> {
let group_manager = self.group_manager.clone();
let connect_state = self.connect_state.clone();
let metrics_calculate = self.metrics_calculate.clone();
let metrics_calculate = self.metrics.clone();
let storage = self.storage.clone();
Box::pin(async move {