diff --git a/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json b/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json deleted file mode 100644 index 7c848d9a..00000000 --- a/.sqlx/query-6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "TextArray" - ] - }, - "nullable": [] - }, - "hash": "6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1" -} diff --git a/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json b/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json new file mode 100644 index 00000000..2a50aa45 --- /dev/null +++ b/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM af_collab_embeddings WHERE oid = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [] + }, + "hash": "b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721" +} diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index ee60901b..1722b54d 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -77,34 +77,35 @@ pub async fn upsert_collab_embeddings( .await?; } - for r in records { - sqlx::query( - r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at) + if !records.is_empty() { + // replace existing collab embeddings + remove_collab_embeddings(tx, &records[0].object_id).await?; + for r in records { + sqlx::query( + r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at) VALUES ($1, $2, $3, $4, $5, $6, NOW()) ON CONFLICT (fragment_id) DO UPDATE SET content_type = $4, content = $5, embedding = $6, indexed_at = NOW()"#, - ) - .bind(&r.fragment_id) - .bind(&r.object_id) - .bind(crate::collab::partition_key_from_collab_type(&r.collab_type)) - .bind(r.content_type as i32) - .bind(&r.content) - .bind(r.embedding.clone().map(Vector::from)) - .execute(tx.deref_mut()) - .await?; + ) + .bind(&r.fragment_id) + .bind(&r.object_id) + .bind(crate::collab::partition_key_from_collab_type(&r.collab_type)) + .bind(r.content_type as i32) + .bind(&r.content) + .bind(r.embedding.clone().map(Vector::from)) + .execute(tx.deref_mut()) + .await?; + } } Ok(()) } pub async fn remove_collab_embeddings( tx: &mut Transaction<'_, sqlx::Postgres>, - ids: &[String], + object_id: &str, ) -> Result<(), sqlx::Error> { - sqlx::query!( - "DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))", - ids - ) - .execute(tx.deref_mut()) - .await?; + sqlx::query!("DELETE FROM af_collab_embeddings WHERE oid = $1", object_id) + .execute(tx.deref_mut()) + .await?; Ok(()) } diff --git a/services/appflowy-collaborate/src/indexer/document_indexer.rs b/services/appflowy-collaborate/src/indexer/document_indexer.rs index eda937a2..7cd42abb 100644 --- a/services/appflowy-collaborate/src/indexer/document_indexer.rs +++ b/services/appflowy-collaborate/src/indexer/document_indexer.rs @@ -4,16 +4,16 @@ use anyhow::anyhow; use async_trait::async_trait; use collab::preclude::Collab; -use collab_document::document::DocumentBody; -use collab_document::error::DocumentError; -use collab_entity::CollabType; - use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_ai_client::dto::{ EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel, }; +use collab_document::document::DocumentBody; +use collab_document::error::DocumentError; +use collab_entity::CollabType; use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, EmbeddingContentType}; +use uuid::Uuid; use crate::indexer::{DocumentDataExt, Indexer}; @@ -22,6 +22,9 @@ pub struct DocumentIndexer { } impl DocumentIndexer { + /// We assume that every token is ~4 bytes. We're going to split document content into fragments + /// of ~2000 tokens each. + pub const DOC_CONTENT_SPLIT: usize = 8000; pub fn new(ai_client: AppFlowyAIClient) -> Arc { Arc::new(Self { ai_client }) } @@ -42,16 +45,42 @@ impl Indexer for DocumentIndexer { match result { Ok(document_data) => { let content = document_data.to_plain_text(); - let plain_text_param = AFCollabEmbeddingParams { - fragment_id: object_id.clone(), - object_id: object_id.clone(), - collab_type: CollabType::Document, - content_type: EmbeddingContentType::PlainText, - content, - embedding: None, - }; + let mut result = Vec::with_capacity(1 + content.len() / Self::DOC_CONTENT_SPLIT); - Ok(vec![plain_text_param]) + let mut slice = content.as_str(); + while slice.len() > Self::DOC_CONTENT_SPLIT { + // we should split document into multiple fragments + let (left, right) = slice.split_at(Self::DOC_CONTENT_SPLIT); + let param = AFCollabEmbeddingParams { + fragment_id: Uuid::new_v4().to_string(), + object_id: object_id.clone(), + collab_type: CollabType::Document, + content_type: EmbeddingContentType::PlainText, + content: left.to_string(), + embedding: None, + }; + result.push(param); + slice = right; + } + + let content = if slice.len() == content.len() { + content // we didn't slice the content + } else { + slice.to_string() + }; + if !content.is_empty() { + let param = AFCollabEmbeddingParams { + fragment_id: object_id.clone(), + object_id: object_id.clone(), + collab_type: CollabType::Document, + content_type: EmbeddingContentType::PlainText, + content, + embedding: None, + }; + result.push(param); + } + + Ok(result) }, Err(err) => { if matches!(err, DocumentError::NoRequiredData) { @@ -81,7 +110,7 @@ impl Indexer for DocumentIndexer { .embeddings(EmbeddingRequest { input: EmbeddingInput::StringArray(contents), model: EmbeddingsModel::TextEmbedding3Small.to_string(), - chunk_size: 2000, + chunk_size: (Self::DOC_CONTENT_SPLIT / 4) as i32, encoding_format: EmbeddingEncodingFormat::Float, dimensions: 1536, })