Merge pull request #653 from AppFlowy-IO/lazy-indexer

chore: appflowy indexer - do not break the document handler on missing data
This commit is contained in:
Bartosz Sypytkowski 2024-06-26 07:50:25 +02:00 committed by GitHub
commit c0bca1cb8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 57 additions and 36 deletions

View File

@ -7,8 +7,7 @@ use collab::core::collab::TransactionMutExt;
use collab::core::collab::{DataSource, MutexCollab}; use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin; use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode; use collab::preclude::updates::decoder::Decode;
use collab::preclude::Update; use collab::preclude::{Collab, Update};
use collab_document::document::Document;
use collab_entity::CollabType; use collab_entity::CollabType;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use tokio::select; use tokio::select;
@ -70,13 +69,14 @@ impl CollabHandle {
}; };
let content: Arc<dyn Indexable> = match collab_type { let content: Arc<dyn Indexable> = match collab_type {
CollabType::Document => { CollabType::Document => {
let content = Document::from_doc_state( let collab = Arc::new(MutexCollab::new(Collab::new_with_source(
CollabOrigin::Empty, CollabOrigin::Empty,
DataSource::DocStateV1(doc_state),
&object_id, &object_id,
DataSource::DocStateV1(doc_state),
vec![], vec![],
)?; false,
let watcher = DocumentWatcher::new(object_id.clone(), content, !was_indexed)?; )?));
let watcher = DocumentWatcher::new(object_id.clone(), collab, !was_indexed)?;
Arc::new(watcher) Arc::new(watcher)
}, },
_ => return Ok(None), _ => return Ok(None),

View File

@ -8,6 +8,7 @@ use collab_entity::CollabType;
use database_entity::dto::EmbeddingContentType; use database_entity::dto::EmbeddingContentType;
use futures::Stream; use futures::Stream;
use tokio::sync::watch::Sender; use tokio::sync::watch::Sender;
use yrs::Subscription;
use crate::collab_handle::{FragmentUpdate, Indexable}; use crate::collab_handle::{FragmentUpdate, Indexable};
use crate::error::Result; use crate::error::Result;
@ -16,8 +17,10 @@ use crate::indexer::Fragment;
pub struct DocumentWatcher { pub struct DocumentWatcher {
object_id: String, object_id: String,
content: Document, content: Arc<MutexCollab>,
receiver: tokio::sync::watch::Receiver<u64>, receiver: tokio::sync::watch::Receiver<u64>,
#[allow(dead_code)]
update_observer: Subscription,
} }
unsafe impl Send for DocumentWatcher {} unsafe impl Send for DocumentWatcher {}
@ -26,38 +29,54 @@ unsafe impl Sync for DocumentWatcher {}
impl DocumentWatcher { impl DocumentWatcher {
pub fn new( pub fn new(
object_id: String, object_id: String,
mut content: Document, content: Arc<MutexCollab>,
index_initial_content: bool, index_initial_content: bool,
) -> Result<Self> { ) -> Result<Self> {
let (tx, receiver) = tokio::sync::watch::channel(0); let (tx, receiver) = tokio::sync::watch::channel(0);
if index_initial_content { if index_initial_content {
Self::index_initial_content(&mut content, &tx)?; Self::index_initial_content(content.clone(), &tx)?;
} }
Self::attach_listener(&mut content, tx); let update_observer = Self::attach_listener(&content, tx);
Ok(Self { Ok(Self {
object_id, object_id,
content, content,
receiver, receiver,
update_observer,
}) })
} }
fn attach_listener(document: &mut Document, notifier: Sender<u64>) { fn attach_listener(document: &Arc<MutexCollab>, notifier: Sender<u64>) -> Subscription {
document.subscribe_block_changed(move |_, _| { let lock = document.lock();
notifier.send_modify(|i| *i += 1); lock
}); .get_doc()
.observe_update_v1(move |_, _| {
notifier.send_modify(|i| *i += 1);
})
.unwrap()
} }
fn index_initial_content(document: &mut Document, notifier: &Sender<u64>) -> Result<()> { fn index_initial_content(collab: Arc<MutexCollab>, notifier: &Sender<u64>) -> Result<()> {
let data = document.get_document_data()?; match Document::open(collab.clone()) {
if data.meta.text_map.as_ref().is_some() { Ok(document) => match document.get_document_data() {
notifier.send_modify(|i| *i += 1); Ok(data) => {
if data.meta.text_map.as_ref().is_some() {
notifier.send_modify(|i| *i += 1);
}
},
Err(err) => {
tracing::warn!("Failed to get document data: {}", err);
},
},
Err(err) => {
tracing::warn!("Failed to open document: {}", err);
},
} }
Ok(()) Ok(())
} }
pub fn as_stream(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> { pub fn as_stream(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> {
let mut receiver = self.receiver.clone(); let mut receiver = self.receiver.clone();
let collab = Arc::downgrade(self.content.get_collab()); let collab = Arc::downgrade(&self.content);
let object_id = self.object_id.clone(); let object_id = self.object_id.clone();
Box::pin(stream! { Box::pin(stream! {
while let Ok(()) = receiver.changed().await { while let Ok(()) = receiver.changed().await {
@ -91,7 +110,7 @@ impl DocumentWatcher {
impl Indexable for DocumentWatcher { impl Indexable for DocumentWatcher {
fn get_collab(&self) -> &MutexCollab { fn get_collab(&self) -> &MutexCollab {
self.content.get_collab() &self.content
} }
fn changes(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> { fn changes(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> {
@ -102,13 +121,15 @@ impl Indexable for DocumentWatcher {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::indexer::Fragment; use crate::indexer::Fragment;
use collab::core::collab::DataSource; use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin; use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_document::document::Document; use collab_document::document::Document;
use collab_document::document_data::default_document_collab_data; use collab_document::document_data::default_document_collab_data;
use collab_entity::CollabType; use collab_entity::CollabType;
use database_entity::dto::EmbeddingContentType; use database_entity::dto::EmbeddingContentType;
use serde_json::json; use serde_json::json;
use std::sync::Arc;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use crate::watchers::DocumentWatcher; use crate::watchers::DocumentWatcher;
@ -118,24 +139,26 @@ mod test {
let _ = env_logger::builder().is_test(true).try_init(); let _ = env_logger::builder().is_test(true).try_init();
let encoded_collab = default_document_collab_data("o-1").unwrap(); let encoded_collab = default_document_collab_data("o-1").unwrap();
let doc = Document::from_doc_state( let collab = Arc::new(MutexCollab::new(
CollabOrigin::Empty, Collab::new_with_source(
DataSource::DocStateV1(encoded_collab.doc_state.into()), CollabOrigin::Empty,
"o-1", "o-1",
vec![], DataSource::DocStateV1(encoded_collab.doc_state.into()),
) vec![],
.unwrap(); false,
)
.unwrap(),
));
let doc = Document::open(collab.clone()).unwrap();
let watcher = DocumentWatcher::new("o-1".to_string(), doc, true).unwrap(); let watcher = DocumentWatcher::new("o-1".to_string(), collab, true).unwrap();
let mut stream = watcher.as_stream(); let mut stream = watcher.as_stream();
let text_id = { let text_id = {
// modify the text block for the first time // modify the text block for the first time
let block = get_first_text_block(&watcher.content); let block = get_first_text_block(&doc);
let text_id = block.external_id.clone().unwrap(); let text_id = block.external_id.clone().unwrap();
watcher doc.apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string());
.content
.apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string());
text_id text_id
}; };
@ -152,9 +175,7 @@ mod test {
); );
// modify text block again, we expect to have a cumulative content of the block // modify text block again, we expect to have a cumulative content of the block
watcher doc.apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string());
.content
.apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string());
let update = stream.next().await.unwrap(); let update = stream.next().await.unwrap();
assert_eq!( assert_eq!(