Merge pull request #650 from AppFlowy-IO/fix-indexing-order

fix: do not throw errors if indexer found document before workspace was created
This commit is contained in:
Bartosz Sypytkowski 2024-06-25 09:23:33 +02:00 committed by GitHub
commit 7d03e9cf85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 77 additions and 30 deletions

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)\n END as has_index\nFROM af_collab c\nJOIN af_workspace w ON c.workspace_id = w.workspace_id\nWHERE c.oid = $1",
"query": "\nSELECT\n w.settings['disable_search_indexing']::boolean as disable_search_indexing,\n CASE\n WHEN w.settings['disable_search_indexing']::boolean THEN\n FALSE\n ELSE\n EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2)\n END as has_index\nFROM af_workspace w\nWHERE w.workspace_id = $1",
"describe": {
"columns": [
{
@ -16,7 +16,9 @@
],
"parameters": {
"Left": [
"Text"
"Uuid",
"Text",
"Int4"
]
},
"nullable": [
@ -24,5 +26,5 @@
null
]
},
"hash": "7c0d6de28d557d83cf94faceef1216ea932ffe1bf29f8ed3dafeb6ddeea437fb"
"hash": "773aac7e401c3e6c04d1dc8ea412b9678b7227832a3487270d724f623072fe89"
}

View File

@ -2,14 +2,16 @@ use std::ops::DerefMut;
use collab_entity::CollabType;
use pgvector::Vector;
use sqlx::{Executor, Postgres, Transaction};
use sqlx::{Error, Executor, Postgres, Transaction};
use uuid::Uuid;
use database_entity::dto::AFCollabEmbeddingParams;
pub async fn get_index_status(
tx: &mut Transaction<'_, sqlx::Postgres>,
oid: &str,
workspace_id: &Uuid,
object_id: &str,
partition_key: i32,
) -> Result<Option<bool>, sqlx::Error> {
let result = sqlx::query!(
r#"
@ -19,19 +21,33 @@ SELECT
WHEN w.settings['disable_search_indexing']::boolean THEN
FALSE
ELSE
EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = c.partition_key AND m.oid = c.oid)
EXISTS (SELECT 1 FROM af_collab_embeddings m WHERE m.partition_key = $3 AND m.oid = $2)
END as has_index
FROM af_collab c
JOIN af_workspace w ON c.workspace_id = w.workspace_id
WHERE c.oid = $1"#,
oid
FROM af_workspace w
WHERE w.workspace_id = $1"#,
workspace_id,
object_id,
partition_key
)
.fetch_one(tx.deref_mut())
.await?;
if result.disable_search_indexing.unwrap_or(false) {
return Ok(None);
.await;
match result {
Ok(row) => {
if row.disable_search_indexing.unwrap_or(false) {
return Ok(None);
}
Ok(Some(row.has_index.unwrap_or(false)))
},
Err(Error::RowNotFound) => {
tracing::warn!(
"open-collab event for {}/{} arrived before its workspace was created",
workspace_id,
object_id
);
Ok(Some(false))
},
Err(e) => Err(e),
}
Ok(Some(result.has_index.unwrap_or(false)))
}
pub async fn upsert_collab_embeddings(

View File

@ -51,7 +51,12 @@ impl CollabHandle {
ingest_interval: Duration,
) -> Result<Option<Self>> {
let closing = CancellationToken::new();
let was_indexed = match indexer.index_status(&object_id).await? {
let workspace_uuid =
Uuid::parse_str(&workspace_id).map_err(crate::error::Error::InvalidWorkspace)?;
let was_indexed = match indexer
.index_status(&workspace_uuid, &object_id, collab_type.clone())
.await?
{
IndexStatus::Indexed => true,
IndexStatus::NotIndexed => false,
IndexStatus::NotPermitted => {
@ -87,15 +92,13 @@ impl CollabHandle {
if !messages.is_empty() {
Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await?;
}
let workspace_id =
Uuid::parse_str(&workspace_id).map_err(crate::error::Error::InvalidWorkspace)?;
let mut tasks = JoinSet::new();
tasks.spawn(Self::receive_collab_updates(
update_stream,
Arc::downgrade(&content),
object_id.clone(),
workspace_id,
workspace_uuid,
ingest_interval,
closing.clone(),
));
@ -103,7 +106,7 @@ impl CollabHandle {
content.changes(),
indexer,
object_id,
workspace_id,
workspace_uuid,
ingest_interval,
closing.clone(),
));

View File

@ -240,8 +240,8 @@ impl OpenCollabConsumer {
Err(e) => {
tracing::error!(
"failed to open handle for {}/{}: {}",
object_id,
workspace_id,
object_id,
e
);
},
@ -427,13 +427,18 @@ mod test {
let document = Document::create_with_data(collab.clone(), doc_data).unwrap();
let encoded_collab = document.encode_collab().unwrap();
let _workspace_id = setup_collab(&db, uid, object_id, &encoded_collab).await;
let workspace_id = setup_collab(&db, uid, object_id, &encoded_collab).await;
{
let mut tx = db.begin().await.unwrap();
let status = get_index_status(&mut tx, &object_id.to_string())
.await
.unwrap();
let status = get_index_status(
&mut tx,
&workspace_id,
&object_id.to_string(),
CollabType::Document as i32,
)
.await
.unwrap();
assert_eq!(
status,
Some(false),
@ -455,9 +460,14 @@ mod test {
{
let mut tx = db.begin().await.unwrap();
let status = get_index_status(&mut tx, &object_id.to_string())
.await
.unwrap();
let status = get_index_status(
&mut tx,
&workspace_id,
&object_id.to_string(),
CollabType::Document as i32,
)
.await
.unwrap();
assert_eq!(status, Some(true), "collab should be indexed after start");
}
}

View File

@ -26,7 +26,12 @@ use crate::error::Result;
#[async_trait]
pub trait Indexer: Send + Sync {
/// Check if document with given id has been already a corresponding index entry.
async fn index_status(&self, object_id: &str) -> Result<IndexStatus>;
async fn index_status(
&self,
workspace_id: &Uuid,
object_id: &str,
collab_type: CollabType,
) -> Result<IndexStatus>;
async fn update_index(&self, workspace_id: &Uuid, documents: Vec<Fragment>) -> Result<()>;
async fn remove(&self, ids: &[FragmentID]) -> Result<()>;
/// Returns a list of object ids, that have not been indexed yet.
@ -194,8 +199,19 @@ struct Embeddings {
#[async_trait]
impl Indexer for PostgresIndexer {
async fn index_status(&self, object_id: &str) -> Result<IndexStatus> {
let found = get_index_status(&mut self.db.begin().await?, object_id).await?;
async fn index_status(
&self,
workspace_id: &Uuid,
object_id: &str,
collab_type: CollabType,
) -> Result<IndexStatus> {
let found = get_index_status(
&mut self.db.begin().await?,
workspace_id,
object_id,
collab_type as i32,
)
.await?;
match found {
None => Ok(IndexStatus::NotPermitted),
Some(true) => Ok(IndexStatus::Indexed),