From f13b9ee7877cd8a3df11a428e96252a598a438d7 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 24 Jun 2024 10:03:49 +0200 Subject: [PATCH] chore: add indexing of collabs at application start --- .../appflowy-collaborate/src/group/manager.rs | 1 + .../src/indexer/indexer.rs | 56 ------- .../appflowy-collaborate/src/indexer/mod.rs | 4 +- .../src/indexer/provider.rs | 150 ++++++++++++++++++ .../appflowy-collaborate/src/rt_server.rs | 8 +- 5 files changed, 160 insertions(+), 59 deletions(-) delete mode 100644 services/appflowy-collaborate/src/indexer/indexer.rs create mode 100644 services/appflowy-collaborate/src/indexer/provider.rs diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 84d10e95..e3255f1a 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -45,6 +45,7 @@ where S: CollabStorage, AC: RealtimeAccessControl, { + #[allow(clippy::too_many_arguments)] pub async fn new( storage: Arc, access_control: Arc, diff --git a/services/appflowy-collaborate/src/indexer/indexer.rs b/services/appflowy-collaborate/src/indexer/indexer.rs deleted file mode 100644 index 7a13cffc..00000000 --- a/services/appflowy-collaborate/src/indexer/indexer.rs +++ /dev/null @@ -1,56 +0,0 @@ -use async_trait::async_trait; -use collab::core::collab::MutexCollab; -use collab_entity::CollabType; -use sqlx::PgPool; -use std::collections::HashMap; -use std::sync::Arc; -use uuid::Uuid; - -use crate::indexer::DocumentIndexer; -use app_error::AppError; -use appflowy_ai_client::client::AppFlowyAIClient; -use database::workspace::select_workspace_settings; -use database_entity::dto::AFCollabEmbeddings; - -#[async_trait] -pub trait Indexer: Send + Sync { - async fn index(&self, collab: MutexCollab) -> Result; -} - -/// A structure responsible for resolving different [Indexer] types for different [CollabType]s, -/// including access permission checks for the specific workspaces. -pub struct IndexerProvider { - db: PgPool, - indexer_cache: HashMap>, -} - -impl IndexerProvider { - pub fn new(db: PgPool, ai_client: AppFlowyAIClient) -> Arc { - let mut cache: HashMap> = HashMap::new(); - cache.insert(CollabType::Document, DocumentIndexer::new(ai_client)); - Arc::new(Self { - db, - indexer_cache: cache, - }) - } - - /// Returns indexer for a specific type of [Collab] object. - /// If collab of given type is not supported or workspace it belongs to has indexing disabled, - /// returns `None`. - pub async fn indexer_for( - &self, - workspace_id: &str, - collab_type: CollabType, - ) -> Result>, AppError> { - let indexer = self.indexer_cache.get(&collab_type).cloned(); - if indexer.is_none() { - return Ok(None); - } - let uuid = Uuid::parse_str(workspace_id)?; - let settings = select_workspace_settings(&self.db, &uuid).await?; - match settings { - Some(settings) if settings.disable_search_indexing => Ok(None), - _ => Ok(indexer), - } - } -} diff --git a/services/appflowy-collaborate/src/indexer/mod.rs b/services/appflowy-collaborate/src/indexer/mod.rs index 5a8999e0..09581298 100644 --- a/services/appflowy-collaborate/src/indexer/mod.rs +++ b/services/appflowy-collaborate/src/indexer/mod.rs @@ -1,7 +1,7 @@ mod document_indexer; mod ext; -mod indexer; +mod provider; pub use document_indexer::DocumentIndexer; pub use ext::DocumentDataExt; -pub use indexer::*; +pub use provider::*; diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs new file mode 100644 index 00000000..74ae1552 --- /dev/null +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -0,0 +1,150 @@ +use actix::dev::Stream; +use async_stream::try_stream; +use async_trait::async_trait; +use collab::core::collab::{DataSource, MutexCollab}; +use collab::core::origin::CollabOrigin; +use collab::entity::EncodedCollab; +use collab::preclude::Collab; +use collab_entity::CollabType; +use sqlx::PgPool; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use tokio_stream::StreamExt; +use uuid::Uuid; + +use crate::indexer::DocumentIndexer; +use app_error::AppError; +use appflowy_ai_client::client::AppFlowyAIClient; +use database::collab::select_blob_from_af_collab; +use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings}; +use database::workspace::select_workspace_settings; +use database_entity::dto::AFCollabEmbeddings; + +#[async_trait] +pub trait Indexer: Send + Sync { + async fn index(&self, collab: MutexCollab) -> Result; +} + +/// A structure responsible for resolving different [Indexer] types for different [CollabType]s, +/// including access permission checks for the specific workspaces. +pub struct IndexerProvider { + db: PgPool, + indexer_cache: HashMap>, +} + +impl IndexerProvider { + pub fn new(db: PgPool, ai_client: AppFlowyAIClient) -> Arc { + let mut cache: HashMap> = HashMap::new(); + cache.insert(CollabType::Document, DocumentIndexer::new(ai_client)); + Arc::new(Self { + db, + indexer_cache: cache, + }) + } + + /// Returns indexer for a specific type of [Collab] object. + /// If collab of given type is not supported or workspace it belongs to has indexing disabled, + /// returns `None`. + pub async fn indexer_for( + &self, + workspace_id: &str, + collab_type: CollabType, + ) -> Result>, AppError> { + let indexer = self.indexer_cache.get(&collab_type).cloned(); + if indexer.is_none() { + return Ok(None); + } + let uuid = Uuid::parse_str(workspace_id)?; + let settings = select_workspace_settings(&self.db, &uuid).await?; + match settings { + Some(settings) if settings.disable_search_indexing => Ok(None), + _ => Ok(indexer), + } + } + + fn get_unindexed_collabs( + &self, + ) -> Pin>>> { + let db = self.db.clone(); + Box::pin(try_stream! { + let collabs = get_collabs_without_embeddings(&db).await?; + + if !collabs.is_empty() { + tracing::trace!("found {} unindexed collabs", collabs.len()); + } + for cid in collabs { + match &cid.collab_type { + CollabType::Document => { + let collab = + select_blob_from_af_collab(&db, &CollabType::Document, &cid.object_id).await?; + let collab = EncodedCollab::decode_from_bytes(&collab)?; + yield UnindexedCollab { + workspace_id: cid.workspace_id, + object_id: cid.object_id, + collab_type: cid.collab_type, + collab, + }; + }, + CollabType::Database + | CollabType::WorkspaceDatabase + | CollabType::Folder + | CollabType::DatabaseRow + | CollabType::UserAwareness + | CollabType::Unknown => { /* atm. only document types are supported */ }, + } + } + }) + } + + pub async fn handle_unindexed_collabs(indexer: Arc) { + let mut stream = indexer.get_unindexed_collabs(); + while let Some(result) = stream.next().await { + match result { + Ok(collab) => { + let workspace = collab.workspace_id; + let oid = collab.object_id.clone(); + if let Err(err) = Self::index_collab(&indexer, collab).await { + tracing::warn!("failed to index collab {}/{}: {}", workspace, oid, err); + } + }, + Err(err) => { + tracing::error!("failed to get unindexed document: {}", err); + }, + } + } + } + + async fn index_collab(&self, unindexed: UnindexedCollab) -> Result<(), AppError> { + if let Some(indexer) = self.indexer_cache.get(&unindexed.collab_type) { + let collab = MutexCollab::new( + Collab::new_with_source( + CollabOrigin::Empty, + &unindexed.object_id, + DataSource::DocStateV1(unindexed.collab.doc_state.into()), + vec![], + false, + ) + .map_err(|err| AppError::Internal(err.into()))?, + ); + let embeddings = indexer.index(collab).await?; + let mut tx = self.db.begin().await?; + upsert_collab_embeddings( + &mut tx, + &unindexed.workspace_id, + embeddings.tokens_consumed, + &embeddings.params, + ) + .await?; + tx.commit().await?; + } + Ok(()) + } +} + +struct UnindexedCollab { + pub workspace_id: Uuid, + pub object_id: String, + pub collab_type: CollabType, + pub collab: EncodedCollab, +} diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index b2989096..7dae0dc7 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -73,7 +73,7 @@ where group_persistence_interval, edit_state_max_count, edit_state_max_secs, - indexer_provider, + indexer_provider.clone(), ) .await?, ); @@ -86,6 +86,8 @@ where spawn_metrics(&metrics, &metrics_calculate, &storage); + spawn_handle_unindexed_collabs(indexer_provider); + Ok(Self { storage, group_manager, @@ -251,6 +253,10 @@ where } } +fn spawn_handle_unindexed_collabs(indexer_provider: Arc) { + tokio::task::spawn_local(IndexerProvider::handle_unindexed_collabs(indexer_provider)); +} + fn spawn_period_check_inactive_group( weak_groups: Weak>, group_sender_by_object_id: &Arc>,