From 299680c14a3ab9dbaa6abfa154fc461d1a93eea8 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 2 Sep 2024 20:27:40 +0800 Subject: [PATCH] chore: decode encode collab using spawn blocking (#781) --- .../src/collab/decode_util.rs | 16 +++++++++++++ .../src/collab/disk_cache.rs | 9 ++------ .../src/collab/mem_cache.rs | 12 +++------- .../appflowy-collaborate/src/collab/mod.rs | 2 ++ src/api/workspace.rs | 23 +++++++++++-------- 5 files changed, 37 insertions(+), 25 deletions(-) create mode 100644 services/appflowy-collaborate/src/collab/decode_util.rs diff --git a/services/appflowy-collaborate/src/collab/decode_util.rs b/services/appflowy-collaborate/src/collab/decode_util.rs new file mode 100644 index 00000000..2ce1eecb --- /dev/null +++ b/services/appflowy-collaborate/src/collab/decode_util.rs @@ -0,0 +1,16 @@ +use anyhow::anyhow; +use app_error::AppError; +use collab::entity::EncodedCollab; + +#[inline] +pub(crate) async fn encode_collab_from_bytes(bytes: Vec) -> Result { + // FIXME: Implement metrics to determine the appropriate data size for decoding in a blocking task. + tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) { + Ok(encoded_collab) => Ok(encoded_collab), + Err(err) => Err(AppError::Internal(anyhow!( + "Failed to decode collab from bytes: {:?}", + err + ))), + }) + .await? +} diff --git a/services/appflowy-collaborate/src/collab/disk_cache.rs b/services/appflowy-collaborate/src/collab/disk_cache.rs index 040d09d1..e5619cbb 100644 --- a/services/appflowy-collaborate/src/collab/disk_cache.rs +++ b/services/appflowy-collaborate/src/collab/disk_cache.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::time::Duration; -use anyhow::anyhow; use collab::entity::EncodedCollab; use collab_entity::CollabType; use sqlx::{Error, PgPool, Transaction}; @@ -9,6 +8,7 @@ use tokio::time::sleep; use tracing::{event, instrument, Level}; use uuid::Uuid; +use crate::collab::decode_util::encode_collab_from_bytes; use app_error::AppError; use database::collab::{ batch_select_collab_blob, insert_into_af_collab, is_collab_exists, select_blob_from_af_collab, @@ -92,12 +92,7 @@ impl CollabDiskCache { match result { Ok(data) => { - return tokio::task::spawn_blocking(move || { - EncodedCollab::decode_from_bytes(&data).map_err(|err| { - AppError::Internal(anyhow!("fail to decode data to EncodedCollab: {:?}", err)) - }) - }) - .await?; + return encode_collab_from_bytes(data).await; }, Err(e) => { match e { diff --git a/services/appflowy-collaborate/src/collab/mem_cache.rs b/services/appflowy-collaborate/src/collab/mem_cache.rs index 26b0f698..22bbb41f 100644 --- a/services/appflowy-collaborate/src/collab/mem_cache.rs +++ b/services/appflowy-collaborate/src/collab/mem_cache.rs @@ -3,11 +3,11 @@ use collab::entity::EncodedCollab; use redis::{pipe, AsyncCommands}; use tracing::{error, instrument, trace}; +use crate::collab::decode_util::encode_collab_from_bytes; +use crate::state::RedisConnectionManager; use app_error::AppError; use database::collab::CollabMetadata; -use crate::state::RedisConnectionManager; - const SEVEN_DAYS: i64 = 604800; const ONE_MONTH: u64 = 2592000; #[derive(Clone)] @@ -96,13 +96,7 @@ impl CollabMemCache { #[instrument(level = "trace", skip_all)] pub async fn get_encode_collab(&self, object_id: &str) -> Option { match self.get_encode_collab_data(object_id).await { - Some(bytes) => match EncodedCollab::decode_from_bytes(&bytes) { - Ok(encoded_collab) => Some(encoded_collab), - Err(err) => { - error!("Failed to decode collab from redis cache bytes: {:?}", err); - None - }, - }, + Some(bytes) => encode_collab_from_bytes(bytes).await.ok(), None => { trace!( "No encoded collab found in cache for object_id: {}", diff --git a/services/appflowy-collaborate/src/collab/mod.rs b/services/appflowy-collaborate/src/collab/mod.rs index 3480b52b..4fc931e9 100644 --- a/services/appflowy-collaborate/src/collab/mod.rs +++ b/services/appflowy-collaborate/src/collab/mod.rs @@ -1,5 +1,6 @@ pub mod access_control; pub mod cache; +mod decode_util; pub mod disk_cache; pub mod mem_cache; pub mod notification; @@ -7,4 +8,5 @@ pub mod queue; mod queue_redis_ops; pub mod storage; pub mod validator; + pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority}; diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 2c1c628d..c3250021 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -920,22 +920,27 @@ async fn update_collab_handler( .can_index_workspace(&workspace_id) .await? { - let encoded = EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1).map_err(|err| { - AppError::InvalidRequest(format!( - "Failed to decode collab `{}`: {}", - params.object_id, err - )) - })?; - match indexer.index(¶ms.object_id, encoded).await { - Ok(embeddings) => params.embeddings = embeddings, + let (encoded, mut mut_params) = tokio::task::spawn_blocking(move || { + EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1) + .map(|encoded_collab| (encoded_collab, params)) + .map_err(|err| AppError::InvalidRequest(format!("Failed to decode collab `{}", err))) + }) + .await + .map_err(|err| AppError::Internal(err.into()))??; + + match indexer.index(&mut_params.object_id, encoded).await { + Ok(embeddings) => mut_params.embeddings = embeddings, Err(err) => tracing::warn!( "failed to fetch embeddings for document {}: {}", - params.object_id, + mut_params.object_id, err ), } + + params = mut_params; } } + state .collab_access_control_storage .insert_or_update_collab(&workspace_id, &uid, params, false)