chore: failure in embedding fetch should block collab creation

This commit is contained in:
Bartosz Sypytkowski 2024-06-28 06:11:19 +02:00
parent 9ea085d6db
commit c1f8c486fd
1 changed files with 67 additions and 29 deletions

View File

@ -7,6 +7,7 @@ use anyhow::{anyhow, Context};
use bytes::BytesMut;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use futures_util::future::try_join_all;
use prost::Message as ProstMessage;
use sqlx::types::uuid;
use tokio::time::Instant;
@ -19,6 +20,7 @@ use validator::Validate;
use access_control::collab::CollabAccessControl;
use app_error::AppError;
use appflowy_collaborate::actix_ws::entities::ClientStreamMessage;
use appflowy_collaborate::indexer::IndexerProvider;
use authentication::jwt::UserUuid;
use collab_rt_entity::realtime_proto::HttpRealtimeMessage;
use collab_rt_entity::RealtimeMessage;
@ -512,10 +514,18 @@ async fn create_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
params.embeddings = state
match state
.indexer_provider
.create_collab_embeddings(&params)
.await?;
.await
{
Ok(embeddings) => params.embeddings = embeddings,
Err(err) => tracing::warn!(
"failed to fetch embeddings for document {}: {}",
params.object_id,
err
),
}
}
let mut transaction = state
@ -609,11 +619,20 @@ async fn batch_create_collab_handler(
if collab_params_list.is_empty() {
return Err(AppError::InvalidRequest("Empty collab params list".to_string()).into());
}
let can_index = state
if state
.indexer_provider
.can_index_workspace(&workspace_id)
.await?;
for mut params in collab_params_list {
.await?
{
if let Err(err) = fetch_embeddings(&state.indexer_provider, &mut collab_params_list).await {
tracing::warn!(
"failed to fetch embeddings for {} new documents: {}",
collab_params_list.len(),
err
);
}
}
for params in collab_params_list {
let object_id = params.object_id.clone();
if validate_encode_collab(
&params.object_id,
@ -623,15 +642,6 @@ async fn batch_create_collab_handler(
.await
.is_ok()
{
params.embeddings = if can_index {
state
.indexer_provider
.create_collab_embeddings(&params)
.await?
} else {
None
};
state
.collab_access_control_storage
.insert_new_collab(&workspace_id, &uid, params)
@ -691,24 +701,29 @@ async fn create_collab_list_handler(
if valid_items.is_empty() {
return Err(AppError::InvalidRequest("Empty collab params list".to_string()).into());
}
if state
.indexer_provider
.can_index_workspace(&workspace_id)
.await?
{
if let Err(err) = fetch_embeddings(&state.indexer_provider, &mut valid_items).await {
tracing::warn!(
"failed to fetch embeddings for {} new documents: {}",
valid_items.len(),
err
);
}
}
let mut transaction = state
.pg_pool
.begin()
.await
.map_err(|err| AppError::Internal(anyhow!("Failed to start inserting collab: {}", err)))?;
let can_index = state
.indexer_provider
.can_index_workspace(&workspace_id)
.await?;
for mut params in valid_items {
for params in valid_items {
let _object_id = params.object_id.clone();
if can_index {
params.embeddings = state
.indexer_provider
.create_collab_embeddings(&params)
.await?;
}
state
.collab_access_control_storage
.insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
@ -896,11 +911,16 @@ async fn update_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)
let encoded = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)
.map_err(|e| AppError::Internal(e.into()))?;
params.embeddings = indexer
.index_encoded(&params.object_id, encoded_collab)
.await?;
match indexer.index_encoded(&params.object_id, encoded).await {
Ok(embeddings) => params.embeddings = embeddings,
Err(err) => tracing::warn!(
"failed to fetch embeddings for document {}: {}",
params.object_id,
err
),
}
}
}
state
@ -1256,3 +1276,21 @@ async fn parser_realtime_msg(
))),
}
}
async fn fetch_embeddings(
indexer_provider: &IndexerProvider,
params: &mut [CollabParams],
) -> Result<(), AppError> {
let mut futures = Vec::with_capacity(params.len());
for param in params.iter() {
let future = indexer_provider.create_collab_embeddings(param);
futures.push(future);
}
let results = try_join_all(futures).await?;
for (i, embeddings) in results.into_iter().enumerate() {
params[i].embeddings = embeddings;
}
Ok(())
}