From d4e45efd6a16cb83e9c60af26d7bab99cde76646 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 25 Mar 2024 16:14:49 +0800 Subject: [PATCH] chore: bump collab commit (#417) * chore: bump collab rev * chore: bump collab commit * chore: custome decode ack code --- Cargo.lock | 8 ++-- Cargo.toml | 13 ++++-- libs/client-api-test-util/src/test_client.rs | 23 +++++++---- libs/client-api/src/ws/msg_queue.rs | 1 - libs/collab-rt-entity/src/collab_msg.rs | 40 +++++++++++++++++-- libs/collab-rt-protocol/src/message.rs | 3 ++ libs/collab-rt-protocol/src/protocol.rs | 13 +++++- .../src/collaborate/group_broadcast.rs | 1 + libs/collab-rt/src/collaborate/plugin.rs | 4 +- .../src/collaborate/sync_protocol.rs | 11 ++++- src/biz/collab/cache.rs | 11 ++++- src/biz/collab/disk_cache.rs | 5 ++- src/biz/collab/mem_cache.rs | 7 +++- src/biz/collab/storage.rs | 8 +++- tests/collab_snapshot/snapshot_test.rs | 4 +- 15 files changed, 116 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86db8e76..e99777c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1388,7 +1388,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=409058aad0969c4d4429151317428a3d17f341d1#409058aad0969c4d4429151317428a3d17f341d1" dependencies = [ "anyhow", "async-trait", @@ -1412,7 +1412,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=409058aad0969c4d4429151317428a3d17f341d1#409058aad0969c4d4429151317428a3d17f341d1" dependencies = [ "anyhow", "collab", @@ -1431,7 +1431,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=409058aad0969c4d4429151317428a3d17f341d1#409058aad0969c4d4429151317428a3d17f341d1" dependencies = [ "anyhow", "bytes", @@ -1446,7 +1446,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=0970b2e1440134af7c83bb8fc80cac5d2dedebb7#0970b2e1440134af7c83bb8fc80cac5d2dedebb7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=409058aad0969c4d4429151317428a3d17f341d1#409058aad0969c4d4429151317428a3d17f341d1" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 0a170c62..d11ab91c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -175,16 +175,21 @@ collab-rt = { path = "libs/collab-rt" } lto = true opt-level = 3 codegen-units = 1 +panic = 'unwind' [profile.profiling] inherits = "release" debug = true +panic = 'unwind' + +[profile.dev] +panic = 'unwind' [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0970b2e1440134af7c83bb8fc80cac5d2dedebb7" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0970b2e1440134af7c83bb8fc80cac5d2dedebb7" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0970b2e1440134af7c83bb8fc80cac5d2dedebb7" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "0970b2e1440134af7c83bb8fc80cac5d2dedebb7" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "409058aad0969c4d4429151317428a3d17f341d1" } [features] custom_env= [] diff --git a/libs/client-api-test-util/src/test_client.rs b/libs/client-api-test-util/src/test_client.rs index ef7be2ec..a57d7085 100644 --- a/libs/client-api-test-util/src/test_client.rs +++ b/libs/client-api-test-util/src/test_client.rs @@ -6,7 +6,7 @@ use assert_json_diff::{ use bytes::Bytes; use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin}; use client_api::ws::{WSClient, WSClientConfig}; -use collab::core::collab::MutexCollab; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::collab_plugin::EncodedCollab; use collab::core::collab_state::SyncState; use collab::core::origin::{CollabClient, CollabOrigin}; @@ -184,7 +184,7 @@ impl TestClient { Folder::from_collab_doc_state( uid, CollabOrigin::Empty, - data.doc_state.to_vec(), + DocStateSource::FromDocState(data.doc_state.to_vec()), &workspace_id, vec![], ) @@ -498,7 +498,7 @@ impl TestClient { MutexCollab::new_with_doc_state( origin.clone(), &object_id, - data.doc_state.to_vec(), + DocStateSource::FromDocState(data.doc_state.to_vec()), vec![], false, ) @@ -575,7 +575,14 @@ impl TestClient { let (sink, stream) = (handler.sink(), handler.stream()); let origin = CollabOrigin::Client(CollabClient::new(self.uid().await, self.device_id.clone())); let collab = Arc::new( - MutexCollab::new_with_doc_state(origin.clone(), object_id, doc_state, vec![], false).unwrap(), + MutexCollab::new_with_doc_state( + origin.clone(), + object_id, + DocStateSource::FromDocState(doc_state), + vec![], + false, + ) + .unwrap(), ); let ws_connect_state = self.ws_client.subscribe_connect_state(); @@ -660,9 +667,9 @@ pub async fn assert_server_snapshot( let json = Collab::new_with_doc_state( CollabOrigin::Empty, &object_id, - encoded_collab_v1.doc_state.to_vec(), + DocStateSource::FromDocState(encoded_collab_v1.doc_state.to_vec()), vec![], - false, + false, ) .unwrap() .to_json_value(); @@ -718,7 +725,7 @@ pub async fn assert_server_collab( let json = Collab::new_with_doc_state( CollabOrigin::Empty, &object_id, - data.doc_state.to_vec(), + DocStateSource::FromDocState(data.doc_state.to_vec()), vec![], false, ) @@ -839,7 +846,7 @@ pub async fn get_collab_json_from_server( Collab::new_with_doc_state( CollabOrigin::Empty, object_id, - bytes.doc_state.to_vec(), + DocStateSource::FromDocState(bytes.doc_state.to_vec()), vec![], false, ) diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 1bdf6095..2a6b69cf 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -31,7 +31,6 @@ impl AggregateMessageQueue { pub async fn push(&self, msg: Vec) { let mut queue_guard = self.queue.lock().await; let mut seen_ids_guard = self.seen_ids.lock().await; - for msg in msg.into_iter() { if seen_ids_guard.insert(SeenId::from(&msg)) { queue_guard.push(msg); diff --git a/libs/collab-rt-entity/src/collab_msg.rs b/libs/collab-rt-entity/src/collab_msg.rs index 81008008..48cd2a09 100644 --- a/libs/collab-rt-entity/src/collab_msg.rs +++ b/libs/collab-rt-entity/src/collab_msg.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Error}; use std::cmp::Ordering; +use std::fmt; use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; @@ -12,8 +13,9 @@ use collab::preclude::updates::decoder::DecoderV1; use collab::preclude::updates::encoder::{Encode, Encoder, EncoderV1}; use collab_entity::CollabType; use collab_rt_protocol::{Message, MessageReader, SyncMessage}; -use serde::{Deserialize, Serialize}; -use serde_repr::{Deserialize_repr, Serialize_repr}; +use serde::de::Visitor; +use serde::{de, Deserialize, Deserializer, Serialize}; +use serde_repr::Serialize_repr; pub trait CollabSinkMessage: Clone + Send + Sync + 'static + Ord + Display { fn payload_size(&self) -> usize; @@ -454,13 +456,14 @@ impl Display for UpdateSync { } } -#[derive(Clone, Eq, PartialEq, Debug, Serialize_repr, Deserialize_repr, Hash)] +#[derive(Clone, Eq, PartialEq, Debug, Serialize_repr, Hash)] #[repr(u8)] pub enum AckCode { Success = 0, CannotApplyUpdate = 1, Retry = 2, Internal = 3, + EncodeState = 4, } /// ⚠️ ⚠️ ⚠️Compatibility Warning: @@ -476,8 +479,39 @@ pub struct CollabAck { pub object_id: String, pub source: AckSource, pub payload: Bytes, + #[serde(deserialize_with = "deserialize_ack_code")] pub code: AckCode, } +fn deserialize_ack_code<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + struct AckCodeVisitor; + + impl<'de> Visitor<'de> for AckCodeVisitor { + type Value = AckCode; + + fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { + formatter.write_str("an integer between 0 and 4") + } + + fn visit_u8(self, value: u8) -> Result + where + E: de::Error, + { + match value { + 0 => Ok(AckCode::Success), + 1 => Ok(AckCode::CannotApplyUpdate), + 2 => Ok(AckCode::Retry), + 3 => Ok(AckCode::Internal), + 4 => Ok(AckCode::EncodeState), + _ => Ok(AckCode::Internal), + } + } + } + + deserializer.deserialize_u8(AckCodeVisitor) +} #[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)] pub struct AckSource { diff --git a/libs/collab-rt-protocol/src/message.rs b/libs/collab-rt-protocol/src/message.rs index 91c7cc1f..0b7d2af7 100644 --- a/libs/collab-rt-protocol/src/message.rs +++ b/libs/collab-rt-protocol/src/message.rs @@ -251,6 +251,9 @@ pub enum Error { #[error("{0}")] YrsApplyUpdate(String), + #[error("{0}")] + YrsEncodeState(String), + #[error(transparent)] BinCodeSerde(#[from] bincode::Error), diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index ef1ae1e5..50a8ce55 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -1,8 +1,10 @@ use collab::core::awareness::{Awareness, AwarenessUpdate}; -use collab::core::collab::TransactionMutExt; +use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; use collab::core::transaction::TransactionRetry; + use collab::preclude::Collab; + use yrs::updates::decoder::Decode; use yrs::updates::encoder::{Encode, Encoder}; use yrs::{ReadTxn, StateVector, Transact, Update}; @@ -70,7 +72,10 @@ pub trait CollabSyncProtocol { .doc() .try_transact() .map_err(|err| Error::YrsTransaction(format!("fail to handle sync step1. error: {}", err)))? - .encode_state_as_update_v1(&sv); + .try_encode_state_as_update_v1(&sv) + .map_err(|err| { + Error::YrsEncodeState(format!("fail to encode state as update. error: {}", err)) + })?; Ok(Some( Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(), )) @@ -91,6 +96,10 @@ pub trait CollabSyncProtocol { txn .try_apply_update(update) .map_err(|err| Error::YrsApplyUpdate(format!("sync step2 apply update: {}", err)))?; + + txn + .try_commit() + .map_err(|err| Error::YrsTransaction(format!("sync step2 transaction acquire: {}", err)))?; Ok(None) } diff --git a/libs/collab-rt/src/collaborate/group_broadcast.rs b/libs/collab-rt/src/collaborate/group_broadcast.rs index 609cfc9e..b14140ab 100644 --- a/libs/collab-rt/src/collaborate/group_broadcast.rs +++ b/libs/collab-rt/src/collaborate/group_broadcast.rs @@ -330,6 +330,7 @@ fn ack_code_from_error(error: &Error) -> AckCode { match error { Error::YrsTransaction(_) => AckCode::Retry, Error::YrsApplyUpdate(_) => AckCode::CannotApplyUpdate, + Error::YrsEncodeState(_) => AckCode::EncodeState, _ => AckCode::Internal, } } diff --git a/libs/collab-rt/src/collaborate/plugin.rs b/libs/collab-rt/src/collaborate/plugin.rs index e7efaef3..3eb75b9b 100644 --- a/libs/collab-rt/src/collaborate/plugin.rs +++ b/libs/collab-rt/src/collaborate/plugin.rs @@ -7,7 +7,7 @@ use crate::RealtimeAccessControl; use anyhow::anyhow; use collab::core::awareness::{AwarenessUpdate, Event}; -use collab::core::collab::TransactionMutExt; +use collab::core::collab::{DocStateSource, TransactionMutExt}; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::core::transaction::DocTransactionExtension; @@ -273,7 +273,7 @@ where if let Ok(collab) = Collab::new_with_doc_state( CollabOrigin::Empty, object_id, - encoded_collab.doc_state.to_vec(), + DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()), vec![], false, ) { diff --git a/libs/collab-rt/src/collaborate/sync_protocol.rs b/libs/collab-rt/src/collaborate/sync_protocol.rs index 9abfb828..beff77f7 100644 --- a/libs/collab-rt/src/collaborate/sync_protocol.rs +++ b/libs/collab-rt/src/collaborate/sync_protocol.rs @@ -1,4 +1,5 @@ use collab::core::awareness::Awareness; +use collab::core::collab::TransactionExt; use collab_rt_protocol::CollabSyncProtocol; use collab_rt_protocol::{CustomMessage, Error, Message, SyncMessage}; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; @@ -12,8 +13,14 @@ impl CollabSyncProtocol for ServerSyncProtocol { awareness: &Awareness, sv: StateVector, ) -> Result>, Error> { - let txn = awareness.doc().transact(); - let client_step2_update = txn.encode_state_as_update_v1(&sv); + let txn = awareness + .doc() + .try_transact() + .map_err(|err| Error::YrsTransaction(format!("fail to handle sync step1. error: {}", err)))?; + + let client_step2_update = txn.try_encode_state_as_update_v1(&sv).map_err(|err| { + Error::YrsEncodeState(format!("fail to encode state as update. error: {}", err)) + })?; // Retrieve the latest document state from the client after they return online from offline editing. let server_step1_update = txn.state_vector(); diff --git a/src/biz/collab/cache.rs b/src/biz/collab/cache.rs index 2be21091..9fe1ca88 100644 --- a/src/biz/collab/cache.rs +++ b/src/biz/collab/cache.rs @@ -42,7 +42,11 @@ impl CollabCache { ) -> Result { self.total_attempts.fetch_add(1, Ordering::Relaxed); // Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary. - if let Some(encoded_collab) = self.mem_cache.get_encode_collab(¶ms.object_id).await { + if let Some(encoded_collab) = self + .mem_cache + .get_encode_collab_from_mem(¶ms.object_id) + .await + { event!( Level::DEBUG, "Get encoded collab:{} from cache", @@ -54,7 +58,10 @@ impl CollabCache { // Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache. let object_id = params.object_id.clone(); - let encoded_collab = self.disk_cache.get_collab_encoded(uid, params).await?; + let encoded_collab = self + .disk_cache + .get_collab_encoded_from_disk(uid, params) + .await?; self .mem_cache .insert_encode_collab(object_id, &encoded_collab) diff --git a/src/biz/collab/disk_cache.rs b/src/biz/collab/disk_cache.rs index d0e142ff..ee22c254 100644 --- a/src/biz/collab/disk_cache.rs +++ b/src/biz/collab/disk_cache.rs @@ -10,7 +10,7 @@ use sqlx::{PgPool, Transaction}; use std::collections::HashMap; use std::time::Duration; use tokio::time::sleep; -use tracing::{event, Level}; +use tracing::{event, instrument, Level}; #[derive(Clone)] pub struct CollabDiskCache { @@ -38,7 +38,8 @@ impl CollabDiskCache { Ok(()) } - pub async fn get_collab_encoded( + #[instrument(level = "trace", skip_all)] + pub async fn get_collab_encoded_from_disk( &self, _uid: &i64, params: QueryCollabParams, diff --git a/src/biz/collab/mem_cache.rs b/src/biz/collab/mem_cache.rs index f64f47aa..d19515b1 100644 --- a/src/biz/collab/mem_cache.rs +++ b/src/biz/collab/mem_cache.rs @@ -6,7 +6,7 @@ use anyhow::anyhow; use app_error::AppError; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, trace}; +use tracing::{error, instrument, trace}; #[derive(Clone)] pub struct CollabMemCache { @@ -51,7 +51,8 @@ impl CollabMemCache { } } - pub async fn get_encode_collab(&self, object_id: &str) -> Option { + #[instrument(level = "trace", skip_all)] + pub async fn get_encode_collab_from_mem(&self, object_id: &str) -> Option { match self.get_encode_collab_bytes(object_id).await { Some(bytes) => match EncodedCollab::decode_from_bytes(&bytes) { Ok(encoded_collab) => Some(encoded_collab), @@ -70,7 +71,9 @@ impl CollabMemCache { } } + #[instrument(level = "trace", skip_all, fields(object_id=%object_id))] pub async fn insert_encode_collab(&self, object_id: String, encoded_collab: &EncodedCollab) { + trace!("Inserting encode collab into cache: {}", object_id); match encoded_collab.encode_to_bytes() { Ok(bytes) => { if let Err(err) = self.set_bytes_in_redis(object_id, bytes).await { diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 3a595997..8f2fa3f6 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -5,6 +5,7 @@ use crate::biz::snapshot::SnapshotControl; use anyhow::Context; use app_error::AppError; use async_trait::async_trait; +use collab::core::collab::DocStateSource; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::preclude::Collab; @@ -120,7 +121,10 @@ where // Await the response from the realtime server with a timeout match timeout(timeout_duration, rx).await { Ok(Ok(Some(encode_collab))) => Some(encode_collab), - Ok(Ok(None)) => None, + Ok(Ok(None)) => { + trace!("No encode collab found in editing collab"); + None + }, Ok(Err(err)) => { error!("Failed to get encode collab from realtime server: {}", err); None @@ -313,7 +317,7 @@ pub fn check_encoded_collab_data(object_id: &str, data: &[u8]) -> Result<(), any let _ = Collab::new_with_doc_state( CollabOrigin::Empty, object_id, - encoded_collab.doc_state.to_vec(), + DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()), vec![], false, )?; diff --git a/tests/collab_snapshot/snapshot_test.rs b/tests/collab_snapshot/snapshot_test.rs index 3399066c..39a59afa 100644 --- a/tests/collab_snapshot/snapshot_test.rs +++ b/tests/collab_snapshot/snapshot_test.rs @@ -1,5 +1,5 @@ use assert_json_diff::assert_json_eq; -use collab::core::collab::MutexCollab; +use collab::core::collab::{DocStateSource, MutexCollab}; use collab::core::collab_plugin::EncodedCollab; use collab::core::origin::CollabOrigin; use collab::preclude::Collab; @@ -85,7 +85,7 @@ async fn get_snapshot_data_test() { let collab = MutexCollab::new_with_doc_state( CollabOrigin::Empty, &object_id, - encoded_collab.doc_state.to_vec(), + DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()), vec![], false, )