From 54b811125eabdbdf2e3fbb7a65ca52588ad8bdf3 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 29 Aug 2024 22:53:33 +0800 Subject: [PATCH] chore: wrap spawn blocking when decoding collab (#767) --- Cargo.lock | 1 + libs/collab-stream/Cargo.toml | 1 + libs/collab-stream/src/model.rs | 25 +++++++++++-------- .../src/group/persistence.rs | 10 ++++++-- services/appflowy-history/src/biz/history.rs | 11 ++++---- .../appflowy-history/src/core/open_handle.rs | 12 ++++++--- 6 files changed, 39 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4ebd4523..7546a91e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2245,6 +2245,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bincode", + "bytes", "chrono", "collab-entity", "futures", diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index 55986d69..f99afdd6 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.30" tracing = "0.1" serde = { version = "1", features = ["derive"] } bincode = "1.3.3" +bytes.workspace = true collab-entity.workspace = true serde_json.workspace = true chrono = "0.4" diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index ec880132..0ee436a4 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -1,15 +1,15 @@ +use crate::error::{internal, StreamError}; +use bytes::Bytes; use collab_entity::proto::collab::collab_update_event::Update; use collab_entity::{proto, CollabType}; -use std::collections::BTreeMap; -use std::fmt::{Display, Formatter}; -use std::ops::Deref; -use std::str::FromStr; - -use crate::error::{internal, StreamError}; use prost::Message; use redis::streams::StreamId; use redis::{FromRedisValue, RedisError, RedisResult, Value}; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::fmt::{Display, Formatter}; +use std::ops::Deref; +use std::str::FromStr; 
/// The [MessageId] generated by XADD has two parts: a timestamp and a sequence number, separated by /// a hyphen (-). The timestamp is based on the server's time when the message is added, and the @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize}; /// /// An example message ID might look like this: 1631020452097-0. In this example, 1631020452097 is /// the timestamp in milliseconds, and 0 is the sequence number. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MessageId { pub timestamp_ms: u64, pub sequence_number: u16, @@ -120,9 +120,9 @@ impl FromRedisValue for StreamMessageByStreamKey { } /// A message in the Redis stream. It's the same as [StreamBinary] but with additional metadata. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct StreamMessage { - pub data: Vec<u8>, + pub data: Bytes, /// only applicable when reading from redis pub id: MessageId, } @@ -158,7 +158,10 @@ impl FromRedisValue for StreamMessage { verify_field(&fields[0], "data")?; let raw_data = Vec::<u8>::from_redis_value(&fields[1])?; - Ok(StreamMessage { data: raw_data, id }) + Ok(StreamMessage { + data: Bytes::from(raw_data), + id, + }) } } @@ -179,7 +182,7 @@ pub struct StreamBinary(pub Vec<u8>); impl From<StreamMessage> for StreamBinary { fn from(m: StreamMessage) -> Self { - Self(m.data) + Self(m.data.to_vec()) } } diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs index 083d1001..2968330b 100644 --- a/services/appflowy-collaborate/src/group/persistence.rs +++ b/services/appflowy-collaborate/src/group/persistence.rs @@ -124,9 +124,15 @@ where }; let params = { - let lock = collab.read().await; - let mut params = get_encode_collab(&workspace_id, &object_id, &lock, &collab_type)?; + let cloned_collab = collab.clone(); + let (workspace_id, mut params, object_id) = tokio::task::spawn_blocking(move || { + let collab = cloned_collab.blocking_read(); + let params = get_encode_collab(&workspace_id, &object_id, &collab, &collab_type)?; 
Ok::<_, AppError>((workspace_id, params, object_id)) + }) + .await??; + let lock = collab.read().await; if let Some(indexer) = &self.indexer { match indexer.embedding_params(&lock) { Ok(embedding_params) => { diff --git a/services/appflowy-history/src/biz/history.rs b/services/appflowy-history/src/biz/history.rs index 167f4e05..f2414030 100644 --- a/services/appflowy-history/src/biz/history.rs +++ b/services/appflowy-history/src/biz/history.rs @@ -64,14 +64,15 @@ impl CollabHistory { return Ok(None); } trace!("[History] prepare to save snapshots to disk"); - let (doc_state, state_vector) = { - let lock = collab.read().await; + let (doc_state, state_vector) = tokio::task::spawn_blocking(move || { + let lock = collab.blocking_read(); let txn = lock.transact(); - // TODO(nathan): reduce the size of doc_state_v2 by encoding the previous [CollabStateSnapshot] doc_state_v2 let doc_state_v2 = txn.encode_state_as_update_v2(&StateVector::default()); let state_vector = txn.state_vector(); - (doc_state_v2, state_vector) - }; + Ok::<_, HistoryError>((doc_state_v2, state_vector)) + }) + .await + .map_err(|err| HistoryError::Internal(err.into()))??; let state = CollabSnapshotState::new( object_id, diff --git a/services/appflowy-history/src/core/open_handle.rs b/services/appflowy-history/src/core/open_handle.rs index 6451b142..7a90edf5 100644 --- a/services/appflowy-history/src/core/open_handle.rs +++ b/services/appflowy-history/src/core/open_handle.rs @@ -183,9 +183,15 @@ async fn process_messages( _object_id: &str, _collab_type: &CollabType, ) -> Result<(), HistoryError> { - let mut lock = collab.write().await; - apply_updates(&messages, &mut lock)?; - drop(lock); + let cloned_message = messages.clone(); + tokio::task::spawn_blocking(move || { + let mut lock = collab.blocking_write(); + apply_updates(&cloned_message, &mut lock)?; + drop(lock); + Ok::<_, HistoryError>(()) + }) + .await + .map_err(|e| HistoryError::Internal(e.into()))??; 
update_stream.ack_messages(&messages).await?; Ok(()) }