chore: more traces in collab group

This commit is contained in:
Bartosz Sypytkowski 2024-10-15 11:29:42 +02:00
parent a1546909c3
commit 61820d7226
4 changed files with 73 additions and 18 deletions

View File

@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73"
}

View File

@ -194,6 +194,11 @@ impl CollabGroup {
}
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
tracing::trace!(
"broadcasting collab update from {} ({} bytes)",
update.sender,
update.data.len()
);
// update state vector based on incoming message
let mut sv = state.state_vector.write().await;
sv.merge(update.state_vector);
@ -253,6 +258,11 @@ impl CollabGroup {
}
async fn handle_inbound_awareness(state: &CollabGroupState, update: AwarenessStreamUpdate) {
tracing::trace!(
"broadcasting awareness update from {} ({} bytes)",
update.sender,
update.data.len()
);
let sender = update.sender;
let message = AwarenessSync::new(
state.object_id.clone(),
@ -512,6 +522,7 @@ impl CollabGroup {
message_origin: &CollabOrigin,
msg_id: MsgId,
) -> Result<Option<CollabAck>, RealtimeError> {
tracing::trace!("handling collab message: {}", msg_id);
let mut decoder = DecoderV1::from(payload);
let reader = MessageReader::new(&mut decoder);
let mut ack_response = None;
@ -523,6 +534,7 @@ impl CollabGroup {
match Self::handle_protocol_message(state, message_origin, msg).await {
Ok(payload) => {
state.metrics.apply_update_count.inc();
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
// send one ack back to the client.
if ack_response.is_none() {
@ -539,6 +551,7 @@ impl CollabGroup {
},
Err(err) => {
state.metrics.apply_update_failed_count.inc();
let code = Self::ack_code_from_error(&err);
let payload = match err {
RTProtocolError::MissUpdates {
@ -598,6 +611,7 @@ impl CollabGroup {
// we optimistically try to obtain state vector lock for a fast track:
// if we remote sv is up-to-date with current one, we don't need to do anything
if remote_sv == &*sv {
tracing::trace!("remote client is up to date - nothing to send");
return Ok(None);
}
}
@ -612,6 +626,7 @@ impl CollabGroup {
let local_sv = tx.state_vector();
drop(tx);
// Retrieve the latest document state from the client after they return online from offline editing.
tracing::trace!("sending missing data to client ({} bytes)", doc_state.len());
let mut encoder = EncoderV1::new();
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
@ -627,6 +642,7 @@ impl CollabGroup {
update: Vec<u8>,
) -> Result<Option<Vec<u8>>, RTProtocolError> {
state.metrics.apply_update_size.observe(update.len() as f64);
let start = tokio::time::Instant::now();
// we try to decode update to make sure it's not malformed and to extract state vector
let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD {
@ -647,10 +663,12 @@ impl CollabGroup {
.await
.map_err(|err| RTProtocolError::Internal(err.into()))?;
let elapsed = start.elapsed();
state
.metrics
.apply_update_time
.observe(elapsed.as_millis() as f64);
Ok(None)
}
@ -782,6 +800,7 @@ impl Subscription {
impl Drop for Subscription {
fn drop(&mut self) {
tracing::trace!("closing subscription: {}", self.collab_origin);
self.shutdown.cancel();
}
}
@ -833,16 +852,16 @@ impl CollabPersister {
update: Vec<u8>,
state_vector: StateVector,
) -> Result<MessageId, StreamError> {
let len = update.len();
// send updates to redis queue
let msg_id = self
.update_sink
.send(&CollabStreamUpdate::new(
update,
state_vector,
sender,
UpdateFlags::default(),
))
.await?;
let update = CollabStreamUpdate::new(update, state_vector, sender, UpdateFlags::default());
let msg_id = self.update_sink.send(&update).await?;
tracing::trace!(
"persisted update from {} ({} bytes) - msg id: {}",
update.sender,
len,
msg_id
);
Ok(msg_id)
}
@ -853,13 +872,18 @@ impl CollabPersister {
) -> Result<MessageId, StreamError> {
// send awareness updates to redis queue:
// QUESTION: is it needed? Maybe we could reuse update_sink?
let msg_id = self
.awareness_sink
.send(&AwarenessStreamUpdate {
data: awareness_update,
sender: sender_session.clone(),
})
.await?;
let len = awareness_update.len();
let update = AwarenessStreamUpdate {
data: awareness_update,
sender: sender_session.clone(),
};
let msg_id = self.awareness_sink.send(&update).await?;
tracing::trace!(
"persisted update from {} ({} bytes) - msg id: {}",
update.sender,
len,
msg_id
);
Ok(msg_id)
}
@ -880,9 +904,11 @@ impl CollabPersister {
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
);
pin_mut!(stream);
let mut i = 0;
while let Some(res) = stream.next().await {
match res {
Ok((message_id, update)) => {
i += 1;
let update: Update = update.into_update()?;
tx.apply_update(update)
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
@ -899,6 +925,12 @@ impl CollabPersister {
}
}
drop(tx); // apply transaction to compress the state (without GC)
tracing::trace!(
"loaded collab {} state: {} replaying {} updates",
self.object_id,
if skip_gc { "full" } else { "compact" },
i
);
// now we have the most recent version of the document
let snapshot = CollabSnapshot {
@ -944,6 +976,7 @@ impl CollabPersister {
let mut tx = collab.transact_mut();
let sv = tx.state_vector().encode_v1();
let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default());
let full_len = doc_state_full.len();
let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
@ -966,6 +999,7 @@ impl CollabPersister {
let doc_state_light = collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let light_len = doc_state_light.len();
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
@ -982,6 +1016,14 @@ impl CollabPersister {
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
tracing::debug!(
"persisted collab {} snapshot at {}: {} and {} bytes",
self.object_id,
message_id,
full_len,
light_len
);
// 3. finally we can drop Redis messages
let msg_id = MessageId {
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS,

View File

@ -8,7 +8,7 @@ pub mod compression;
pub mod config;
pub mod connect_state;
pub mod error;
mod group;
pub mod group;
pub mod indexer;
pub mod metrics;
mod permission;