diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 0ed2f1f5..73e4190f 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -8,7 +8,7 @@ use crate::model::{ use crate::stream_group::{StreamConfig, StreamGroup}; use futures::Stream; use redis::aio::ConnectionManager; -use redis::streams::{StreamRangeReply, StreamReadOptions}; +use redis::streams::StreamReadOptions; use redis::{AsyncCommands, FromRedisValue}; use std::time::Duration; use tracing::error; diff --git a/script/run_local_server.sh b/script/run_local_server.sh index a56b2f03..10a87c5e 100755 --- a/script/run_local_server.sh +++ b/script/run_local_server.sh @@ -53,7 +53,7 @@ then cargo sqlx prepare --workspace fi -cargo run --release --package xtask +cargo run --package xtask # revert to require signup email verification export GOTRUE_MAILER_AUTOCONFIRM=false diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 457557c7..61ff961e 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -95,6 +95,7 @@ impl CollabGroup { storage, collab_redis_stream, indexer, + metrics.clone(), prune_grace_period, ); @@ -511,7 +512,6 @@ impl CollabGroup { ); let payload = collab_msg.payload(); - state.metrics.acquire_collab_lock_count.inc(); // Spawn a blocking task to handle the message let result = Self::handle_message(state, payload, &message_origin, msg_id).await; @@ -542,8 +542,6 @@ impl CollabGroup { Ok(msg) => { match Self::handle_protocol_message(state, message_origin, msg).await { Ok(payload) => { - state.metrics.apply_update_count.inc(); - // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to // send one ack back to the client. if ack_response.is_none() { @@ -660,7 +658,7 @@ impl CollabGroup { origin: &CollabOrigin, update: Vec, ) -> Result>, RTProtocolError> { - state.metrics.apply_update_size.observe(update.len() as f64); + state.metrics.collab_size.observe(update.len() as f64); let start = tokio::time::Instant::now(); // we try to decode update to make sure it's not malformed and to extract state vector @@ -698,7 +696,7 @@ impl CollabGroup { state .metrics - .apply_update_time + .load_collab_time .observe(elapsed.as_millis() as f64); Ok(None) @@ -846,6 +844,7 @@ struct CollabPersister { storage: Arc, collab_redis_stream: Arc, indexer: Option>, + metrics: Arc, update_sink: CollabUpdateSink, awareness_sink: AwarenessUpdateSink, /// A grace period for prunning Redis collab updates. Instead of deleting all messages we @@ -863,6 +862,7 @@ impl CollabPersister { storage: Arc, collab_redis_stream: Arc, indexer: Option>, + metrics: Arc, prune_grace_period: Duration, ) -> Self { let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id); @@ -875,6 +875,7 @@ impl CollabPersister { storage, collab_redis_stream, indexer, + metrics, update_sink, awareness_sink, prune_grace_period, @@ -925,10 +926,12 @@ impl CollabPersister { /// Loads collab without its history. Used for handling y-sync protocol messages. async fn load_compact(&self) -> Result { // 1. Try to load the latest snapshot from storage + let start = Instant::now(); let mut collab = match self.load_collab_full(false).await? { Some(collab) => collab, None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false), }; + self.metrics.load_collab_count.inc(); // 2. consume all Redis updates on top of it (keep redis msg id) let mut last_message_id = None; @@ -949,8 +952,10 @@ impl CollabPersister { tx.apply_update(update) .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? + self.metrics.apply_update_count.inc(); }, Err(err) => { + self.metrics.apply_update_failed_count.inc(); tracing::error!( "`{}` failed to resolve collab update: {}", self.object_id, @@ -966,6 +971,10 @@ impl CollabPersister { self.object_id, i ); + self + .metrics + .load_collab_time + .observe(start.elapsed().as_millis() as f64); // now we have the most recent version of the document let snapshot = CollabSnapshot { @@ -987,6 +996,7 @@ impl CollabPersister { ); pin_mut!(stream); + let start = Instant::now(); let mut i = 0; let mut collab = None; let mut last_message_id = None; @@ -1009,8 +1019,10 @@ impl CollabPersister { .apply_update(update) .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding? + self.metrics.apply_update_count.inc(); }, Err(err) => { + self.metrics.apply_update_failed_count.inc(); tracing::error!( "`{}` failed to resolve collab update: {}", self.object_id, @@ -1024,10 +1036,17 @@ impl CollabPersister { // if there were no Redis updates, collab is still not initialized match collab { Some(collab) => { + self.metrics.load_full_collab_count.inc(); + let elapsed = start.elapsed(); + self + .metrics + .load_collab_time + .observe(elapsed.as_millis() as f64); tracing::trace!( - "loaded collab full state: {} replaying {} updates", + "loaded collab full state: {} replaying {} updates in {:?}", self.object_id, - i + i, + elapsed ); { let tx = collab.transact(); @@ -1092,6 +1111,10 @@ impl CollabPersister { let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full) .encode_to_bytes() .map_err(|err| RealtimeError::Internal(err.into()))?; + self + .metrics + .full_collab_size + .observe(encoded_collab.len() as f64); let params = InsertSnapshotParams { object_id: self.object_id.clone(), encoded_collab_v1: encoded_collab, @@ -1115,6 +1138,10 @@ impl CollabPersister { let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) .encode_to_bytes() .map_err(|err| RealtimeError::Internal(err.into()))?; + self + .metrics + .collab_size + .observe(encoded_collab.len() as f64); let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); match self.embeddings(collab).await { diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index 3a37c40c..013723a4 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -15,16 +15,20 @@ pub struct CollabRealtimeMetrics { pub(crate) total_attempt_get_encode_collab_from_redis: Gauge, pub(crate) opening_collab_count: Gauge, pub(crate) num_of_editing_users: Gauge, + /// Number of times a compact state collab load has been done. + pub(crate) load_collab_count: Gauge, + /// Number of times a full state collab (with history) load has been done. + pub(crate) load_full_collab_count: Gauge, /// The number of apply update pub(crate) apply_update_count: Gauge, /// The number of apply update failed pub(crate) apply_update_failed_count: Gauge, - pub(crate) acquire_collab_lock_count: Gauge, - pub(crate) acquire_collab_lock_fail_count: Gauge, - /// How long it takes to apply update in milliseconds. - pub(crate) apply_update_time: Histogram, - /// How big the update is in bytes. - pub(crate) apply_update_size: Histogram, + /// How long it takes to load a collab (from snapshot and updates combined). + pub(crate) load_collab_time: Histogram, + /// How big is the collab (no history, after applying all updates). + pub(crate) collab_size: Histogram, + /// How big is the collab (with history, after applying all updates). + pub(crate) full_collab_size: Histogram, } impl CollabRealtimeMetrics { @@ -37,23 +41,30 @@ impl CollabRealtimeMetrics { num_of_editing_users: Gauge::default(), apply_update_count: Default::default(), apply_update_failed_count: Default::default(), - acquire_collab_lock_count: Default::default(), - acquire_collab_lock_fail_count: Default::default(), // when it comes to histograms we organize them by buckets or specific sizes - since our // prometheus client doesn't support Summary type, we use Histogram type instead - // time spent on apply_update in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s - apply_update_time: Histogram::new( + // time spent on loading collab in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s + load_collab_time: Histogram::new( [1.0, 5.0, 15.0, 30.0, 100.0, 200.0, 500.0, 1000.0].into_iter(), ), - // update size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB - apply_update_size: Histogram::new( + // collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB + collab_size: Histogram::new( [ 128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0, ] .into_iter(), ), + // collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB + full_collab_size: Histogram::new( + [ + 128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0, + ] + .into_iter(), + ), + load_collab_count: Default::default(), + load_full_collab_count: Default::default(), } } @@ -95,28 +106,31 @@ impl CollabRealtimeMetrics { "number of apply update failed", metrics.apply_update_failed_count.clone(), ); - realtime_registry.register( - "acquire_collab_lock_count", - "number of acquire collab lock", - metrics.acquire_collab_lock_count.clone(), + "load_collab_time", + "time spent on loading collab in milliseconds", + metrics.load_collab_time.clone(), ); realtime_registry.register( - "acquire_collab_lock_fail_count", - "number of acquire collab lock failed", - metrics.acquire_collab_lock_fail_count.clone(), + "collab_size", + "size of compact collab in bytes", + metrics.collab_size.clone(), ); realtime_registry.register( - "apply_update_time", - "time spent on applying collab updates in milliseconds", - metrics.apply_update_time.clone(), + "full_collab_size", + "size of full collab in bytes", + metrics.full_collab_size.clone(), ); realtime_registry.register( - "apply_update_size", - "size of updates applied to collab in bytes", - metrics.apply_update_size.clone(), + "load_collab_count", + "number of collab loads (no history)", + metrics.load_collab_count.clone(), + ); + realtime_registry.register( + "load_full_collab_count", + "number of collab loads (with history)", + metrics.load_full_collab_count.clone(), ); - metrics } }