diff --git a/.sqlx/query-7c0d6de28d557d83cf94faceef1216ea932ffe1bf29f8ed3dafeb6ddeea437fb.json b/.sqlx/query-773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89.json similarity index 71% rename from .sqlx/query-7c0d6de28d557d83cf94faceef1216ea932ffe1bf29f8ed3dafeb6ddeea437fb.json rename to .sqlx/query-773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89.json index c2baf5a5..67f57a98 100644 --- a/.sqlx/query-7c0d6de28d557d83cf94faceef1216ea932ffe1bf29f8ed3dafeb6ddeea437fb.json +++ b/.sqlx/query-773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)\n END as has_index\nFROM af_collab c\nJOIN af_workspace w ON c.workspace_id = w.workspace_id\nWHERE c.oid = $1", + "query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1", "describe": { "columns": [ { @@ -16,7 +16,9 @@ ], "parameters": { "Left": [ - "Text" + "Uuid", + "Text", + "Int4" ] }, "nullable": [ @@ -24,5 +26,5 @@ null ] }, - "hash": "7c0d6de28d557d83cf94faceef1216ea932ffe1bf29f8ed3dafeb6ddeea437fb" + "hash": "773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89" } diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 79f53c32..a99da792 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -9,7 +9,9 @@ use database_entity::dto::AFCollabEmbeddingParams; pub async fn get_index_status( tx: &mut Transaction<'_, sqlx::Postgres>, - oid: &str, + workspace_id: &Uuid, + object_id: &str, + partition_key: i32, ) -> Result, sqlx::Error> { let result = sqlx::query!( r#" @@ -19,12 +21,13 @@ SELECT WHEN w.settings['disable_search_indexing']::boolean THEN FALSE ELSE - EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid) + EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2) END as has_index -FROM af_collab c -JOIN af_workspace w ON c.workspace_id = w.workspace_id -WHERE c.oid = $1"#, - oid +FROM af_workspace w +WHERE w.workspace_id = $1"#, + workspace_id, + object_id, + partition_key ) .fetch_one(tx.deref_mut()) .await; @@ -37,8 +40,9 @@ WHERE c.oid = $1"#, }, Err(Error::RowNotFound) => { tracing::warn!( - "open-collab event for {} arrived before its workspace was created", - oid + "open-collab event for {}/{} arrived before its workspace was created", + workspace_id, + object_id ); Ok(Some(false)) }, diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index ce28e5f3..c1d8a6c1 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -51,7 +51,12 @@ impl CollabHandle { ingest_interval: Duration, ) -> Result> { let closing = CancellationToken::new(); - let was_indexed = match indexer.index_status(&object_id).await? { + let workspace_uuid = + Uuid::parse_str(&workspace_id).map_err(crate::error::Error::InvalidWorkspace)?; + let was_indexed = match indexer + .index_status(&workspace_uuid, &object_id, collab_type.clone()) + .await? + { IndexStatus::Indexed => true, IndexStatus::NotIndexed => false, IndexStatus::NotPermitted => { @@ -87,15 +92,13 @@ impl CollabHandle { if !messages.is_empty() { Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await?; } - let workspace_id = - Uuid::parse_str(&workspace_id).map_err(crate::error::Error::InvalidWorkspace)?; let mut tasks = JoinSet::new(); tasks.spawn(Self::receive_collab_updates( update_stream, Arc::downgrade(&content), object_id.clone(), - workspace_id, + workspace_uuid, ingest_interval, closing.clone(), )); @@ -103,7 +106,7 @@ impl CollabHandle { content.changes(), indexer, object_id, - workspace_id, + workspace_uuid, ingest_interval, closing.clone(), )); diff --git a/services/appflowy-indexer/src/indexer.rs b/services/appflowy-indexer/src/indexer.rs index 7f450292..b39ff007 100644 --- a/services/appflowy-indexer/src/indexer.rs +++ b/services/appflowy-indexer/src/indexer.rs @@ -26,7 +26,12 @@ use crate::error::Result; #[async_trait] pub trait Indexer: Send + Sync { /// Check if document with given id has been already a corresponding index entry. - async fn index_status(&self, object_id: &str) -> Result; + async fn index_status( + &self, + workspace_id: &Uuid, + object_id: &str, + collab_type: CollabType, + ) -> Result; async fn update_index(&self, workspace_id: &Uuid, documents: Vec) -> Result<()>; async fn remove(&self, ids: &[FragmentID]) -> Result<()>; /// Returns a list of object ids, that have not been indexed yet. @@ -194,8 +199,19 @@ struct Embeddings { #[async_trait] impl Indexer for PostgresIndexer { - async fn index_status(&self, object_id: &str) -> Result { - let found = get_index_status(&mut self.db.begin().await?, object_id).await?; + async fn index_status( + &self, + workspace_id: &Uuid, + object_id: &str, + collab_type: CollabType, + ) -> Result { + let found = get_index_status( + &mut self.db.begin().await?, + workspace_id, + object_id, + collab_type as i32, + ) + .await?; match found { None => Ok(IndexStatus::NotPermitted), Some(true) => Ok(IndexStatus::Indexed),