chore: split document content into multiple embeddings (#990)

* chore: split document content into multiple embeddings

* chore: fall back to uuid v4 generation due to unmaintained transitive dependency

* chore: fix lint errors
This commit is contained in:
Bartosz Sypytkowski 2024-11-14 07:15:52 +01:00 committed by GitHub
parent a4ea4eeb31
commit f6fef9918b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 77 additions and 47 deletions

View File

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))",
"describe": {
"columns": [],
"parameters": {
"Left": [
"TextArray"
]
},
"nullable": []
},
"hash": "6a1722f63a88debb617c20f91d2adfe4049234258ff2c8429db85206a96c53c1"
}

View File

@ -0,0 +1,14 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM af_collab_embeddings WHERE oid = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text"
]
},
"nullable": []
},
"hash": "b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721"
}

View File

@ -77,34 +77,35 @@ pub async fn upsert_collab_embeddings(
.await?;
}
for r in records {
sqlx::query(
r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at)
if !records.is_empty() {
// replace existing collab embeddings
remove_collab_embeddings(tx, &records[0].object_id).await?;
for r in records {
sqlx::query(
r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
ON CONFLICT (fragment_id) DO UPDATE SET content_type = $4, content = $5, embedding = $6, indexed_at = NOW()"#,
)
.bind(&r.fragment_id)
.bind(&r.object_id)
.bind(crate::collab::partition_key_from_collab_type(&r.collab_type))
.bind(r.content_type as i32)
.bind(&r.content)
.bind(r.embedding.clone().map(Vector::from))
.execute(tx.deref_mut())
.await?;
)
.bind(&r.fragment_id)
.bind(&r.object_id)
.bind(crate::collab::partition_key_from_collab_type(&r.collab_type))
.bind(r.content_type as i32)
.bind(&r.content)
.bind(r.embedding.clone().map(Vector::from))
.execute(tx.deref_mut())
.await?;
}
}
Ok(())
}
pub async fn remove_collab_embeddings(
tx: &mut Transaction<'_, sqlx::Postgres>,
ids: &[String],
object_id: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"DELETE FROM af_collab_embeddings WHERE fragment_id IN (SELECT unnest($1::text[]))",
ids
)
.execute(tx.deref_mut())
.await?;
sqlx::query!("DELETE FROM af_collab_embeddings WHERE oid = $1", object_id)
.execute(tx.deref_mut())
.await?;
Ok(())
}

View File

@ -4,16 +4,16 @@ use anyhow::anyhow;
use async_trait::async_trait;
use collab::preclude::Collab;
use collab_document::document::DocumentBody;
use collab_document::error::DocumentError;
use collab_entity::CollabType;
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_ai_client::dto::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel,
};
use collab_document::document::DocumentBody;
use collab_document::error::DocumentError;
use collab_entity::CollabType;
use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, EmbeddingContentType};
use uuid::Uuid;
use crate::indexer::{DocumentDataExt, Indexer};
@ -22,6 +22,9 @@ pub struct DocumentIndexer {
}
impl DocumentIndexer {
/// Maximum size, in bytes, of a single document content fragment.
///
/// We assume that every token is ~4 bytes. We're going to split document content into fragments
/// of ~2000 tokens each.
///
/// The value is compared against `str::len()` (a byte count, not a character count) when the
/// plain text is sliced, and it is divided by 4 to derive the `chunk_size` sent with the
/// embedding request.
pub const DOC_CONTENT_SPLIT: usize = 8000;
/// Builds a `DocumentIndexer` that uses `ai_client` and returns it behind an `Arc`.
pub fn new(ai_client: AppFlowyAIClient) -> Arc<Self> {
  let indexer = Self { ai_client };
  Arc::new(indexer)
}
@ -42,16 +45,42 @@ impl Indexer for DocumentIndexer {
match result {
Ok(document_data) => {
let content = document_data.to_plain_text();
let plain_text_param = AFCollabEmbeddingParams {
fragment_id: object_id.clone(),
object_id: object_id.clone(),
collab_type: CollabType::Document,
content_type: EmbeddingContentType::PlainText,
content,
embedding: None,
};
let mut result = Vec::with_capacity(1 + content.len() / Self::DOC_CONTENT_SPLIT);
Ok(vec![plain_text_param])
// Walk the plain text front to back, peeling off one fragment of at most
// `DOC_CONTENT_SPLIT` bytes per iteration. Whatever is left in `slice` afterwards
// (at most `DOC_CONTENT_SPLIT` bytes) is handled by the code that follows the loop.
let mut slice = content.as_str();
while slice.len() > Self::DOC_CONTENT_SPLIT {
  // `DOC_CONTENT_SPLIT` is a byte offset and `str::split_at` panics if the offset lands in
  // the middle of a multi-byte UTF-8 character, so back off to the closest preceding
  // character boundary first. A UTF-8 character is at most 4 bytes long, hence this moves
  // the split point back by at most 3 bytes and the fragment never exceeds the limit.
  let mut split_idx = Self::DOC_CONTENT_SPLIT;
  while !slice.is_char_boundary(split_idx) {
    split_idx -= 1;
  }
  let (left, right) = slice.split_at(split_idx);
  let param = AFCollabEmbeddingParams {
    // every leading fragment gets its own random identifier
    fragment_id: Uuid::new_v4().to_string(),
    object_id: object_id.clone(),
    collab_type: CollabType::Document,
    content_type: EmbeddingContentType::PlainText,
    content: left.to_string(),
    embedding: None,
  };
  result.push(param);
  slice = right;
}
let content = if slice.len() == content.len() {
content // we didn't slice the content
} else {
slice.to_string()
};
if !content.is_empty() {
let param = AFCollabEmbeddingParams {
fragment_id: object_id.clone(),
object_id: object_id.clone(),
collab_type: CollabType::Document,
content_type: EmbeddingContentType::PlainText,
content,
embedding: None,
};
result.push(param);
}
Ok(result)
},
Err(err) => {
if matches!(err, DocumentError::NoRequiredData) {
@ -81,7 +110,7 @@ impl Indexer for DocumentIndexer {
.embeddings(EmbeddingRequest {
input: EmbeddingInput::StringArray(contents),
model: EmbeddingsModel::TextEmbedding3Small.to_string(),
chunk_size: 2000,
chunk_size: (Self::DOC_CONTENT_SPLIT / 4) as i32,
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
})