From 61820d72263a6c8f2f2bad00877bdc454c70a008 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 15 Oct 2024 11:29:42 +0200 Subject: [PATCH] chore: more traces in collab group --- ...a673ea4e196920636e4a4db9d42fad3ef4d73.json | 14 ++++ .../src/group/group_init.rs | 74 +++++++++++++++---- services/appflowy-collaborate/src/lib.rs | 2 +- services/appflowy-collaborate/tests/main.rs | 1 - 4 files changed, 73 insertions(+), 18 deletions(-) create mode 100644 .sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json delete mode 100644 services/appflowy-collaborate/tests/main.rs diff --git a/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json b/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json new file mode 100644 index 00000000..aafc6f23 --- /dev/null +++ b/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73" +} diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index a0cb22c3..dfec05dd 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -194,6 +194,11 @@ impl CollabGroup { } async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) { + tracing::trace!( + "broadcasting collab update from {} ({} bytes)", + update.sender, + update.data.len() + ); // update state vector based on incoming message let mut sv = state.state_vector.write().await; sv.merge(update.state_vector); @@ -253,6 +258,11 @@ impl CollabGroup { } async fn handle_inbound_awareness(state: &CollabGroupState, update: AwarenessStreamUpdate) { + tracing::trace!( + "broadcasting awareness update from {} ({} bytes)", + update.sender, + update.data.len() + ); let sender = update.sender; let message = AwarenessSync::new( state.object_id.clone(), @@ -512,6 +522,7 @@ impl CollabGroup { message_origin: &CollabOrigin, msg_id: MsgId, ) -> Result, RealtimeError> { + tracing::trace!("handling collab message: {}", msg_id); let mut decoder = DecoderV1::from(payload); let reader = MessageReader::new(&mut decoder); let mut ack_response = None; @@ -523,6 +534,7 @@ impl CollabGroup { match Self::handle_protocol_message(state, message_origin, msg).await { Ok(payload) => { state.metrics.apply_update_count.inc(); + // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to // send one ack back to the client. if ack_response.is_none() { @@ -539,6 +551,7 @@ impl CollabGroup { }, Err(err) => { state.metrics.apply_update_failed_count.inc(); + let code = Self::ack_code_from_error(&err); let payload = match err { RTProtocolError::MissUpdates { @@ -598,6 +611,7 @@ impl CollabGroup { // we optimistically try to obtain state vector lock for a fast track: // if we remote sv is up-to-date with current one, we don't need to do anything if remote_sv == &*sv { + tracing::trace!("remote client is up to date - nothing to send"); return Ok(None); } } @@ -612,6 +626,7 @@ impl CollabGroup { let local_sv = tx.state_vector(); drop(tx); // Retrieve the latest document state from the client after they return online from offline editing. + tracing::trace!("sending missing data to client ({} bytes)", doc_state.len()); let mut encoder = EncoderV1::new(); Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder); @@ -627,6 +642,7 @@ impl CollabGroup { update: Vec, ) -> Result>, RTProtocolError> { state.metrics.apply_update_size.observe(update.len() as f64); + let start = tokio::time::Instant::now(); // we try to decode update to make sure it's not malformed and to extract state vector let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD { @@ -647,10 +663,12 @@ impl CollabGroup { .await .map_err(|err| RTProtocolError::Internal(err.into()))?; let elapsed = start.elapsed(); + state .metrics .apply_update_time .observe(elapsed.as_millis() as f64); + Ok(None) } @@ -782,6 +800,7 @@ impl Subscription { impl Drop for Subscription { fn drop(&mut self) { + tracing::trace!("closing subscription: {}", self.collab_origin); self.shutdown.cancel(); } } @@ -833,16 +852,16 @@ impl CollabPersister { update: Vec, state_vector: StateVector, ) -> Result { + let len = update.len(); // send updates to redis queue - let msg_id = self - .update_sink - .send(&CollabStreamUpdate::new( - update, - state_vector, - sender, - UpdateFlags::default(), - )) - .await?; + let update = CollabStreamUpdate::new(update, state_vector, sender, UpdateFlags::default()); + let msg_id = self.update_sink.send(&update).await?; + tracing::trace!( + "persisted update from {} ({} bytes) - msg id: {}", + update.sender, + len, + msg_id + ); Ok(msg_id) } @@ -853,13 +872,18 @@ impl CollabPersister { ) -> Result { // send awareness updates to redis queue: // QUESTION: is it needed? Maybe we could reuse update_sink? - let msg_id = self - .awareness_sink - .send(&AwarenessStreamUpdate { - data: awareness_update, - sender: sender_session.clone(), - }) - .await?; + let len = awareness_update.len(); + let update = AwarenessStreamUpdate { + data: awareness_update, + sender: sender_session.clone(), + }; + let msg_id = self.awareness_sink.send(&update).await?; + tracing::trace!( + "persisted update from {} ({} bytes) - msg id: {}", + update.sender, + len, + msg_id + ); Ok(msg_id) } @@ -880,9 +904,11 @@ impl CollabPersister { None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there ); pin_mut!(stream); + let mut i = 0; while let Some(res) = stream.next().await { match res { Ok((message_id, update)) => { + i += 1; let update: Update = update.into_update()?; tx.apply_update(update) .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; @@ -899,6 +925,12 @@ impl CollabPersister { } } drop(tx); // apply transaction to compress the state (without GC) + tracing::trace!( + "loaded collab {} state: {} replaying {} updates", + self.object_id, + if skip_gc { "full" } else { "compact" }, + i + ); // now we have the most recent version of the document let snapshot = CollabSnapshot { @@ -944,6 +976,7 @@ impl CollabPersister { let mut tx = collab.transact_mut(); let sv = tx.state_vector().encode_v1(); let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default()); + let full_len = doc_state_full.len(); let encoded_collab = EncodedCollab::new_v1(sv.clone(), doc_state_full) .encode_to_bytes() .map_err(|err| RealtimeError::Internal(err.into()))?; @@ -966,6 +999,7 @@ impl CollabPersister { let doc_state_light = collab .transact() .encode_state_as_update_v1(&StateVector::default()); + let light_len = doc_state_light.len(); let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) .encode_to_bytes() .map_err(|err| RealtimeError::Internal(err.into()))?; @@ -982,6 +1016,14 @@ impl CollabPersister { .await .map_err(|err| RealtimeError::Internal(err.into()))?; + tracing::debug!( + "persisted collab {} snapshot at {}: {} and {} bytes", + self.object_id, + message_id, + full_len, + light_len + ); + // 3. finally we can drop Redis messages let msg_id = MessageId { timestamp_ms: message_id.timestamp_ms - Self::GRACE_PERIOD_MS, diff --git a/services/appflowy-collaborate/src/lib.rs b/services/appflowy-collaborate/src/lib.rs index 2fa28cf5..09b6f7f9 100644 --- a/services/appflowy-collaborate/src/lib.rs +++ b/services/appflowy-collaborate/src/lib.rs @@ -8,7 +8,7 @@ pub mod compression; pub mod config; pub mod connect_state; pub mod error; -mod group; +pub mod group; pub mod indexer; pub mod metrics; mod permission; diff --git a/services/appflowy-collaborate/tests/main.rs b/services/appflowy-collaborate/tests/main.rs deleted file mode 100644 index 8b137891..00000000 --- a/services/appflowy-collaborate/tests/main.rs +++ /dev/null @@ -1 +0,0 @@ -