feat: Measure sync (#772)
* chore: redesign collab sync protocol to enable injection of metric tracking * chore: track metrics for apply update on the server side * chore: close locks * chore: post rebase fixes
This commit is contained in:
parent
405d9703cc
commit
6972f9c4ab
|
|
@ -20,7 +20,7 @@ use yrs::ReadTxn;
|
|||
use client_api_entity::{validate_data_for_folder, CollabType};
|
||||
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync};
|
||||
use collab_rt_protocol::{
|
||||
handle_message_follow_protocol, ClientSyncProtocol, Message, MessageReader, SyncMessage,
|
||||
ClientSyncProtocol, CollabSyncProtocol, Message, MessageReader, SyncMessage,
|
||||
};
|
||||
|
||||
use crate::af_spawn;
|
||||
|
|
@ -344,8 +344,9 @@ where
|
|||
.map_err(|err| SyncError::OverrideWithIncorrectData(err.to_string()))?;
|
||||
}
|
||||
|
||||
if let Some(return_payload) =
|
||||
handle_message_follow_protocol(&message_origin, &ClientSyncProtocol, &collab, msg).await?
|
||||
if let Some(return_payload) = ClientSyncProtocol
|
||||
.handle_message(&message_origin, &collab, msg)
|
||||
.await?
|
||||
{
|
||||
let object_id = sync_object.object_id.clone();
|
||||
sink.queue_msg(|msg_id| {
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::borrow::BorrowMut;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use collab::core::awareness::{Awareness, AwarenessUpdate};
|
||||
use collab::core::collab::{TransactionExt, TransactionMutExt};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
|
|
@ -34,6 +35,8 @@ use crate::message::{CustomMessage, Message, RTProtocolError, SyncMessage, SyncM
|
|||
/// A implementation of [CollabSyncProtocol].
|
||||
#[derive(Clone)]
|
||||
pub struct ClientSyncProtocol;
|
||||
|
||||
#[async_trait]
|
||||
impl CollabSyncProtocol for ClientSyncProtocol {
|
||||
fn check<E: Encoder>(&self, encoder: &mut E, last_sync_at: i64) -> Result<(), RTProtocolError> {
|
||||
let meta = SyncMeta { last_sync_at };
|
||||
|
|
@ -43,13 +46,17 @@ impl CollabSyncProtocol for ClientSyncProtocol {
|
|||
|
||||
/// Handle reply for a sync-step-1 send from this replica previously. By default just apply
|
||||
/// an update to current `awareness` document instance.
|
||||
fn handle_sync_step2(
|
||||
async fn handle_sync_step2(
|
||||
&self,
|
||||
origin: &CollabOrigin,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<(), RTProtocolError> {
|
||||
let mut txn = awareness
|
||||
collab: &CollabRef,
|
||||
update: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
let update = decode_update(update).await?;
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
let mut txn = collab
|
||||
.get_awareness()
|
||||
.doc()
|
||||
.try_transact_mut_with(origin.clone())
|
||||
.map_err(|err| {
|
||||
|
|
@ -77,12 +84,41 @@ impl CollabSyncProtocol for ClientSyncProtocol {
|
|||
reason: "client miss updates".to_string(),
|
||||
})
|
||||
},
|
||||
None => Ok(()),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type CollabRef = Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>;
|
||||
|
||||
#[async_trait]
|
||||
pub trait CollabSyncProtocol {
|
||||
/// Handles incoming messages from the client/server
|
||||
async fn handle_message(
|
||||
&self,
|
||||
message_origin: &CollabOrigin,
|
||||
collab: &CollabRef,
|
||||
msg: Message,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
match msg {
|
||||
Message::Sync(msg) => match msg {
|
||||
SyncMessage::SyncStep1(sv) => self.handle_sync_step1(collab, sv).await,
|
||||
SyncMessage::SyncStep2(update) => {
|
||||
self.handle_sync_step2(message_origin, collab, update).await
|
||||
},
|
||||
SyncMessage::Update(update) => self.handle_update(message_origin, collab, update).await,
|
||||
},
|
||||
Message::Auth(reason) => self.handle_auth(collab, reason).await,
|
||||
//FIXME: where is the QueryAwareness protocol?
|
||||
Message::Awareness(update) => {
|
||||
self
|
||||
.handle_awareness_update(message_origin, collab, update)
|
||||
.await
|
||||
},
|
||||
Message::Custom(msg) => self.handle_custom_message(collab, msg).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn check<E: Encoder>(&self, _encoder: &mut E, _last_sync_at: i64) -> Result<(), RTProtocolError> {
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -121,22 +157,27 @@ pub trait CollabSyncProtocol {
|
|||
|
||||
/// Given a [StateVector] of a remote side, calculate missing
|
||||
/// updates. Returns a sync-step-2 message containing a calculated update.
|
||||
fn handle_sync_step1(
|
||||
async fn handle_sync_step1(
|
||||
&self,
|
||||
awareness: &Awareness,
|
||||
collab: &CollabRef,
|
||||
sv: StateVector,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
let txn = awareness.doc().try_transact().map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
|
||||
})?;
|
||||
let update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
|
||||
RTProtocolError::YrsEncodeState(format!(
|
||||
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
|
||||
err,
|
||||
sv,
|
||||
txn.store()
|
||||
))
|
||||
})?;
|
||||
// calculate missing updates base on the input state vector
|
||||
let update = {
|
||||
let lock = collab.read().await;
|
||||
let collab = lock.borrow();
|
||||
let txn = collab.get_awareness().doc().try_transact().map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
|
||||
})?;
|
||||
txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
|
||||
RTProtocolError::YrsEncodeState(format!(
|
||||
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
|
||||
err,
|
||||
sv,
|
||||
txn.store()
|
||||
))
|
||||
})?
|
||||
};
|
||||
Ok(Some(
|
||||
Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(),
|
||||
))
|
||||
|
|
@ -144,27 +185,27 @@ pub trait CollabSyncProtocol {
|
|||
|
||||
/// Handle reply for a sync-step-1 send from this replica previously. By default just apply
|
||||
/// an update to current `awareness` document instance.
|
||||
fn handle_sync_step2(
|
||||
async fn handle_sync_step2(
|
||||
&self,
|
||||
origin: &CollabOrigin,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<(), RTProtocolError>;
|
||||
collab: &CollabRef,
|
||||
update: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError>;
|
||||
|
||||
/// Handle continuous update send from the client. By default just apply an update to a current
|
||||
/// `awareness` document instance.
|
||||
fn handle_update(
|
||||
async fn handle_update(
|
||||
&self,
|
||||
origin: &CollabOrigin,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<(), RTProtocolError> {
|
||||
self.handle_sync_step2(origin, awareness, update)
|
||||
collab: &CollabRef,
|
||||
update: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
self.handle_sync_step2(origin, collab, update).await
|
||||
}
|
||||
|
||||
fn handle_auth(
|
||||
async fn handle_auth(
|
||||
&self,
|
||||
_awareness: &Awareness,
|
||||
_collab: &CollabRef,
|
||||
deny_reason: Option<String>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
if let Some(reason) = deny_reason {
|
||||
|
|
@ -176,19 +217,21 @@ pub trait CollabSyncProtocol {
|
|||
|
||||
/// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness`
|
||||
/// instance is being updated with incoming data.
|
||||
fn handle_awareness_update(
|
||||
async fn handle_awareness_update(
|
||||
&self,
|
||||
_message_origin: &CollabOrigin,
|
||||
awareness: &mut Awareness,
|
||||
collab: &CollabRef,
|
||||
update: AwarenessUpdate,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
awareness.apply_update(update)?;
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
collab.get_awareness().apply_update(update)?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn handle_custom_message(
|
||||
async fn handle_custom_message(
|
||||
&self,
|
||||
_awareness: &mut Awareness,
|
||||
_collab: &CollabRef,
|
||||
_msg: CustomMessage,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
Ok(None)
|
||||
|
|
@ -198,7 +241,7 @@ pub trait CollabSyncProtocol {
|
|||
const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB
|
||||
|
||||
#[inline]
|
||||
async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
|
||||
pub async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
|
||||
let update = if update.len() > LARGE_UPDATE_THRESHOLD {
|
||||
spawn_blocking(move || Update::decode_v1(&update))
|
||||
.await
|
||||
|
|
@ -208,56 +251,3 @@ async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
|
|||
}?;
|
||||
Ok(update)
|
||||
}
|
||||
|
||||
/// Handles incoming messages from the client/server
|
||||
pub async fn handle_message_follow_protocol<P>(
|
||||
message_origin: &CollabOrigin,
|
||||
protocol: &P,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
msg: Message,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError>
|
||||
where
|
||||
P: CollabSyncProtocol,
|
||||
{
|
||||
match msg {
|
||||
Message::Sync(msg) => match msg {
|
||||
SyncMessage::SyncStep1(sv) => {
|
||||
// calculate missing updates base on the input state vector
|
||||
let lock = collab.read().await;
|
||||
let collab = lock.borrow();
|
||||
let update = protocol.handle_sync_step1(collab.get_awareness(), sv)?;
|
||||
Ok(update)
|
||||
},
|
||||
SyncMessage::SyncStep2(update) => {
|
||||
let update = decode_update(update).await?;
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
protocol.handle_sync_step2(message_origin, collab.get_mut_awareness(), update)?;
|
||||
Ok(None)
|
||||
},
|
||||
SyncMessage::Update(update) => {
|
||||
let update = decode_update(update).await?;
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
protocol.handle_update(message_origin, collab.get_mut_awareness(), update)?;
|
||||
Ok(None)
|
||||
},
|
||||
},
|
||||
Message::Auth(reason) => {
|
||||
let lock = collab.read().await;
|
||||
let collab = lock.borrow();
|
||||
protocol.handle_auth(collab.get_awareness(), reason)
|
||||
},
|
||||
//FIXME: where is the QueryAwareness protocol?
|
||||
Message::Awareness(update) => {
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
protocol.handle_awareness_update(message_origin, collab.get_mut_awareness(), update)
|
||||
},
|
||||
Message::Custom(msg) => {
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
protocol.handle_custom_message(collab.get_mut_awareness(), msg)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ use collab_rt_entity::{AckCode, MsgId};
|
|||
use collab_rt_entity::{
|
||||
AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage,
|
||||
};
|
||||
use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage};
|
||||
use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
|
||||
use collab_rt_protocol::{CollabSyncProtocol, Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
|
||||
use collab_rt_protocol::{RTProtocolError, SyncMessage};
|
||||
|
||||
use crate::error::RealtimeError;
|
||||
use crate::group::group_init::EditState;
|
||||
|
|
@ -279,7 +279,7 @@ async fn handle_client_messages<Sink>(
|
|||
message_map: MessageByObjectId,
|
||||
sink: &mut Sink,
|
||||
collab: Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &CollabRealtimeMetrics,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) where
|
||||
Sink: SinkExt<CollabMessage> + Unpin + 'static,
|
||||
|
|
@ -337,7 +337,7 @@ async fn handle_one_client_message(
|
|||
object_id: &str,
|
||||
collab_msg: &ClientCollabMessage,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &CollabRealtimeMetrics,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) -> Result<CollabAck, RealtimeError> {
|
||||
let msg_id = collab_msg.msg_id();
|
||||
|
|
@ -382,7 +382,7 @@ async fn handle_one_message_payload(
|
|||
msg_id: MsgId,
|
||||
payload: &Bytes,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &CollabRealtimeMetrics,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) -> Result<CollabAck, RealtimeError> {
|
||||
let payload = payload.clone();
|
||||
|
|
@ -416,7 +416,7 @@ async fn handle_message(
|
|||
payload: &Bytes,
|
||||
message_origin: &CollabOrigin,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &CollabRealtimeMetrics,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
object_id: &str,
|
||||
msg_id: MsgId,
|
||||
edit_state: &Arc<EditState>,
|
||||
|
|
@ -430,7 +430,9 @@ async fn handle_message(
|
|||
match msg {
|
||||
Ok(msg) => {
|
||||
is_sync_step2 = matches!(msg, Message::Sync(SyncMessage::SyncStep2(_)));
|
||||
match handle_message_follow_protocol(message_origin, &ServerSyncProtocol, collab, msg).await
|
||||
match ServerSyncProtocol::new(metrics_calculate.clone())
|
||||
.handle_message(message_origin, collab, msg)
|
||||
.await
|
||||
{
|
||||
Ok(payload) => {
|
||||
metrics_calculate.apply_update_count.inc();
|
||||
|
|
|
|||
|
|
@ -1,85 +1,125 @@
|
|||
use collab::core::awareness::Awareness;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use collab::core::collab::{TransactionExt, TransactionMutExt};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use tokio::time::Instant;
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Transact, Update};
|
||||
use yrs::{ReadTxn, StateVector, Transact};
|
||||
|
||||
use collab_rt_protocol::CollabSyncProtocol;
|
||||
use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage};
|
||||
use collab_rt_protocol::{
|
||||
decode_update, CollabRef, CustomMessage, Message, RTProtocolError, SyncMessage,
|
||||
};
|
||||
|
||||
use crate::CollabRealtimeMetrics;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ServerSyncProtocol;
|
||||
pub struct ServerSyncProtocol {
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
}
|
||||
|
||||
impl ServerSyncProtocol {
|
||||
pub fn new(metrics: Arc<CollabRealtimeMetrics>) -> Self {
|
||||
Self { metrics }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl CollabSyncProtocol for ServerSyncProtocol {
|
||||
fn handle_sync_step1(
|
||||
async fn handle_sync_step1(
|
||||
&self,
|
||||
awareness: &Awareness,
|
||||
collab: &CollabRef,
|
||||
sv: StateVector,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
let txn = awareness.doc().try_transact().map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
|
||||
})?;
|
||||
let (doc_state, state_vector) = {
|
||||
let lock = collab.read().await;
|
||||
let collab = (*lock).borrow();
|
||||
|
||||
let client_step2_update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
|
||||
RTProtocolError::YrsEncodeState(format!(
|
||||
let txn = collab.get_awareness().doc().try_transact().map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
|
||||
})?;
|
||||
|
||||
let doc_state = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
|
||||
RTProtocolError::YrsEncodeState(format!(
|
||||
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
|
||||
err,
|
||||
sv,
|
||||
txn.store()
|
||||
))
|
||||
})?;
|
||||
})?;
|
||||
(doc_state, txn.state_vector())
|
||||
};
|
||||
|
||||
// Retrieve the latest document state from the client after they return online from offline editing.
|
||||
let mut encoder = EncoderV1::new();
|
||||
Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder);
|
||||
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
|
||||
|
||||
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
|
||||
// send when a connection is established
|
||||
Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder);
|
||||
Message::Sync(SyncMessage::SyncStep1(state_vector)).encode(&mut encoder);
|
||||
Ok(Some(encoder.to_vec()))
|
||||
}
|
||||
|
||||
fn handle_sync_step2(
|
||||
async fn handle_sync_step2(
|
||||
&self,
|
||||
origin: &CollabOrigin,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<(), RTProtocolError> {
|
||||
let mut txn = awareness
|
||||
.doc()
|
||||
.try_transact_mut_with(origin.clone())
|
||||
.map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err))
|
||||
})?;
|
||||
txn.try_apply_update(update).map_err(|err| {
|
||||
RTProtocolError::YrsApplyUpdate(format!(
|
||||
"sync step2 apply update: {}\ndocument state: {:#?}",
|
||||
err,
|
||||
txn.store()
|
||||
))
|
||||
})?;
|
||||
collab: &CollabRef,
|
||||
update: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
self.metrics.apply_update_size.observe(update.len() as f64);
|
||||
let start = Instant::now();
|
||||
let result = {
|
||||
let update = decode_update(update).await?;
|
||||
let mut lock = collab.write().await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
|
||||
// If server can't apply updates sent by client, which means the server is missing some updates
|
||||
// from the client or the client is missing some updates from the server.
|
||||
// If the client can't apply broadcast from server, which means the client is missing some
|
||||
// updates.
|
||||
match txn.store().pending_update() {
|
||||
Some(_update) => {
|
||||
// let state_vector_v1 = update.missing.encode_v1();
|
||||
// for the moment, we don't need to send missing updates to the client. passing None
|
||||
// instead, which will trigger a sync step 0 on client
|
||||
let state_vector_v1 = txn.state_vector().encode_v1();
|
||||
Err(RTProtocolError::MissUpdates {
|
||||
state_vector_v1: Some(state_vector_v1),
|
||||
reason: "server miss updates".to_string(),
|
||||
})
|
||||
},
|
||||
None => Ok(()),
|
||||
}
|
||||
let mut txn = collab
|
||||
.get_awareness()
|
||||
.doc()
|
||||
.try_transact_mut_with(origin.clone())
|
||||
.map_err(|err| {
|
||||
RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err))
|
||||
})?;
|
||||
txn.try_apply_update(update).map_err(|err| {
|
||||
RTProtocolError::YrsApplyUpdate(format!(
|
||||
"sync step2 apply update: {}\ndocument state: {:#?}",
|
||||
err,
|
||||
txn.store()
|
||||
))
|
||||
})?;
|
||||
|
||||
// If server can't apply updates sent by client, which means the server is missing some updates
|
||||
// from the client or the client is missing some updates from the server.
|
||||
// If the client can't apply broadcast from server, which means the client is missing some
|
||||
// updates.
|
||||
match txn.store().pending_update() {
|
||||
Some(_update) => {
|
||||
// let state_vector_v1 = update.missing.encode_v1();
|
||||
// for the moment, we don't need to send missing updates to the client. passing None
|
||||
// instead, which will trigger a sync step 0 on client
|
||||
let state_vector_v1 = txn.state_vector().encode_v1();
|
||||
Err(RTProtocolError::MissUpdates {
|
||||
state_vector_v1: Some(state_vector_v1),
|
||||
reason: "server miss updates".to_string(),
|
||||
})
|
||||
},
|
||||
None => Ok(None),
|
||||
}
|
||||
};
|
||||
|
||||
let elapsed = start.elapsed();
|
||||
self
|
||||
.metrics
|
||||
.apply_update_time
|
||||
.observe(elapsed.as_millis() as f64);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn handle_custom_message(
|
||||
async fn handle_custom_message(
|
||||
&self,
|
||||
_awareness: &mut Awareness,
|
||||
_collab: &CollabRef,
|
||||
_msg: CustomMessage,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
Ok(None)
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ use crate::error::RealtimeError;
|
|||
use crate::group::group_init::CollabGroup;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct GroupManagementState {
|
||||
group_by_object_id: Arc<DashMap<String, Arc<CollabGroup>>>,
|
||||
/// Keep track of all [Collab] objects that a user is subscribed to.
|
||||
|
|
|
|||
|
|
@ -2,12 +2,13 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
|
||||
use prometheus_client::metrics::gauge::Gauge;
|
||||
use prometheus_client::metrics::histogram::Histogram;
|
||||
use prometheus_client::registry::Registry;
|
||||
use tokio::time::interval;
|
||||
|
||||
use database::collab::CollabStorage;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
#[derive(Clone)]
|
||||
pub struct CollabRealtimeMetrics {
|
||||
pub(crate) connected_users: Gauge,
|
||||
pub(crate) total_success_get_encode_collab_from_redis: Gauge,
|
||||
|
|
@ -20,10 +21,14 @@ pub struct CollabRealtimeMetrics {
|
|||
pub(crate) apply_update_failed_count: Gauge,
|
||||
pub(crate) acquire_collab_lock_count: Gauge,
|
||||
pub(crate) acquire_collab_lock_fail_count: Gauge,
|
||||
/// How long it takes to apply update in milliseconds.
|
||||
pub(crate) apply_update_time: Histogram,
|
||||
/// How big the update is in bytes.
|
||||
pub(crate) apply_update_size: Histogram,
|
||||
}
|
||||
|
||||
impl CollabRealtimeMetrics {
|
||||
fn init() -> Self {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
connected_users: Gauge::default(),
|
||||
total_success_get_encode_collab_from_redis: Gauge::default(),
|
||||
|
|
@ -34,11 +39,26 @@ impl CollabRealtimeMetrics {
|
|||
apply_update_failed_count: Default::default(),
|
||||
acquire_collab_lock_count: Default::default(),
|
||||
acquire_collab_lock_fail_count: Default::default(),
|
||||
|
||||
// when it comes to histograms we organize them by buckets or specific sizes - since our
|
||||
// prometheus client doesn't support Summary type, we use Histogram type instead
|
||||
|
||||
// time spent on apply_update in milliseconds: 1ms, 5ms, 15ms, 30ms, 100ms, 200ms, 500ms, 1s
|
||||
apply_update_time: Histogram::new(
|
||||
[1.0, 5.0, 15.0, 30.0, 100.0, 200.0, 500.0, 1000.0].into_iter(),
|
||||
),
|
||||
// update size in bytes: 128B, 512B, 1KB, 64KB, 512KB, 1MB, 5MB, 10MB
|
||||
apply_update_size: Histogram::new(
|
||||
[
|
||||
128.0, 512.0, 1024.0, 65536.0, 524288.0, 1048576.0, 5242880.0, 10485760.0,
|
||||
]
|
||||
.into_iter(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register(registry: &mut Registry) -> Self {
|
||||
let metrics = Self::init();
|
||||
let metrics = Self::new();
|
||||
let realtime_registry = registry.sub_registry_with_prefix("realtime");
|
||||
realtime_registry.register(
|
||||
"connected_users",
|
||||
|
|
@ -86,6 +106,16 @@ impl CollabRealtimeMetrics {
|
|||
"number of acquire collab lock failed",
|
||||
metrics.acquire_collab_lock_fail_count.clone(),
|
||||
);
|
||||
realtime_registry.register(
|
||||
"apply_update_time",
|
||||
"time spent on applying collab updates in milliseconds",
|
||||
metrics.apply_update_time.clone(),
|
||||
);
|
||||
realtime_registry.register(
|
||||
"apply_update_size",
|
||||
"size of updates applied to collab in bytes",
|
||||
metrics.apply_update_size.clone(),
|
||||
);
|
||||
|
||||
metrics
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue