diff --git a/src/api/workspace.rs b/src/api/workspace.rs index c6b858ca..e0f48459 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -7,6 +7,7 @@ use anyhow::{anyhow, Context}; use bytes::BytesMut; use collab::entity::EncodedCollab; use collab_entity::CollabType; +use futures_util::future::try_join_all; use prost::Message as ProstMessage; use sqlx::types::uuid; use tokio::time::Instant; @@ -19,6 +20,7 @@ use validator::Validate; use access_control::collab::CollabAccessControl; use app_error::AppError; use appflowy_collaborate::actix_ws::entities::ClientStreamMessage; +use appflowy_collaborate::indexer::IndexerProvider; use authentication::jwt::UserUuid; use collab_rt_entity::realtime_proto::HttpRealtimeMessage; use collab_rt_entity::RealtimeMessage; @@ -512,10 +514,18 @@ async fn create_collab_handler( .can_index_workspace(&workspace_id) .await? { - params.embeddings = state + match state .indexer_provider .create_collab_embeddings(¶ms) - .await?; + .await + { + Ok(embeddings) => params.embeddings = embeddings, + Err(err) => tracing::warn!( + "failed to fetch embeddings for document {}: {}", + params.object_id, + err + ), + } } let mut transaction = state @@ -609,11 +619,20 @@ async fn batch_create_collab_handler( if collab_params_list.is_empty() { return Err(AppError::InvalidRequest("Empty collab params list".to_string()).into()); } - let can_index = state + if state .indexer_provider .can_index_workspace(&workspace_id) - .await?; - for mut params in collab_params_list { + .await? + { + if let Err(err) = fetch_embeddings(&state.indexer_provider, &mut collab_params_list).await { + tracing::warn!( + "failed to fetch embeddings for {} new documents: {}", + collab_params_list.len(), + err + ); + } + } + for params in collab_params_list { let object_id = params.object_id.clone(); if validate_encode_collab( ¶ms.object_id, @@ -623,15 +642,6 @@ async fn batch_create_collab_handler( .await .is_ok() { - params.embeddings = if can_index { - state - .indexer_provider - .create_collab_embeddings(¶ms) - .await? - } else { - None - }; - state .collab_access_control_storage .insert_new_collab(&workspace_id, &uid, params) @@ -691,24 +701,29 @@ async fn create_collab_list_handler( if valid_items.is_empty() { return Err(AppError::InvalidRequest("Empty collab params list".to_string()).into()); } + + if state + .indexer_provider + .can_index_workspace(&workspace_id) + .await? + { + if let Err(err) = fetch_embeddings(&state.indexer_provider, &mut valid_items).await { + tracing::warn!( + "failed to fetch embeddings for {} new documents: {}", + valid_items.len(), + err + ); + } + } + let mut transaction = state .pg_pool .begin() .await .map_err(|err| AppError::Internal(anyhow!("Failed to start inserting collab: {}", err)))?; - let can_index = state - .indexer_provider - .can_index_workspace(&workspace_id) - .await?; - for mut params in valid_items { + for params in valid_items { let _object_id = params.object_id.clone(); - if can_index { - params.embeddings = state - .indexer_provider - .create_collab_embeddings(¶ms) - .await?; - } state .collab_access_control_storage .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) @@ -896,11 +911,16 @@ async fn update_collab_handler( .can_index_workspace(&workspace_id) .await? { - let encoded_collab = EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1) + let encoded = EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1) .map_err(|e| AppError::Internal(e.into()))?; - params.embeddings = indexer - .index_encoded(¶ms.object_id, encoded_collab) - .await?; + match indexer.index_encoded(¶ms.object_id, encoded).await { + Ok(embeddings) => params.embeddings = embeddings, + Err(err) => tracing::warn!( + "failed to fetch embeddings for document {}: {}", + params.object_id, + err + ), + } } } state @@ -1256,3 +1276,21 @@ async fn parser_realtime_msg( ))), } } + +async fn fetch_embeddings( + indexer_provider: &IndexerProvider, + params: &mut [CollabParams], +) -> Result<(), AppError> { + let mut futures = Vec::with_capacity(params.len()); + for param in params.iter() { + let future = indexer_provider.create_collab_embeddings(param); + futures.push(future); + } + + let results = try_join_all(futures).await?; + for (i, embeddings) in results.into_iter().enumerate() { + params[i].embeddings = embeddings; + } + + Ok(()) +}