From cce52a51852efd037464d5dfdf8f2e6c22b08e4d Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Thu, 27 Jun 2024 08:36:51 +0200 Subject: [PATCH] chore: do not throw errors on invalid document schema during indexing --- ...a673ea4e196920636e4a4db9d42fad3ef4d73.json | 14 ---------- ...dcc9d963cc033582bf2e945e8bf3a301b4247.json | 22 --------------- services/appflowy-collaborate/Cargo.toml | 4 +-- .../src/group/persistence.rs | 2 +- .../src/indexer/document_indexer.rs | 19 ++++++++----- .../src/indexer/provider.rs | 27 ++++++++++--------- src/api/workspace.rs | 11 ++++---- 7 files changed, 34 insertions(+), 65 deletions(-) delete mode 100644 .sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json delete mode 100644 .sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json diff --git a/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json b/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json deleted file mode 100644 index aafc6f23..00000000 --- a/.sqlx/query-884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE auth.users\n SET role = 'supabase_admin', email_confirmed_at = NOW()\n WHERE id = $1\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "884c44d3a87ca4e520f9e8cec6ba673ea4e196920636e4a4db9d42fad3ef4d73" -} diff --git a/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json b/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json deleted file mode 100644 index 909e6ad4..00000000 --- a/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT pg_advisory_xact_lock($1)", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": 
"pg_advisory_xact_lock", - "type_info": "Void" - } - ], - "parameters": { - "Left": [ - "Int8" - ] - }, - "nullable": [ - null - ] - }, - "hash": "a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247" -} diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index 0e6938ca..266a2aef 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -17,7 +17,7 @@ actix.workspace = true actix-web.workspace = true actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] } actix-web-actors = { version = "4.3" } -app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error"] } +app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "bincode_error", "appflowy_ai_error"] } authentication.workspace = true brotli.workspace = true dashmap.workspace = true @@ -59,7 +59,7 @@ indexmap = "2.2.5" semver = "1.0.22" redis = "0.25.2" secrecy.workspace = true -shared-entity = { workspace = true, features = ["cloud"]} +shared-entity = { workspace = true, features = ["cloud"] } parking_lot = "0.12.1" lazy_static = "1.4.0" itertools = "0.12.0" diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs index ff60b98e..277a78c2 100644 --- a/services/appflowy-collaborate/src/group/persistence.rs +++ b/services/appflowy-collaborate/src/group/persistence.rs @@ -115,7 +115,7 @@ where }; let collab_embedding = if let Some(indexer) = &self.indexer { - Some(indexer.index(collab.clone()).await?) + indexer.index(collab.clone()).await? 
} else { None }; diff --git a/services/appflowy-collaborate/src/indexer/document_indexer.rs b/services/appflowy-collaborate/src/indexer/document_indexer.rs index c185422d..66c174f2 100644 --- a/services/appflowy-collaborate/src/indexer/document_indexer.rs +++ b/services/appflowy-collaborate/src/indexer/document_indexer.rs @@ -47,9 +47,15 @@ impl DocumentIndexer { #[async_trait] impl Indexer for DocumentIndexer { - async fn index(&self, collab: MutexCollab) -> Result<AFCollabEmbeddings, AppError> { - let (object_id, mut params) = Self::get_document_contents(Arc::new(collab)) - .map_err(|e| AppError::OpenError(e.to_string()))?; + async fn index(&self, collab: MutexCollab) -> Result<Option<AFCollabEmbeddings>, AppError> { + let (object_id, mut params) = match Self::get_document_contents(Arc::new(collab)) { + Ok(result) => result, + Err(err) => { + tracing::warn!("failed to get document data: {}", err); + return Ok(None); + }, + }; + let contents: Vec<_> = params .iter() .map(|fragment| fragment.content.clone()) @@ -64,8 +70,7 @@ impl Indexer for DocumentIndexer { encoding_format: EmbeddingEncodingFormat::Float, dimensions: 1536, }) - .await - .map_err(|e| AppError::Internal(e.into()))?; + .await?; for embedding in resp.data { let param = &mut params[embedding.index as usize]; @@ -86,9 +91,9 @@ impl Indexer for DocumentIndexer { object_id, resp.total_tokens ); - Ok(AFCollabEmbeddings { + Ok(Some(AFCollabEmbeddings { tokens_consumed: resp.total_tokens as u32, params, - }) + })) } } diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index d833040c..14f10230 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -23,13 +23,13 @@ use database_entity::dto::{AFCollabEmbeddings, CollabParams}; #[async_trait] pub trait Indexer: Send + Sync { - async fn index(&self, collab: MutexCollab) -> Result<AFCollabEmbeddings, AppError>; + async fn index(&self, collab: MutexCollab) -> Result<Option<AFCollabEmbeddings>, AppError>; async fn index_encoded( 
&self, object_id: &str, encoded_collab: EncodedCollab, - ) -> Result<AFCollabEmbeddings, AppError> { + ) -> Result<Option<AFCollabEmbeddings>, AppError> { let collab = Collab::new_with_source( CollabOrigin::Empty, object_id, @@ -139,16 +139,17 @@ impl IndexerProvider { ) .map_err(|err| AppError::Internal(err.into()))?, ); - let embeddings = indexer.index(collab).await?; - let mut tx = self.db.begin().await?; - upsert_collab_embeddings( - &mut tx, - &unindexed.workspace_id, - embeddings.tokens_consumed, - &embeddings.params, - ) - .await?; - tx.commit().await?; + if let Some(embeddings) = indexer.index(collab).await? { + let mut tx = self.db.begin().await?; + upsert_collab_embeddings( + &mut tx, + &unindexed.workspace_id, + embeddings.tokens_consumed, + &embeddings.params, + ) + .await?; + tx.commit().await?; + } } Ok(()) } @@ -164,7 +165,7 @@ impl IndexerProvider { EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)?, ) .await?; - Ok(Some(embeddings)) + Ok(embeddings) } else { Ok(None) } diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 91680178..c6b858ca 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -896,12 +896,11 @@ async fn update_collab_handler( .can_index_workspace(&workspace_id) .await? { - let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1)?; - params.embeddings = Some( - indexer - .index_encoded(&params.object_id, encoded_collab) - .await?, - ); + let encoded_collab = EncodedCollab::decode_from_bytes(&params.encoded_collab_v1) + .map_err(|e| AppError::Internal(e.into()))?; + params.embeddings = indexer + .index_encoded(&params.object_id, encoded_collab) + .await?; } } state