chore: added metrics for new stateless operations

This commit is contained in:
Bartosz Sypytkowski 2024-10-30 07:40:23 +01:00
parent e86c9de316
commit 25f0461389
4 changed files with 76 additions and 35 deletions

View File

@ -8,7 +8,7 @@ use crate::model::{
use crate::stream_group::{StreamConfig, StreamGroup};
use futures::Stream;
use redis::aio::ConnectionManager;
use redis::streams::{StreamRangeReply, StreamReadOptions};
use redis::streams::StreamReadOptions;
use redis::{AsyncCommands, FromRedisValue};
use std::time::Duration;
use tracing::error;

View File

@ -53,7 +53,7 @@ then
cargo sqlx prepare --workspace
fi
cargo run --release --package xtask
cargo run --package xtask
# revert to require signup email verification
export GOTRUE_MAILER_AUTOCONFIRM=false

View File

@ -95,6 +95,7 @@ impl CollabGroup {
storage,
collab_redis_stream,
indexer,
metrics.clone(),
prune_grace_period,
);
@ -511,7 +512,6 @@ impl CollabGroup {
);
let payload = collab_msg.payload();
state.metrics.acquire_collab_lock_count.inc();
// Spawn a blocking task to handle the message
let result = Self::handle_message(state, payload, &message_origin, msg_id).await;
@ -542,8 +542,6 @@ impl CollabGroup {
Ok(msg) => {
match Self::handle_protocol_message(state, message_origin, msg).await {
Ok(payload) => {
state.metrics.apply_update_count.inc();
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
// send one ack back to the client.
if ack_response.is_none() {
@ -660,7 +658,7 @@ impl CollabGroup {
origin: &CollabOrigin,
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
state.metrics.apply_update_size.observe(update.len() as f64);
state.metrics.collab_size.observe(update.len() as f64);
let start = tokio::time::Instant::now();
// we try to decode update to make sure it's not malformed and to extract state vector
@ -698,7 +696,7 @@ impl CollabGroup {
state
.metrics
.apply_update_time
.load_collab_time
.observe(elapsed.as_millis() as f64);
Ok(None)
@ -846,6 +844,7 @@ struct CollabPersister {
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>,
metrics: Arc<CollabRealtimeMetrics>,
update_sink: CollabUpdateSink,
awareness_sink: AwarenessUpdateSink,
/// A grace period for prunning Redis collab updates. Instead of deleting all messages we
@ -863,6 +862,7 @@ impl CollabPersister {
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>,
metrics: Arc<CollabRealtimeMetrics>,
prune_grace_period: Duration,
) -> Self {
let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id);
@ -875,6 +875,7 @@ impl CollabPersister {
storage,
collab_redis_stream,
indexer,
metrics,
update_sink,
awareness_sink,
prune_grace_period,
@ -925,10 +926,12 @@ impl CollabPersister {
/// Loads collab without its history. Used for handling y-sync protocol messages.
async fn load_compact(&self) -> Result<CollabSnapshot, RealtimeError> {
// 1. Try to load the latest snapshot from storage
let start = Instant::now();
let mut collab = match self.load_collab_full(false).await? {
Some(collab) => collab,
None => Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], false),
};
self.metrics.load_collab_count.inc();
// 2. consume all Redis updates on top of it (keep redis msg id)
let mut last_message_id = None;
@ -949,8 +952,10 @@ impl CollabPersister {
tx.apply_update(update)
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
self.metrics.apply_update_count.inc();
},
Err(err) => {
self.metrics.apply_update_failed_count.inc();
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
@ -966,6 +971,10 @@ impl CollabPersister {
self.object_id,
i
);
self
.metrics
.load_collab_time
.observe(start.elapsed().as_millis() as f64);
// now we have the most recent version of the document
let snapshot = CollabSnapshot {
@ -987,6 +996,7 @@ impl CollabPersister {
);
pin_mut!(stream);
let start = Instant::now();
let mut i = 0;
let mut collab = None;
let mut last_message_id = None;
@ -1009,8 +1019,10 @@ impl CollabPersister {
.apply_update(update)
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
self.metrics.apply_update_count.inc();
},
Err(err) => {
self.metrics.apply_update_failed_count.inc();
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
@ -1024,10 +1036,17 @@ impl CollabPersister {
// if there were no Redis updates, collab is still not initialized
match collab {
Some(collab) => {
self.metrics.load_full_collab_count.inc();
let elapsed = start.elapsed();
self
.metrics
.load_collab_time
.observe(elapsed.as_millis() as f64);
tracing::trace!(
"loaded collab full state: {} replaying {} updates",
"loaded collab full state: {} replaying {} updates in {:?}",
self.object_id,
i
i,
elapsed
);
{
let tx = collab.transact();
@ -1092,6 +1111,10 @@ impl CollabPersister {
let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
self
.metrics
.full_collab_size
.observe(encoded_collab.len() as f64);
let params = InsertSnapshotParams {
object_id: self.object_id.clone(),
encoded_collab_v1: encoded_collab,
@ -1115,6 +1138,10 @@ impl CollabPersister {
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
self
.metrics
.collab_size
.observe(encoded_collab.len() as f64);
let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
match self.embeddings(collab).await {

View File

@ -15,16 +15,20 @@ pub struct CollabRealtimeMetrics {
pub(crate) total_attempt_get_encode_collab_from_redis: Gauge,
pub(crate) opening_collab_count: Gauge,
pub(crate) num_of_editing_users: Gauge,
/// Number of times a compact state collab load has been done.
pub(crate) load_collab_count: Gauge,
/// Number of times a full state collab (with history) load has been done.
pub(crate) load_full_collab_count: Gauge,
/// The number of apply update
pub(crate) apply_update_count: Gauge,
/// The number of apply update failed
pub(crate) apply_update_failed_count: Gauge,
pub(crate) acquire_collab_lock_count: Gauge,
pub(crate) acquire_collab_lock_fail_count: Gauge,
/// How long it takes to apply update in milliseconds.
pub(crate) apply_update_time: Histogram,
/// How big the update is in bytes.
pub(crate) apply_update_size: Histogram,
/// How long it takes to load a collab (from snapshot and updates combined).
pub(crate) load_collab_time: Histogram,
/// How big is the collab (no history, after applying all updates).
pub(crate) collab_size: Histogram,
/// How big is the collab (with history, after applying all updates).
pub(crate) full_collab_size: Histogram,
}
impl CollabRealtimeMetrics {
@ -37,23 +41,30 @@ impl CollabRealtimeMetrics {
num_of_editing_users: Gauge::default(),
apply_update_count: Default::default(),
apply_update_failed_count: Default::default(),
acquire_collab_lock_count: Default::default(),
acquire_collab_lock_fail_count: Default::default(),
// when it comes to histograms we organize them by buckets or specific sizes - since our
// prometheus client doesn't support Summary type, we use Histogram type instead
// time spent on apply_update in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s
apply_update_time: Histogram::new(
// time spent on loading collab in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s
load_collab_time: Histogram::new(
[1.0, 5.0, 15.0, 30.0, 100.0, 200.0, 500.0, 1000.0].into_iter(),
),
// update size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
apply_update_size: Histogram::new(
// collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
collab_size: Histogram::new(
[
128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0,
]
.into_iter(),
),
// collab size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
full_collab_size: Histogram::new(
[
128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0,
]
.into_iter(),
),
load_collab_count: Default::default(),
load_full_collab_count: Default::default(),
}
}
@ -95,28 +106,31 @@ impl CollabRealtimeMetrics {
"number of apply update failed",
metrics.apply_update_failed_count.clone(),
);
realtime_registry.register(
"acquire_collab_lock_count",
"number of acquire collab lock",
metrics.acquire_collab_lock_count.clone(),
"load_collab_time",
"time spent on loading collab in milliseconds",
metrics.load_collab_time.clone(),
);
realtime_registry.register(
"acquire_collab_lock_fail_count",
"number of acquire collab lock failed",
metrics.acquire_collab_lock_fail_count.clone(),
"collab_size",
"size of compact collab in bytes",
metrics.collab_size.clone(),
);
realtime_registry.register(
"apply_update_time",
"time spent on applying collab updates in milliseconds",
metrics.apply_update_time.clone(),
"full_collab_size",
"size of full collab in bytes",
metrics.full_collab_size.clone(),
);
realtime_registry.register(
"apply_update_size",
"size of updates applied to collab in bytes",
metrics.apply_update_size.clone(),
"load_collab_count",
"number of collab loads (no history)",
metrics.load_collab_count.clone(),
);
realtime_registry.register(
"load_full_collab_count",
"number of collab loads (with history)",
metrics.load_full_collab_count.clone(),
);
metrics
}
}