chore: appflowy indexer - do not break the document handler on missing data

This commit is contained in:
Bartosz Sypytkowski 2024-06-26 07:18:28 +02:00
parent 7d03e9cf85
commit 9b308bceaa
2 changed files with 57 additions and 36 deletions

View File

@ -7,8 +7,7 @@ use collab::core::collab::TransactionMutExt;
use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab::preclude::updates::decoder::Decode;
use collab::preclude::Update;
use collab_document::document::Document;
use collab::preclude::{Collab, Update};
use collab_entity::CollabType;
use futures::{Stream, StreamExt};
use tokio::select;
@ -70,13 +69,14 @@ impl CollabHandle {
};
let content: Arc<dyn Indexable> = match collab_type {
CollabType::Document => {
let content = Document::from_doc_state(
let collab = Arc::new(MutexCollab::new(Collab::new_with_source(
CollabOrigin::Empty,
DataSource::DocStateV1(doc_state),
&object_id,
DataSource::DocStateV1(doc_state),
vec![],
)?;
let watcher = DocumentWatcher::new(object_id.clone(), content, !was_indexed)?;
false,
)?));
let watcher = DocumentWatcher::new(object_id.clone(), collab, !was_indexed)?;
Arc::new(watcher)
},
_ => return Ok(None),

View File

@ -8,6 +8,7 @@ use collab_entity::CollabType;
use database_entity::dto::EmbeddingContentType;
use futures::Stream;
use tokio::sync::watch::Sender;
use yrs::Subscription;
use crate::collab_handle::{FragmentUpdate, Indexable};
use crate::error::Result;
@ -16,8 +17,10 @@ use crate::indexer::Fragment;
/// Watches one document and signals, through a `tokio::sync::watch` channel carrying a
/// `u64` counter, whenever the registered listener fires.
///
/// NOTE(review): this span is diff text; the two `content` lines appear to be the
/// pre-change (`Document`) and post-change (`Arc<MutexCollab>`) declarations of the same field.
pub struct DocumentWatcher {
// Identifier of the watched object; cloned into the stream built by `as_stream`.
object_id: String,
content: Document,
// Shared collab; `Indexable::get_collab` hands out a reference to it.
content: Arc<MutexCollab>,
// Read side of the counter channel created in `new`.
receiver: tokio::sync::watch::Receiver<u64>,
// Never read (hence the `allow`); presumably stored only so the subscription returned by
// `attach_listener` lives as long as the watcher — TODO confirm against yrs `Subscription` drop semantics.
#[allow(dead_code)]
update_observer: Subscription,
}
// NOTE(review): no SAFETY justification is visible here. The struct now stores a
// `Subscription` and an `Arc<MutexCollab>`; confirm both are sound to move across threads.
unsafe impl Send for DocumentWatcher {}
@ -26,38 +29,54 @@ unsafe impl Sync for DocumentWatcher {}
impl DocumentWatcher {
/// Builds a watcher for `content`.
///
/// Creates a `tokio::sync::watch` channel seeded with `0`. When `index_initial_content`
/// is true, `Self::index_initial_content` runs first with a borrow of the sender; the
/// sender is then moved into the listener registered by `Self::attach_listener`.
///
/// NOTE(review): this span is diff text. Where two adjacent lines differ only in how
/// `content` is typed or passed, the first appears to be the pre-change line and the
/// second its replacement.
pub fn new(
object_id: String,
mut content: Document,
content: Arc<MutexCollab>,
index_initial_content: bool,
) -> Result<Self> {
// Counter channel: every notification is an increment of this `u64`.
let (tx, receiver) = tokio::sync::watch::channel(0);
if index_initial_content {
Self::index_initial_content(&mut content, &tx)?;
Self::index_initial_content(content.clone(), &tx)?;
}
Self::attach_listener(&mut content, tx);
// The returned subscription is stored on the struct as `update_observer`.
let update_observer = Self::attach_listener(&content, tx);
Ok(Self {
object_id,
content,
receiver,
update_observer,
})
}
/// Registers a callback that increments the `u64` carried by `notifier` each time it fires.
///
/// NOTE(review): two versions are interleaved here (diff text). The first subscribes via
/// `subscribe_block_changed` on a `&mut Document` and returns nothing. The second locks the
/// `MutexCollab`, subscribes via `observe_update_v1` on its doc, and returns the `Subscription`.
fn attach_listener(document: &mut Document, notifier: Sender<u64>) {
document.subscribe_block_changed(move |_, _| {
notifier.send_modify(|i| *i += 1);
});
fn attach_listener(document: &Arc<MutexCollab>, notifier: Sender<u64>) -> Subscription {
// The guard is held only for the duration of the subscription call below.
let lock = document.lock();
lock
.get_doc()
.observe_update_v1(move |_, _| {
notifier.send_modify(|i| *i += 1);
})
// NOTE(review): panics if `observe_update_v1` does not yield a subscription; the caller
// (`new`) already returns `Result`, so this could be propagated instead — verify.
.unwrap()
}
/// Bumps the `notifier` counter once when the document data has a `meta.text_map`.
///
/// NOTE(review): two versions are interleaved here (diff text). The `&mut Document`
/// version propagates a `get_document_data` failure with `?`. The `Arc<MutexCollab>`
/// version logs failures from `Document::open` and `get_document_data` with
/// `tracing::warn!` and still returns `Ok(())`.
fn index_initial_content(document: &mut Document, notifier: &Sender<u64>) -> Result<()> {
let data = document.get_document_data()?;
if data.meta.text_map.as_ref().is_some() {
notifier.send_modify(|i| *i += 1);
fn index_initial_content(collab: Arc<MutexCollab>, notifier: &Sender<u64>) -> Result<()> {
match Document::open(collab.clone()) {
Ok(document) => match document.get_document_data() {
Ok(data) => {
// Only signal when a text map is present; otherwise nothing is sent.
if data.meta.text_map.as_ref().is_some() {
notifier.send_modify(|i| *i += 1);
}
},
Err(err) => {
// Deliberately non-fatal: the error is logged and the function falls through to `Ok(())`.
tracing::warn!("Failed to get document data: {}", err);
},
},
Err(err) => {
// Deliberately non-fatal, same as above.
tracing::warn!("Failed to open document: {}", err);
},
}
Ok(())
}
pub fn as_stream(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> {
let mut receiver = self.receiver.clone();
let collab = Arc::downgrade(self.content.get_collab());
let collab = Arc::downgrade(&self.content);
let object_id = self.object_id.clone();
Box::pin(stream! {
while let Ok(()) = receiver.changed().await {
@ -91,7 +110,7 @@ impl DocumentWatcher {
impl Indexable for DocumentWatcher {
/// Returns a reference to the `MutexCollab` backing this watcher.
///
/// NOTE(review): diff text; the first body line appears to be the pre-change
/// expression and the second its replacement.
fn get_collab(&self) -> &MutexCollab {
self.content.get_collab()
// `content` is an `Arc<MutexCollab>`, so `&self.content` deref-coerces to `&MutexCollab`.
&self.content
}
fn changes(&self) -> Pin<Box<dyn Stream<Item = FragmentUpdate> + Send + Sync>> {
@ -102,13 +121,15 @@ impl Indexable for DocumentWatcher {
#[cfg(test)]
mod test {
use crate::indexer::Fragment;
use collab::core::collab::DataSource;
use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin;
use collab::preclude::Collab;
use collab_document::document::Document;
use collab_document::document_data::default_document_collab_data;
use collab_entity::CollabType;
use database_entity::dto::EmbeddingContentType;
use serde_json::json;
use std::sync::Arc;
use tokio_stream::StreamExt;
use crate::watchers::DocumentWatcher;
@ -118,24 +139,26 @@ mod test {
let _ = env_logger::builder().is_test(true).try_init();
let encoded_collab = default_document_collab_data("o-1").unwrap();
let doc = Document::from_doc_state(
CollabOrigin::Empty,
DataSource::DocStateV1(encoded_collab.doc_state.into()),
"o-1",
vec![],
)
.unwrap();
let collab = Arc::new(MutexCollab::new(
Collab::new_with_source(
CollabOrigin::Empty,
"o-1",
DataSource::DocStateV1(encoded_collab.doc_state.into()),
vec![],
false,
)
.unwrap(),
));
let doc = Document::open(collab.clone()).unwrap();
let watcher = DocumentWatcher::new("o-1".to_string(), doc, true).unwrap();
let watcher = DocumentWatcher::new("o-1".to_string(), collab, true).unwrap();
let mut stream = watcher.as_stream();
let text_id = {
// modify the text block for the first time
let block = get_first_text_block(&watcher.content);
let block = get_first_text_block(&doc);
let text_id = block.external_id.clone().unwrap();
watcher
.content
.apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string());
doc.apply_text_delta(&text_id, json!([{"insert": "A"}]).to_string());
text_id
};
@ -152,9 +175,7 @@ mod test {
);
// modify text block again, we expect to have a cumulative content of the block
watcher
.content
.apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string());
doc.apply_text_delta(&text_id, json!([{"insert": "B"}]).to_string());
let update = stream.next().await.unwrap();
assert_eq!(