From 52782033948b7d243693ca159ea519d53458c8a6 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Fri, 30 Aug 2024 11:30:15 +0800 Subject: [PATCH] =?UTF-8?q?chore:=20decode=20update=20when=20spawn=20block?= =?UTF-8?q?ing=20when=20update=20size=20exceeds=20the=E2=80=A6=20(#769)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: decode update when spawn blocking when update size exceeds the threshold size * chore: use 1mb as threshold --- libs/collab-rt-protocol/src/protocol.rs | 19 +++++++++++++++++-- tests/yrs_version/document_test.rs | 3 +-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index 7ae93421..956a980e 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -6,6 +6,7 @@ use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; use collab::lock::RwLock; use collab::preclude::Collab; +use tokio::task::spawn_blocking; use yrs::updates::decoder::Decode; use yrs::updates::encoder::{Encode, Encoder}; use yrs::{ReadTxn, StateVector, Transact, Update}; @@ -194,6 +195,20 @@ pub trait CollabSyncProtocol { } } +const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB + +#[inline] +async fn decode_update(update: Vec) -> Result { + let update = if update.len() > LARGE_UPDATE_THRESHOLD { + spawn_blocking(move || Update::decode_v1(&update)) + .await + .map_err(|err| RTProtocolError::Internal(err.into()))? + } else { + Update::decode_v1(&update) + }?; + Ok(update) +} + /// Handles incoming messages from the client/server pub async fn handle_message_follow_protocol

( message_origin: &CollabOrigin, @@ -214,14 +229,14 @@ where Ok(update) }, SyncMessage::SyncStep2(update) => { - let update = Update::decode_v1(&update)?; + let update = decode_update(update).await?; let mut lock = collab.write().await; let collab = (*lock).borrow_mut(); protocol.handle_sync_step2(message_origin, collab.get_mut_awareness(), update)?; Ok(None) }, SyncMessage::Update(update) => { - let update = Update::decode_v1(&update)?; + let update = decode_update(update).await?; let mut lock = collab.write().await; let collab = (*lock).borrow_mut(); protocol.handle_update(message_origin, collab.get_mut_awareness(), update)?; diff --git a/tests/yrs_version/document_test.rs b/tests/yrs_version/document_test.rs index 425a8f2d..34e53a14 100644 --- a/tests/yrs_version/document_test.rs +++ b/tests/yrs_version/document_test.rs @@ -1,10 +1,9 @@ +use crate::yrs_version::util::read_bytes_from_file; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use collab_document::document::Document; -use crate::yrs_version::util::read_bytes_from_file; - /// Load collaboration data that was encoded using Yjs version 0.17. #[test] fn load_yrs_0172_version_get_started_document_using_current_yrs_version() {