From 9b308bceaabda5c0405040fef7806150132c44fa Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Wed, 26 Jun 2024 07:18:28 +0200 Subject: [PATCH] chore: appflowy indexer - do not break the document handler on missing data --- .../appflowy-indexer/src/collab_handle.rs | 12 +-- .../src/watchers/document_watcher.rs | 81 ++++++++++++------- 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index c1d8a6c1..ce7753ef 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -7,8 +7,7 @@ use collab::core::collab::TransactionMutExt; use collab::core::collab::{DataSource, MutexCollab}; use collab::core::origin::CollabOrigin; use collab::preclude::updates::decoder::Decode; -use collab::preclude::Update; -use collab_document::document::Document; +use collab::preclude::{Collab, Update}; use collab_entity::CollabType; use futures::{Stream, StreamExt}; use tokio::select; @@ -70,13 +69,14 @@ impl CollabHandle { }; let content: Arc = match collab_type { CollabType::Document => { - let content = Document::from_doc_state( + let collab = Arc::new(MutexCollab::new(Collab::new_with_source( CollabOrigin::Empty, - DataSource::DocStateV1(doc_state), &object_id, + DataSource::DocStateV1(doc_state), vec![], - )?; - let watcher = DocumentWatcher::new(object_id.clone(), content, !was_indexed)?; + false, + )?)); + let watcher = DocumentWatcher::new(object_id.clone(), collab, !was_indexed)?; Arc::new(watcher) }, _ => return Ok(None), diff --git a/services/appflowy-indexer/src/watchers/document_watcher.rs b/services/appflowy-indexer/src/watchers/document_watcher.rs index 3c937ba0..d8737f22 100644 --- a/services/appflowy-indexer/src/watchers/document_watcher.rs +++ b/services/appflowy-indexer/src/watchers/document_watcher.rs @@ -8,6 +8,7 @@ use collab_entity::CollabType; use database_entity::dto::EmbeddingContentType; use futures::Stream; use tokio::sync::watch::Sender; +use yrs::Subscription; use crate::collab_handle::{FragmentUpdate, Indexable}; use crate::error::Result; @@ -16,8 +17,10 @@ use crate::indexer::Fragment; pub struct DocumentWatcher { object_id: String, - content: Document, + content: Arc, receiver: tokio::sync::watch::Receiver, + #[allow(dead_code)] + update_observer: Subscription, } unsafe impl Send for DocumentWatcher {} @@ -26,38 +29,54 @@ unsafe impl Sync for DocumentWatcher {} impl DocumentWatcher { pub fn new( object_id: String, - mut content: Document, + content: Arc, index_initial_content: bool, ) -> Result { let (tx, receiver) = tokio::sync::watch::channel(0); if index_initial_content { - Self::index_initial_content(&mut content, &tx)?; + Self::index_initial_content(content.clone(), &tx)?; } - Self::attach_listener(&mut content, tx); + let update_observer = Self::attach_listener(&content, tx); Ok(Self { object_id, content, receiver, + update_observer, }) } - fn attach_listener(document: &mut Document, notifier: Sender) { - document.subscribe_block_changed(move |_, _| { - notifier.send_modify(|i| *i += 1); - }); + fn attach_listener(document: &Arc, notifier: Sender) -> Subscription { + let lock = document.lock(); + lock + .get_doc() + .observe_update_v1(move |_, _| { + notifier.send_modify(|i| *i += 1); + }) + .unwrap() } - fn index_initial_content(document: &mut Document, notifier: &Sender) -> Result<()> { - let data = document.get_document_data()?; - if data.meta.text_map.as_ref().is_some() { - notifier.send_modify(|i| *i += 1); + fn index_initial_content(collab: Arc, notifier: &Sender) -> Result<()> { + match Document::open(collab.clone()) { + Ok(document) => match document.get_document_data() { + Ok(data) => { + if data.meta.text_map.as_ref().is_some() { + notifier.send_modify(|i| *i += 1); + } + }, + Err(err) => { + tracing::warn!("Failed to get document data: {}", err); + }, + }, + Err(err) => { + tracing::warn!("Failed to open document: {}", err); + }, } Ok(()) } pub fn as_stream(&self) -> Pin + Send + Sync>> { let mut receiver = self.receiver.clone(); - let collab = Arc::downgrade(self.content.get_collab()); + let collab = Arc::downgrade(&self.content); let object_id = self.object_id.clone(); Box::pin(stream! { while let Ok(()) = receiver.changed().await { @@ -91,7 +110,7 @@ impl DocumentWatcher { impl Indexable for DocumentWatcher { fn get_collab(&self) -> &MutexCollab { - self.content.get_collab() + &self.content } fn changes(&self) -> Pin + Send + Sync>> { @@ -102,13 +121,15 @@ impl Indexable for DocumentWatcher { #[cfg(test)] mod test { use crate::indexer::Fragment; - use collab::core::collab::DataSource; + use collab::core::collab::{DataSource, MutexCollab}; use collab::core::origin::CollabOrigin; + use collab::preclude::Collab; use collab_document::document::Document; use collab_document::document_data::default_document_collab_data; use collab_entity::CollabType; use database_entity::dto::EmbeddingContentType; use serde_json::json; + use std::sync::Arc; use tokio_stream::StreamExt; use crate::watchers::DocumentWatcher; @@ -118,24 +139,26 @@ mod test { let _ = env_logger::builder().is_test(true).try_init(); let encoded_collab = default_document_collab_data("o-1").unwrap(); - let doc = Document::from_doc_state( - CollabOrigin::Empty, - DataSource::DocStateV1(encoded_collab.doc_state.into()), - "o-1", - vec![], - ) - .unwrap(); + let collab = Arc::new(MutexCollab::new( + Collab::new_with_source( + CollabOrigin::Empty, + "o-1", + DataSource::DocStateV1(encoded_collab.doc_state.into()), + vec![], + false, + ) + .unwrap(), + )); + let doc = Document::open(collab.clone()).unwrap(); - let watcher = DocumentWatcher::new("o-1".to_string(), doc, true).unwrap(); + let watcher = DocumentWatcher::new("o-1".to_string(), collab, true).unwrap(); let mut stream = watcher.as_stream(); let text_id = { // modify the text block for the first time - let block = get_first_text_block(&watcher.content); + let block = get_first_text_block(&doc); let text_id = block.external_id.clone().unwrap(); - watcher - .content - .apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string()); + doc.apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string()); text_id }; @@ -152,9 +175,7 @@ mod test { ); // modify text block again, we expect to have a cumulative content of the block - watcher - .content - .apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string()); + doc.apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string()); let update = stream.next().await.unwrap(); assert_eq!(