chore: check collab type before index (#1093)

This commit is contained in:
Nathan.fooo 2024-12-19 22:03:10 +08:00 committed by GitHub
parent ea131f0baa
commit 1614474c2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 44 additions and 12 deletions

View File

@ -2,7 +2,7 @@ use crate::config::get_env_var;
use crate::indexer::metrics::EmbeddingMetrics;
use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::IndexerProvider;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
@ -129,6 +129,10 @@ impl IndexerScheduler {
true
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}
fn create_embedder(&self) -> Result<Embedder, AppError> {
if self.config.openai_api_key.is_empty() {
return Err(AppError::AIServiceUnavailable(
@ -159,10 +163,16 @@ impl IndexerScheduler {
return Ok(());
}
let embedder = self.create_embedder()?;
let indexed_collab = indexed_collab.into();
let indexer = self
.indexer_provider
.indexer_for(&indexed_collab.collab_type);
if indexer.is_none() {
return Ok(());
}
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
let tx = self.schedule_tx.clone();
let metrics = self.metrics.clone();
@ -178,7 +188,7 @@ impl IndexerScheduler {
return;
}
match process_collab(&embedder, &indexer_provider, &indexed_collab, &metrics) {
match process_collab(&embedder, indexer, &indexed_collab, &metrics) {
Ok(Some((tokens_used, contents))) => {
if let Err(err) = tx.send(EmbeddingRecord {
workspace_id,
@ -209,12 +219,18 @@ impl IndexerScheduler {
pub fn index_encoded_collabs(
&self,
workspace_id: &str,
indexed_collabs: Vec<IndexedCollab>,
mut indexed_collabs: Vec<IndexedCollab>,
) -> Result<(), AppError> {
if !self.index_enabled() {
return Ok(());
}
indexed_collabs.retain(|collab| self.is_indexing_enabled(&collab.collab_type));
if indexed_collabs.is_empty() {
return Ok(());
}
info!("indexing {} collabs", indexed_collabs.len());
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
@ -226,6 +242,7 @@ impl IndexerScheduler {
let embeddings_list = indexed_collabs
.into_par_iter()
.filter_map(|collab| {
let indexer = indexer_provider.indexer_for(&collab.collab_type)?;
let task = ActiveTask::new(collab.object_id.clone());
let task_created_at = task.created_at;
active_task.insert(collab.object_id.clone(), task);
@ -234,7 +251,7 @@ impl IndexerScheduler {
if !should_embed(&active_task, &collab.object_id, task_created_at) {
return None;
}
process_collab(&embedder, &indexer_provider, &collab, &metrics).ok()
process_collab(&embedder, Some(indexer), &collab, &metrics).ok()
})
.ok()
})
@ -272,6 +289,10 @@ impl IndexerScheduler {
return Ok(());
}
if !self.is_indexing_enabled(collab_type) {
return Ok(());
}
let indexer = self
.indexer_provider
.indexer_for(collab_type)
@ -571,11 +592,11 @@ async fn batch_insert_records(
/// This function must be called within the rayon thread pool.
fn process_collab(
embdder: &Embedder,
indexer_provider: &IndexerProvider,
indexer: Option<Arc<dyn Indexer>>,
indexed_collab: &IndexedCollab,
metrics: &EmbeddingMetrics,
) -> Result<Option<(u32, Vec<AFCollabEmbeddedChunk>)>, AppError> {
if let Some(indexer) = indexer_provider.indexer_for(&indexed_collab.collab_type) {
if let Some(indexer) = indexer {
metrics.record_embed_count(1);
let encode_collab = EncodedCollab::decode_from_bytes(&indexed_collab.encoded_collab)?;
let collab = Collab::new_with_source(

View File

@ -52,4 +52,8 @@ impl IndexerProvider {
pub fn indexer_for(&self, collab_type: &CollabType) -> Option<Arc<dyn Indexer>> {
self.indexer_cache.get(collab_type).cloned()
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_cache.contains_key(collab_type)
}
}

View File

@ -829,10 +829,17 @@ async fn batch_create_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
state.indexer_scheduler.index_encoded_collabs(
&workspace_id,
collab_params_list.iter().map(IndexedCollab::from).collect(),
)?;
let indexed_collabs: Vec<_> = collab_params_list
.iter()
.filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
.map(IndexedCollab::from)
.collect();
if !indexed_collabs.is_empty() {
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
}
}
let start = Instant::now();