chore: decode encode collab using spawn blocking (#781)

This commit is contained in:
Nathan.fooo 2024-09-02 20:27:40 +08:00 committed by GitHub
parent abd96d8b56
commit 299680c14a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 37 additions and 25 deletions

View File

@ -0,0 +1,16 @@
use anyhow::anyhow;
use app_error::AppError;
use collab::entity::EncodedCollab;
#[inline]
pub(crate) async fn encode_collab_from_bytes(bytes: Vec<u8>) -> Result<EncodedCollab, AppError> {
// FIXME: Implement metrics to determine the appropriate data size for decoding in a blocking task.
tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) {
Ok(encoded_collab) => Ok(encoded_collab),
Err(err) => Err(AppError::Internal(anyhow!(
"Failed to decode collab from bytes: {:?}",
err
))),
})
.await?
}

View File

@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::time::Duration;
use anyhow::anyhow;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use sqlx::{Error, PgPool, Transaction};
@ -9,6 +8,7 @@ use tokio::time::sleep;
use tracing::{event, instrument, Level};
use uuid::Uuid;
use crate::collab::decode_util::encode_collab_from_bytes;
use app_error::AppError;
use database::collab::{
batch_select_collab_blob, insert_into_af_collab, is_collab_exists, select_blob_from_af_collab,
@ -92,12 +92,7 @@ impl CollabDiskCache {
match result {
Ok(data) => {
return tokio::task::spawn_blocking(move || {
EncodedCollab::decode_from_bytes(&data).map_err(|err| {
AppError::Internal(anyhow!("fail to decode data to EncodedCollab: {:?}", err))
})
})
.await?;
return encode_collab_from_bytes(data).await;
},
Err(e) => {
match e {

View File

@ -3,11 +3,11 @@ use collab::entity::EncodedCollab;
use redis::{pipe, AsyncCommands};
use tracing::{error, instrument, trace};
use crate::collab::decode_util::encode_collab_from_bytes;
use crate::state::RedisConnectionManager;
use app_error::AppError;
use database::collab::CollabMetadata;
use crate::state::RedisConnectionManager;
const SEVEN_DAYS: i64 = 604800;
const ONE_MONTH: u64 = 2592000;
#[derive(Clone)]
@ -96,13 +96,7 @@ impl CollabMemCache {
#[instrument(level = "trace", skip_all)]
pub async fn get_encode_collab(&self, object_id: &str) -> Option<EncodedCollab> {
match self.get_encode_collab_data(object_id).await {
Some(bytes) => match EncodedCollab::decode_from_bytes(&bytes) {
Ok(encoded_collab) => Some(encoded_collab),
Err(err) => {
error!("Failed to decode collab from redis cache bytes: {:?}", err);
None
},
},
Some(bytes) => encode_collab_from_bytes(bytes).await.ok(),
None => {
trace!(
"No encoded collab found in cache for object_id: {}",

View File

@ -1,5 +1,6 @@
pub mod access_control;
pub mod cache;
mod decode_util;
pub mod disk_cache;
pub mod mem_cache;
pub mod notification;
@ -7,4 +8,5 @@ pub mod queue;
mod queue_redis_ops;
pub mod storage;
pub mod validator;
pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority};

View File

@ -920,22 +920,27 @@ async fn update_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
let encoded = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1).map_err(|err| {
AppError::InvalidRequest(format!(
"Failed to decode collab `{}`: {}",
params.object_id, err
))
})?;
match indexer.index(&params.object_id, encoded).await {
Ok(embeddings) => params.embeddings = embeddings,
let (encoded, mut mut_params) = tokio::task::spawn_blocking(move || {
EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)
.map(|encoded_collab| (encoded_collab, params))
.map_err(|err| AppError::InvalidRequest(format!("Failed to decode collab `{}", err)))
})
.await
.map_err(|err| AppError::Internal(err.into()))??;
match indexer.index(&mut_params.object_id, encoded).await {
Ok(embeddings) => mut_params.embeddings = embeddings,
Err(err) => tracing::warn!(
"failed to fetch embeddings for document {}: {}",
params.object_id,
mut_params.object_id,
err
),
}
params = mut_params;
}
}
state
.collab_access_control_storage
.insert_or_update_collab(&workspace_id, &uid, params, false)