chore: do not throw errors on invalid document schema during indexing

This commit is contained in:
Bartosz Sypytkowski 2024-06-27 08:36:51 +02:00
parent 5d3574d643
commit cce52a5185
7 changed files with 34 additions and 65 deletions

View File

@ -1,14 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": []
},
"hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73"
}

View File

@ -1,22 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "SELECT pg_advisory_xact_lock($1)",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "pg_advisory_xact_lock",
"type_info": "Void"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
null
]
},
"hash": "a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247"
}

View File

@ -17,7 +17,7 @@ actix.workspace = true
actix-web.workspace = true
actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] }
actix-web-actors = { version = "4.3" }
app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error"] }
app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "bincode_error", "appflowy_ai_error"] }
authentication.workspace = true
brotli.workspace = true
dashmap.workspace = true
@ -59,7 +59,7 @@ indexmap = "2.2.5"
semver = "1.0.22"
redis = "0.25.2"
secrecy.workspace = true
shared-entity = { workspace = true, features = ["cloud"]}
shared-entity = { workspace = true, features = ["cloud"] }
parking_lot = "0.12.1"
lazy_static = "1.4.0"
itertools = "0.12.0"

View File

@ -115,7 +115,7 @@ where
};
let collab_embedding = if let Some(indexer) = &self.indexer {
Some(indexer.index(collab.clone()).await?)
indexer.index(collab.clone()).await?
} else {
None
};

View File

@ -47,9 +47,15 @@ impl DocumentIndexer {
#[async_trait]
impl Indexer for DocumentIndexer {
async fn index(&self, collab: MutexCollab) -> Result<AFCollabEmbeddings, AppError> {
let (object_id, mut params) = Self::get_document_contents(Arc::new(collab))
.map_err(|e| AppError::OpenError(e.to_string()))?;
async fn index(&self, collab: MutexCollab) -> Result<Option<AFCollabEmbeddings>, AppError> {
let (object_id, mut params) = match Self::get_document_contents(Arc::new(collab)) {
Ok(result) => result,
Err(err) => {
tracing::warn!("failed to get document data: {}", err);
return Ok(None);
},
};
let contents: Vec<_> = params
.iter()
.map(|fragment| fragment.content.clone())
@ -64,8 +70,7 @@ impl Indexer for DocumentIndexer {
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
})
.await
.map_err(|e| AppError::Internal(e.into()))?;
.await?;
for embedding in resp.data {
let param = &mut params[embedding.index as usize];
@ -86,9 +91,9 @@ impl Indexer for DocumentIndexer {
object_id,
resp.total_tokens
);
Ok(AFCollabEmbeddings {
Ok(Some(AFCollabEmbeddings {
tokens_consumed: resp.total_tokens as u32,
params,
})
}))
}
}

View File

@ -23,13 +23,13 @@ use database_entity::dto::{AFCollabEmbeddings, CollabParams};
#[async_trait]
pub trait Indexer: Send + Sync {
async fn index(&self, collab: MutexCollab) -> Result<AFCollabEmbeddings, AppError>;
async fn index(&self, collab: MutexCollab) -> Result<Option<AFCollabEmbeddings>, AppError>;
async fn index_encoded(
&self,
object_id: &str,
encoded_collab: EncodedCollab,
) -> Result<AFCollabEmbeddings, AppError> {
) -> Result<Option<AFCollabEmbeddings>, AppError> {
let collab = Collab::new_with_source(
CollabOrigin::Empty,
object_id,
@ -139,16 +139,17 @@ impl IndexerProvider {
)
.map_err(|err| AppError::Internal(err.into()))?,
);
let embeddings = indexer.index(collab).await?;
let mut tx = self.db.begin().await?;
upsert_collab_embeddings(
&mut tx,
&unindexed.workspace_id,
embeddings.tokens_consumed,
&embeddings.params,
)
.await?;
tx.commit().await?;
if let Some(embeddings) = indexer.index(collab).await? {
let mut tx = self.db.begin().await?;
upsert_collab_embeddings(
&mut tx,
&unindexed.workspace_id,
embeddings.tokens_consumed,
&embeddings.params,
)
.await?;
tx.commit().await?;
}
}
Ok(())
}
@ -164,7 +165,7 @@ impl IndexerProvider {
EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)?,
)
.await?;
Ok(Some(embeddings))
Ok(embeddings)
} else {
Ok(None)
}

View File

@ -896,12 +896,11 @@ async fn update_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)?;
params.embeddings = Some(
indexer
.index_encoded(&params.object_id, encoded_collab)
.await?,
);
let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)
.map_err(|e| AppError::Internal(e.into()))?;
params.embeddings = indexer
.index_encoded(&params.object_id, encoded_collab)
.await?;
}
}
state