chore: return unindex collab one by one (#1095)

* chore: return unindex collab one by one

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-12-20 15:14:47 +08:00 committed by GitHub
parent a68dde0252
commit 46f9c7811f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 61 additions and 63 deletions

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1\n from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0)",
"query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1 from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0\n )\n ",
"describe": {
"columns": [
{
@ -28,5 +28,5 @@
false
]
},
"hash": "26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b"
"hash": "ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379"
}

View File

@ -1,7 +1,9 @@
use collab_entity::CollabType;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use pgvector::Vector;
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use sqlx::{Error, Executor, Postgres, Transaction};
use sqlx::{Error, Executor, PgPool, Postgres, Transaction};
use std::ops::DerefMut;
use uuid::Uuid;
@ -109,35 +111,29 @@ pub async fn upsert_collab_embeddings(
Ok(())
}
// NOTE(review): this span reads as a diff hunk rendered without +/- markers: the removed
// and the added version of `get_collabs_without_embeddings` are interleaved, so it does
// not compile as written.
//
// Before (apparently removed): an `async fn` generic over an `Executor` that ran the
// query with `fetch_all`, then converted every returned row into a `CollabId` and handed
// them back as a single `Vec`.
pub async fn get_collabs_without_embeddings<'a, E>(
executor: E,
) -> Result<Vec<CollabId>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
let oids = sqlx::query!(
// After (apparently added): a non-async `fn` taking `&PgPool` that runs an equivalent
// query (only whitespace differs) with `fetch` and returns a boxed stream, mapping each
// row result to a `CollabId`; row-level `sqlx` errors are passed through as `Err` items.
// NOTE(review): the `not exists` clause only looks for an embedding row with
// `partition_key = 0`; `c.partition_key` itself is not filtered by the query, so
// collabs of any type can be returned — callers presumably filter by type; confirm.
pub fn get_collabs_without_embeddings(pg_pool: &PgPool) -> BoxStream<sqlx::Result<CollabId>> {
// atm. get only documents
sqlx::query!(
r#"
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1
from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents
)
.fetch_all(executor)
.await?;
Ok(
oids
.into_iter()
.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
.collect(),
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1 from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0
)
"#
)
.fetch(pg_pool)
.map(|row| {
row.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
})
.boxed()
}
#[derive(Debug, Clone)]

View File

@ -4,11 +4,9 @@ use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
use app_error::AppError;
use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse};
use async_stream::try_stream;
use bytes::Bytes;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
@ -21,10 +19,10 @@ use database::collab::{CollabStorage, GetCollabOrigin};
use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams};
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use rayon::prelude::*;
use sqlx::PgPool;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@ -403,7 +401,7 @@ async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
let mut i = 0;
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone());
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone()).await;
let record_tx = scheduler.schedule_tx.clone();
let start = Instant::now();
while let Some(result) = stream.next().await {
@ -442,39 +440,43 @@ async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
)
}
// NOTE(review): this span reads as a diff hunk rendered without +/- markers: the removed
// and the added version of `get_unindexed_collabs` are interleaved line by line, so it
// does not compile as written. Read it as "before" and "after" laid over each other.
//
// Before (apparently removed): a private `fn` returning a pinned, boxed `Stream` built
// with `try_stream!`. It awaited the complete result of `get_collabs_without_embeddings`,
// logged the count when non-empty, and yielded one `UnindexedCollab` per
// `CollabType::Document` id; all other collab types were listed explicitly and skipped.
//
// After (apparently added): a `pub async fn` returning `BoxStream`. Each item coming from
// `get_collabs_without_embeddings(pg_pool)` is mapped to a future that loads the encoded
// collab for documents and resolves to `Ok(None)` for every other type; `filter_map`
// then awaits that future, drops the `None`s and forwards `Ok` and `Err` items alike.
fn get_unindexed_collabs(
// NOTE(review): nothing in the added body is awaited before `.boxed()`, so `async`
// appears to do no work here beyond requiring callers to `.await` for the stream — confirm
// whether it is intentional.
pub async fn get_unindexed_collabs(
pg_pool: &PgPool,
storage: Arc<dyn CollabStorage>,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = pg_pool.clone();
Box::pin(try_stream! {
// Old path: the whole id list is fetched up front before anything is yielded.
let collabs = get_collabs_without_embeddings(&db).await?;
if !collabs.is_empty() {
info!("found {} unindexed collabs", collabs.len());
}
for cid in collabs {
match &cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;
) -> BoxStream<Result<UnindexedCollab, anyhow::Error>> {
// NOTE(review): `storage` is already owned by this function; `cloned_storage` adds one
// more `Arc` clone on top of the per-item clone inside the closure — presumably `storage`
// could be moved into the closure directly; verify.
let cloned_storage = storage.clone();
get_collabs_without_embeddings(pg_pool)
.map(move |result| {
// Each per-item future gets its own handle to the storage.
let storage = cloned_storage.clone();
async move {
match result {
Ok(cid) => match cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;
yield UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
};
},
CollabType::Database
| CollabType::WorkspaceDatabase
| CollabType::Folder
| CollabType::DatabaseRow
| CollabType::UserAwareness
| CollabType::Unknown => { /* atm. only document types are supported */ },
Ok(Some(UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
}))
},
// NOTE(review): this catch-all stands in for the explicit variant list of the old
// code, so a newly added `CollabType` variant would be skipped without a compile error.
_ => Ok::<_, anyhow::Error>(None),
},
// A failed row from the id query is converted into `anyhow::Error` and forwarded.
Err(e) => Err(e.into()),
}
}
}
})
})
// NOTE(review): an `Err` is emitted as a stream item and iteration goes on with the next
// id, whereas the `?` inside the old `try_stream!` block presumably ended the stream at
// the first error (per async-stream's documented behaviour) — confirm callers expect this.
.filter_map(|future| async {
match future.await {
Ok(Some(unindexed_collab)) => Some(Ok(unindexed_collab)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
})
.boxed()
}
async fn index_unindexd_collab(