From 1614474c2d707dbc98cbdda08ea0b4cda7817d09 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:03:10 +0800 Subject: [PATCH] chore: check collab type before index (#1093) --- .../src/indexer/indexer_scheduler.rs | 37 +++++++++++++++---- .../src/indexer/provider.rs | 4 ++ src/api/workspace.rs | 15 ++++++-- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs index 8e02e32b..f6aa53e2 100644 --- a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs +++ b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs @@ -2,7 +2,7 @@ use crate::config::get_env_var; use crate::indexer::metrics::EmbeddingMetrics; use crate::indexer::vector::embedder::Embedder; use crate::indexer::vector::open_ai; -use crate::indexer::IndexerProvider; +use crate::indexer::{Indexer, IndexerProvider}; use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; use actix::dev::Stream; use anyhow::anyhow; @@ -129,6 +129,10 @@ impl IndexerScheduler { true } + pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool { + self.indexer_provider.is_indexing_enabled(collab_type) + } + fn create_embedder(&self) -> Result { if self.config.openai_api_key.is_empty() { return Err(AppError::AIServiceUnavailable( @@ -159,10 +163,16 @@ impl IndexerScheduler { return Ok(()); } - let embedder = self.create_embedder()?; let indexed_collab = indexed_collab.into(); + let indexer = self + .indexer_provider + .indexer_for(&indexed_collab.collab_type); + if indexer.is_none() { + return Ok(()); + } + + let embedder = self.create_embedder()?; let workspace_id = Uuid::parse_str(workspace_id)?; - let indexer_provider = self.indexer_provider.clone(); let tx = self.schedule_tx.clone(); let metrics = self.metrics.clone(); @@ -178,7 +188,7 @@ impl IndexerScheduler { return; } - match process_collab(&embedder, &indexer_provider, &indexed_collab, &metrics) { + match process_collab(&embedder, indexer, &indexed_collab, &metrics) { Ok(Some((tokens_used, contents))) => { if let Err(err) = tx.send(EmbeddingRecord { workspace_id, @@ -209,12 +219,18 @@ impl IndexerScheduler { pub fn index_encoded_collabs( &self, workspace_id: &str, - indexed_collabs: Vec, + mut indexed_collabs: Vec, ) -> Result<(), AppError> { if !self.index_enabled() { return Ok(()); } + indexed_collabs.retain(|collab| self.is_indexing_enabled(&collab.collab_type)); + if indexed_collabs.is_empty() { + return Ok(()); + } + + info!("indexing {} collabs", indexed_collabs.len()); let embedder = self.create_embedder()?; let workspace_id = Uuid::parse_str(workspace_id)?; let indexer_provider = self.indexer_provider.clone(); @@ -226,6 +242,7 @@ impl IndexerScheduler { let embeddings_list = indexed_collabs .into_par_iter() .filter_map(|collab| { + let indexer = indexer_provider.indexer_for(&collab.collab_type)?; let task = ActiveTask::new(collab.object_id.clone()); let task_created_at = task.created_at; active_task.insert(collab.object_id.clone(), task); @@ -234,7 +251,7 @@ impl IndexerScheduler { if !should_embed(&active_task, &collab.object_id, task_created_at) { return None; } - process_collab(&embedder, &indexer_provider, &collab, &metrics).ok() + process_collab(&embedder, Some(indexer), &collab, &metrics).ok() }) .ok() }) @@ -272,6 +289,10 @@ impl IndexerScheduler { return Ok(()); } + if !self.is_indexing_enabled(collab_type) { + return Ok(()); + } + let indexer = self .indexer_provider .indexer_for(collab_type) @@ -571,11 +592,11 @@ async fn batch_insert_records( /// This function must be called within the rayon thread pool. fn process_collab( embdder: &Embedder, - indexer_provider: &IndexerProvider, + indexer: Option>, indexed_collab: &IndexedCollab, metrics: &EmbeddingMetrics, ) -> Result)>, AppError> { - if let Some(indexer) = indexer_provider.indexer_for(&indexed_collab.collab_type) { + if let Some(indexer) = indexer { metrics.record_embed_count(1); let encode_collab = EncodedCollab::decode_from_bytes(&indexed_collab.encoded_collab)?; let collab = Collab::new_with_source( diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index fea57041..ed70a710 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -52,4 +52,8 @@ impl IndexerProvider { pub fn indexer_for(&self, collab_type: &CollabType) -> Option> { self.indexer_cache.get(collab_type).cloned() } + + pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool { + self.indexer_cache.contains_key(collab_type) + } } diff --git a/src/api/workspace.rs b/src/api/workspace.rs index fa18bb2d..52b37fb9 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -829,10 +829,17 @@ async fn batch_create_collab_handler( .can_index_workspace(&workspace_id) .await? { - state.indexer_scheduler.index_encoded_collabs( - &workspace_id, - collab_params_list.iter().map(IndexedCollab::from).collect(), - )?; + let indexed_collabs: Vec<_> = collab_params_list + .iter() + .filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type)) + .map(IndexedCollab::from) + .collect(); + + if !indexed_collabs.is_empty() { + state + .indexer_scheduler + .index_encoded_collabs(&workspace_id, indexed_collabs)?; + } } let start = Instant::now();