diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index e3255f1a..44ff8ea3 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -243,11 +243,18 @@ where collab_type ); - let indexer = self - .indexer_provider - .indexer_for(workspace_id, collab_type.clone()) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; + let mut indexer = self.indexer_provider.indexer_for(collab_type.clone()); + if indexer.is_some() { + if !self + .indexer_provider + .can_index_workspace(workspace_id) + .await + .map_err(|e| RealtimeError::Internal(e.into()))? + { + tracing::trace!("workspace {} indexing is disabled", workspace_id); + indexer = None; + } + } let group = Arc::new( CollabGroup::new( user.uid, diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index 74ae1552..d833040c 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -19,11 +19,27 @@ use appflowy_ai_client::client::AppFlowyAIClient; use database::collab::select_blob_from_af_collab; use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings}; use database::workspace::select_workspace_settings; -use database_entity::dto::AFCollabEmbeddings; +use database_entity::dto::{AFCollabEmbeddings, CollabParams}; #[async_trait] pub trait Indexer: Send + Sync { async fn index(&self, collab: MutexCollab) -> Result; + + async fn index_encoded( + &self, + object_id: &str, + encoded_collab: EncodedCollab, + ) -> Result { + let collab = Collab::new_with_source( + CollabOrigin::Empty, + object_id, + DataSource::DocStateV1(encoded_collab.doc_state.into()), + vec![], + false, + ) + .map_err(|e| AppError::Internal(e.into()))?; + self.index(MutexCollab::new(collab)).await + } } /// A structure responsible for resolving different [Indexer] types for different [CollabType]s, @@ -43,26 +59,22 @@ impl IndexerProvider { }) } - /// Returns indexer for a specific type of [Collab] object. - /// If collab of given type is not supported or workspace it belongs to has indexing disabled, - /// returns `None`. - pub async fn indexer_for( - &self, - workspace_id: &str, - collab_type: CollabType, - ) -> Result>, AppError> { - let indexer = self.indexer_cache.get(&collab_type).cloned(); - if indexer.is_none() { - return Ok(None); - } + pub async fn can_index_workspace(&self, workspace_id: &str) -> Result { let uuid = Uuid::parse_str(workspace_id)?; let settings = select_workspace_settings(&self.db, &uuid).await?; match settings { - Some(settings) if settings.disable_search_indexing => Ok(None), - _ => Ok(indexer), + None => Ok(true), + Some(settings) => Ok(!settings.disable_search_indexing), } } + /// Returns indexer for a specific type of [Collab] object. + /// If collab of given type is not supported or workspace it belongs to has indexing disabled, + /// returns `None`. + pub fn indexer_for(&self, collab_type: CollabType) -> Option> { + self.indexer_cache.get(&collab_type).cloned() + } + fn get_unindexed_collabs( &self, ) -> Pin>>> { @@ -140,6 +152,23 @@ impl IndexerProvider { } Ok(()) } + + pub async fn create_collab_embeddings( + &self, + params: &CollabParams, + ) -> Result, AppError> { + if let Some(indexer) = self.indexer_for(params.collab_type.clone()) { + let embeddings = indexer + .index_encoded( + ¶ms.object_id, + EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1)?, + ) + .await?; + Ok(Some(embeddings)) + } else { + Ok(None) + } + } } struct UnindexedCollab { diff --git a/src/api/workspace.rs b/src/api/workspace.rs index cbf6a852..91680178 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -5,6 +5,7 @@ use actix_web::{web, Scope}; use actix_web::{HttpRequest, Result}; use anyhow::{anyhow, Context}; use bytes::BytesMut; +use collab::entity::EncodedCollab; use collab_entity::CollabType; use prost::Message as ProstMessage; use sqlx::types::uuid; @@ -486,7 +487,7 @@ async fn create_collab_handler( }, }; - let (params, workspace_id) = params.split(); + let (mut params, workspace_id) = params.split(); if params.object_id == workspace_id { // Only the object with [CollabType::Folder] can have the same object_id as workspace_id. But @@ -506,6 +507,17 @@ async fn create_collab_handler( ); } + if state + .indexer_provider + .can_index_workspace(&workspace_id) + .await? + { + params.embeddings = state + .indexer_provider + .create_collab_embeddings(¶ms) + .await?; + } + let mut transaction = state .pg_pool .begin() @@ -597,7 +609,11 @@ async fn batch_create_collab_handler( if collab_params_list.is_empty() { return Err(AppError::InvalidRequest("Empty collab params list".to_string()).into()); } - for params in collab_params_list { + let can_index = state + .indexer_provider + .can_index_workspace(&workspace_id) + .await?; + for mut params in collab_params_list { let object_id = params.object_id.clone(); if validate_encode_collab( ¶ms.object_id, @@ -607,6 +623,15 @@ async fn batch_create_collab_handler( .await .is_ok() { + params.embeddings = if can_index { + state + .indexer_provider + .create_collab_embeddings(¶ms) + .await? + } else { + None + }; + state .collab_access_control_storage .insert_new_collab(&workspace_id, &uid, params) @@ -672,8 +697,18 @@ async fn create_collab_list_handler( .await .map_err(|err| AppError::Internal(anyhow!("Failed to start inserting collab: {}", err)))?; - for params in valid_items { + let can_index = state + .indexer_provider + .can_index_workspace(&workspace_id) + .await?; + for mut params in valid_items { let _object_id = params.object_id.clone(); + if can_index { + params.embeddings = state + .indexer_provider + .create_collab_embeddings(¶ms) + .await?; + } state .collab_access_control_storage .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) @@ -851,7 +886,24 @@ async fn update_collab_handler( let uid = state.user_cache.get_user_uid(&user_uuid).await?; let create_params = CreateCollabParams::from((workspace_id.to_string(), params)); - let (params, workspace_id) = create_params.split(); + let (mut params, workspace_id) = create_params.split(); + if let Some(indexer) = state + .indexer_provider + .indexer_for(params.collab_type.clone()) + { + if state + .indexer_provider + .can_index_workspace(&workspace_id) + .await? + { + let encoded_collab = EncodedCollab::decode_from_bytes(¶ms.encoded_collab_v1)?; + params.embeddings = Some( + indexer + .index_encoded(¶ms.object_id, encoded_collab) + .await?, + ); + } + } state .collab_access_control_storage .insert_or_update_collab(&workspace_id, &uid, params, false)