chore: more traces in collab group
This commit is contained in:
parent
d3553e5dfc
commit
21b734269c
|
|
@ -0,0 +1,14 @@
|
||||||
|
{
|
||||||
|
"db_name": "PostgreSQL",
|
||||||
|
"query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ",
|
||||||
|
"describe": {
|
||||||
|
"columns": [],
|
||||||
|
"parameters": {
|
||||||
|
"Left": [
|
||||||
|
"Uuid"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"nullable": []
|
||||||
|
},
|
||||||
|
"hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73"
|
||||||
|
}
|
||||||
|
|
@ -194,6 +194,11 @@ impl CollabGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
|
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
|
||||||
|
tracing::trace!(
|
||||||
|
"broadcasting collab update from {} ({} bytes)",
|
||||||
|
update.sender,
|
||||||
|
update.data.len()
|
||||||
|
);
|
||||||
// update state vector based on incoming message
|
// update state vector based on incoming message
|
||||||
let mut sv = state.state_vector.write().await;
|
let mut sv = state.state_vector.write().await;
|
||||||
sv.merge(update.state_vector);
|
sv.merge(update.state_vector);
|
||||||
|
|
@ -253,6 +258,11 @@ impl CollabGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_inbound_awareness(state: &CollabGroupState, update: AwarenessStreamUpdate) {
|
async fn handle_inbound_awareness(state: &CollabGroupState, update: AwarenessStreamUpdate) {
|
||||||
|
tracing::trace!(
|
||||||
|
"broadcasting awareness update from {} ({} bytes)",
|
||||||
|
update.sender,
|
||||||
|
update.data.len()
|
||||||
|
);
|
||||||
let sender = update.sender;
|
let sender = update.sender;
|
||||||
let message = AwarenessSync::new(
|
let message = AwarenessSync::new(
|
||||||
state.object_id.clone(),
|
state.object_id.clone(),
|
||||||
|
|
@ -512,6 +522,7 @@ impl CollabGroup {
|
||||||
message_origin: &CollabOrigin,
|
message_origin: &CollabOrigin,
|
||||||
msg_id: MsgId,
|
msg_id: MsgId,
|
||||||
) -> Result<Option<CollabAck>, RealtimeError> {
|
) -> Result<Option<CollabAck>, RealtimeError> {
|
||||||
|
tracing::trace!("handling collab message: {}", msg_id);
|
||||||
let mut decoder = DecoderV1::from(payload);
|
let mut decoder = DecoderV1::from(payload);
|
||||||
let reader = MessageReader::new(&mut decoder);
|
let reader = MessageReader::new(&mut decoder);
|
||||||
let mut ack_response = None;
|
let mut ack_response = None;
|
||||||
|
|
@ -523,6 +534,7 @@ impl CollabGroup {
|
||||||
match Self::handle_protocol_message(state, message_origin, msg).await {
|
match Self::handle_protocol_message(state, message_origin, msg).await {
|
||||||
Ok(payload) => {
|
Ok(payload) => {
|
||||||
state.metrics.apply_update_count.inc();
|
state.metrics.apply_update_count.inc();
|
||||||
|
|
||||||
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
|
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
|
||||||
// send one ack back to the client.
|
// send one ack back to the client.
|
||||||
if ack_response.is_none() {
|
if ack_response.is_none() {
|
||||||
|
|
@ -539,6 +551,7 @@ impl CollabGroup {
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
state.metrics.apply_update_failed_count.inc();
|
state.metrics.apply_update_failed_count.inc();
|
||||||
|
|
||||||
let code = Self::ack_code_from_error(&err);
|
let code = Self::ack_code_from_error(&err);
|
||||||
let payload = match err {
|
let payload = match err {
|
||||||
RTProtocolError::MissUpdates {
|
RTProtocolError::MissUpdates {
|
||||||
|
|
@ -598,6 +611,7 @@ impl CollabGroup {
|
||||||
// we optimistically try to obtain state vector lock for a fast track:
|
// we optimistically try to obtain state vector lock for a fast track:
|
||||||
// if we remote sv is up-to-date with current one, we don't need to do anything
|
// if we remote sv is up-to-date with current one, we don't need to do anything
|
||||||
if remote_sv == &*sv {
|
if remote_sv == &*sv {
|
||||||
|
tracing::trace!("remote client is up to date - nothing to send");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -612,6 +626,7 @@ impl CollabGroup {
|
||||||
let local_sv = tx.state_vector();
|
let local_sv = tx.state_vector();
|
||||||
drop(tx);
|
drop(tx);
|
||||||
// Retrieve the latest document state from the client after they return online from offline editing.
|
// Retrieve the latest document state from the client after they return online from offline editing.
|
||||||
|
tracing::trace!("sending missing data to client ({} bytes)", doc_state.len());
|
||||||
let mut encoder = EncoderV1::new();
|
let mut encoder = EncoderV1::new();
|
||||||
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
|
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
|
||||||
|
|
||||||
|
|
@ -627,6 +642,7 @@ impl CollabGroup {
|
||||||
update: Vec<u8>,
|
update: Vec<u8>,
|
||||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||||
state.metrics.apply_update_size.observe(update.len() as f64);
|
state.metrics.apply_update_size.observe(update.len() as f64);
|
||||||
|
|
||||||
let start = tokio::time::Instant::now();
|
let start = tokio::time::Instant::now();
|
||||||
// we try to decode update to make sure it's not malformed and to extract state vector
|
// we try to decode update to make sure it's not malformed and to extract state vector
|
||||||
let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD {
|
let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD {
|
||||||
|
|
@ -647,10 +663,12 @@ impl CollabGroup {
|
||||||
.await
|
.await
|
||||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||||
let elapsed = start.elapsed();
|
let elapsed = start.elapsed();
|
||||||
|
|
||||||
state
|
state
|
||||||
.metrics
|
.metrics
|
||||||
.apply_update_time
|
.apply_update_time
|
||||||
.observe(elapsed.as_millis() as f64);
|
.observe(elapsed.as_millis() as f64);
|
||||||
|
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -782,6 +800,7 @@ impl Subscription {
|
||||||
|
|
||||||
impl Drop for Subscription {
|
impl Drop for Subscription {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
|
tracing::trace!("closing subscription: {}", self.collab_origin);
|
||||||
self.shutdown.cancel();
|
self.shutdown.cancel();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -833,16 +852,16 @@ impl CollabPersister {
|
||||||
update: Vec<u8>,
|
update: Vec<u8>,
|
||||||
state_vector: StateVector,
|
state_vector: StateVector,
|
||||||
) -> Result<MessageId, StreamError> {
|
) -> Result<MessageId, StreamError> {
|
||||||
|
let len = update.len();
|
||||||
// send updates to redis queue
|
// send updates to redis queue
|
||||||
let msg_id = self
|
let update = CollabStreamUpdate::new(update, state_vector, sender, UpdateFlags::default());
|
||||||
.update_sink
|
let msg_id = self.update_sink.send(&update).await?;
|
||||||
.send(&CollabStreamUpdate::new(
|
tracing::trace!(
|
||||||
update,
|
"persisted update from {} ({} bytes) - msg id: {}",
|
||||||
state_vector,
|
update.sender,
|
||||||
sender,
|
len,
|
||||||
UpdateFlags::default(),
|
msg_id
|
||||||
))
|
);
|
||||||
.await?;
|
|
||||||
Ok(msg_id)
|
Ok(msg_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -853,13 +872,18 @@ impl CollabPersister {
|
||||||
) -> Result<MessageId, StreamError> {
|
) -> Result<MessageId, StreamError> {
|
||||||
// send awareness updates to redis queue:
|
// send awareness updates to redis queue:
|
||||||
// QUESTION: is it needed? Maybe we could reuse update_sink?
|
// QUESTION: is it needed? Maybe we could reuse update_sink?
|
||||||
let msg_id = self
|
let len = awareness_update.len();
|
||||||
.awareness_sink
|
let update = AwarenessStreamUpdate {
|
||||||
.send(&AwarenessStreamUpdate {
|
|
||||||
data: awareness_update,
|
data: awareness_update,
|
||||||
sender: sender_session.clone(),
|
sender: sender_session.clone(),
|
||||||
})
|
};
|
||||||
.await?;
|
let msg_id = self.awareness_sink.send(&update).await?;
|
||||||
|
tracing::trace!(
|
||||||
|
"persisted update from {} ({} bytes) - msg id: {}",
|
||||||
|
update.sender,
|
||||||
|
len,
|
||||||
|
msg_id
|
||||||
|
);
|
||||||
Ok(msg_id)
|
Ok(msg_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -880,9 +904,11 @@ impl CollabPersister {
|
||||||
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
|
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
|
||||||
);
|
);
|
||||||
pin_mut!(stream);
|
pin_mut!(stream);
|
||||||
|
let mut i = 0;
|
||||||
while let Some(res) = stream.next().await {
|
while let Some(res) = stream.next().await {
|
||||||
match res {
|
match res {
|
||||||
Ok((message_id, update)) => {
|
Ok((message_id, update)) => {
|
||||||
|
i += 1;
|
||||||
let update: Update = update.into_update()?;
|
let update: Update = update.into_update()?;
|
||||||
tx.apply_update(update)
|
tx.apply_update(update)
|
||||||
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
|
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
|
||||||
|
|
@ -899,6 +925,12 @@ impl CollabPersister {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
drop(tx); // apply transaction to compress the state (without GC)
|
drop(tx); // apply transaction to compress the state (without GC)
|
||||||
|
tracing::trace!(
|
||||||
|
"loaded collab {} state: {} replaying {} updates",
|
||||||
|
self.object_id,
|
||||||
|
if skip_gc { "full" } else { "compact" },
|
||||||
|
i
|
||||||
|
);
|
||||||
|
|
||||||
// now we have the most recent version of the document
|
// now we have the most recent version of the document
|
||||||
let snapshot = CollabSnapshot {
|
let snapshot = CollabSnapshot {
|
||||||
|
|
@ -944,6 +976,7 @@ impl CollabPersister {
|
||||||
let mut tx = collab.transact_mut();
|
let mut tx = collab.transact_mut();
|
||||||
let sv = tx.state_vector().encode_v1();
|
let sv = tx.state_vector().encode_v1();
|
||||||
let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default());
|
let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default());
|
||||||
|
let full_len = doc_state_full.len();
|
||||||
let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full)
|
let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full)
|
||||||
.encode_to_bytes()
|
.encode_to_bytes()
|
||||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
|
@ -966,6 +999,7 @@ impl CollabPersister {
|
||||||
let doc_state_light = collab
|
let doc_state_light = collab
|
||||||
.transact()
|
.transact()
|
||||||
.encode_state_as_update_v1(&StateVector::default());
|
.encode_state_as_update_v1(&StateVector::default());
|
||||||
|
let light_len = doc_state_light.len();
|
||||||
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
|
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
|
||||||
.encode_to_bytes()
|
.encode_to_bytes()
|
||||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
|
@ -982,6 +1016,14 @@ impl CollabPersister {
|
||||||
.await
|
.await
|
||||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||||
|
|
||||||
|
tracing::debug!(
|
||||||
|
"persisted collab {} snapshot at {}: {} and {} bytes",
|
||||||
|
self.object_id,
|
||||||
|
message_id,
|
||||||
|
full_len,
|
||||||
|
light_len
|
||||||
|
);
|
||||||
|
|
||||||
// 3. finally we can drop Redis messages
|
// 3. finally we can drop Redis messages
|
||||||
let msg_id = MessageId {
|
let msg_id = MessageId {
|
||||||
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
|
timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS,
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ pub mod compression;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod connect_state;
|
pub mod connect_state;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
mod group;
|
pub mod group;
|
||||||
pub mod indexer;
|
pub mod indexer;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
mod permission;
|
mod permission;
|
||||||
|
|
|
||||||
|
|
@ -1 +0,0 @@
|
||||||
|
|
||||||
Loading…
Reference in New Issue