Merge pull request #623 from AppFlowy-IO/indexer-missing-text-data

fix: index all of the document blocks
This commit is contained in:
Bartosz Sypytkowski 2024-06-14 13:30:01 +02:00 committed by GitHub
commit 6685b24239
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 86 additions and 70 deletions

View File

@ -58,12 +58,19 @@ impl StreamGroup {
/// Ensures the consumer group exists, creating it if necessary.
pub async fn ensure_consumer_group(&mut self) -> Result<(), StreamError> {
let _: RedisResult<()> = self
.connection_manager
//Use '$' if you want new messages or '0' to read from the beginning.
.xgroup_create_mkstream(&self.stream_key, &self.group_name, "0")
.await;
let result: RedisResult<()> = self
.connection_manager
//Use '$' if you want new messages or '0' to read from the beginning.
.xgroup_create_mkstream(&self.stream_key, &self.group_name, "0")
.await;
if let Err(e) = result {
tracing::warn!(
"error when creating consumer group `{}` `{}`: {:?}",
self.stream_key,
self.group_name,
e
);
}
Ok(())
}

View File

@ -17,8 +17,8 @@ use app_error::AppError;
use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult};
use crate::collab::queue_redis_ops::{
get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite,
WritePriority, PENDING_WRITE_META_EXPIRE_SECS,
get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority,
PENDING_WRITE_META_EXPIRE_SECS,
};
use crate::collab::RedisSortedSet;
use crate::metrics::CollabMetrics;
@ -170,7 +170,8 @@ impl StorageQueue {
#[cfg(debug_assertions)]
pub async fn clear(&self) -> Result<(), AppError> {
self.pending_write_set.clear().await?;
remove_all_pending_meta(self.connection_manager.clone()).await?;
crate::collab::queue_redis_ops::remove_all_pending_meta(self.connection_manager.clone())
.await?;
Ok(())
}

View File

@ -8,6 +8,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
pub(crate) const PENDING_WRITE_META_EXPIRE_SECS: u64 = 604800; // 7 days in seconds
#[allow(dead_code)]
pub(crate) async fn remove_all_pending_meta(
mut connection_manager: RedisConnectionManager,
) -> Result<(), AppError> {

View File

@ -178,8 +178,9 @@ impl CollabHandle {
Err(err) => tracing::error!("failed to decode update event: {}", err),
}
}
txn.commit();
}
} else {
tracing::warn!("failed to obtain a collab lock");
};
update_stream.ack_messages(&messages).await?;
Ok(())

View File

@ -123,12 +123,6 @@ impl OpenCollabConsumer {
let fragment = {
match &collab.collab_type {
CollabType::Document => {
tracing::trace!(
"indexing document {}/{}",
collab.workspace_id,
collab.object_id
);
let document = Document::from_doc_state(
CollabOrigin::Empty,
DataSource::DocStateV1(collab.collab.doc_state.to_vec()),
@ -137,6 +131,9 @@ impl OpenCollabConsumer {
)?;
let data = document.get_document_data()?;
let content = crate::extract::document_to_plain_text(&data);
if content.is_empty() {
return Ok(());
}
Fragment {
fragment_id: collab.object_id.clone(),
object_id: collab.object_id.clone(),
@ -156,6 +153,11 @@ impl OpenCollabConsumer {
},
}
};
tracing::trace!(
"indexing collab {}/{}",
collab.workspace_id,
collab.object_id
);
indexer
.update_index(&collab.workspace_id, vec![fragment])
.await?;
@ -397,7 +399,7 @@ mod test {
assert_ne!(contents.len(), 0);
let content: Option<String> = contents[0].get(0);
assert_eq!(content.as_deref(), Some("test-value\n"));
assert_eq!(content.as_deref(), Some("test-value "));
}
#[ignore]

View File

@ -8,24 +8,10 @@ pub fn document_to_plain_text(document: &DocumentData) -> String {
// do a depth-first scan of the document blocks
while let Some(block_id) = stack.pop() {
if let Some(block) = document.blocks.get(block_id) {
if block.external_type.as_deref() == Some("text") {
if let Some(text_id) = block.external_id.as_deref() {
if let Some(json) = text_map.get(text_id) {
match serde_json::from_str::<Vec<TextDelta>>(json) {
Ok(deltas) => {
for delta in deltas {
if let TextDelta::Inserted(text, _) = delta {
buf.push_str(&text);
}
}
},
Err(err) => {
tracing::error!("text_id `{}` is not a valid delta array: {}", text_id, err)
},
}
}
buf.push('\n');
}
if let Some(deltas) = get_delta_from_block_data(block) {
push_deltas_to_str(&mut buf, deltas);
} else if let Some(deltas) = get_delta_from_external_text_id(block, text_map) {
push_deltas_to_str(&mut buf, deltas);
}
if let Some(children) = document.meta.children_map.get(&block.children) {
// we want to process children blocks in the same order they are given in children_map
@ -36,9 +22,49 @@ pub fn document_to_plain_text(document: &DocumentData) -> String {
}
}
}
//tracing::trace!("Document plain text: `{}`", buf);
buf
}
/// Try to retrieve deltas from `block.data.delta`.
fn get_delta_from_block_data(block: &collab_document::blocks::Block) -> Option<Vec<TextDelta>> {
if let Some(delta) = block.data.get("delta") {
if let Ok(deltas) = serde_json::from_value::<Vec<TextDelta>>(delta.clone()) {
return Some(deltas);
}
}
None
}
/// Try to retrieve deltas from text_map's text associated with `block.external_id`.
fn get_delta_from_external_text_id(
block: &collab_document::blocks::Block,
text_map: &std::collections::HashMap<String, String>,
) -> Option<Vec<TextDelta>> {
if block.external_type.as_deref() == Some("text") {
if let Some(text_id) = block.external_id.as_deref() {
if let Some(json) = text_map.get(text_id) {
if let Ok(deltas) = serde_json::from_str::<Vec<TextDelta>>(json) {
return Some(deltas);
}
}
}
}
None
}
fn push_deltas_to_str(buf: &mut String, deltas: Vec<TextDelta>) {
for delta in deltas {
if let TextDelta::Inserted(text, _) = delta {
let trimmed = text.trim();
if !trimmed.is_empty() {
buf.push_str(&trimmed);
buf.push(' ');
}
}
}
}
#[cfg(test)]
mod test {
use crate::extract::document_to_plain_text;
@ -50,7 +76,7 @@ mod test {
fn document_plain_text() {
let doc = get_started_document_data().unwrap();
let text = document_to_plain_text(&doc);
let expected = "\nWelcome to AppFlowy!\nHere are the basics\nClick anywhere and just start typing.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nHave a question❓\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n";
let expected = "Welcome to AppFlowy! Here are the basics Click anywhere and just start typing. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} Have a question❓ Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter ";
assert_eq!(&text, expected);
}
@ -58,7 +84,7 @@ mod test {
fn document_plain_text_with_nested_blocks() {
let doc = get_initial_document_data().unwrap();
let text = document_to_plain_text(&doc);
let expected = "Welcome to AppFlowy!\nHere are the basics\nHere is H3\nClick anywhere and just start typing.\nClick Enter to create a new line.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nThis is a paragraph\nThis is a paragraph\nHave a question❓\nClick ? at the bottom right for help and support.\nThis is a paragraph\nThis is a paragraph\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n";
let expected = "Welcome to AppFlowy! Here are the basics Here is H3 Click anywhere and just start typing. Click Enter to create a new line. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} This is a paragraph This is a paragraph Have a question❓ Click ? at the bottom right for help and support. This is a paragraph This is a paragraph Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter ";
assert_eq!(&text, expected);
}
}

View File

@ -3,10 +3,8 @@ use std::sync::Arc;
use async_stream::stream;
use collab::core::collab::MutexCollab;
use collab_document::blocks::DeltaType;
use collab_document::document::Document;
use collab_entity::CollabType;
use dashmap::DashMap;
use database_entity::dto::EmbeddingContentType;
use futures::Stream;
use tokio::sync::watch::Sender;
@ -19,7 +17,7 @@ use crate::indexer::Fragment;
pub struct DocumentWatcher {
object_id: String,
content: Document,
receiver: tokio::sync::watch::Receiver<DashMap<String, DeltaType>>,
receiver: tokio::sync::watch::Receiver<u64>,
}
unsafe impl Send for DocumentWatcher {}
@ -31,7 +29,7 @@ impl DocumentWatcher {
mut content: Document,
index_initial_content: bool,
) -> Result<Self> {
let (tx, receiver) = tokio::sync::watch::channel(DashMap::new());
let (tx, receiver) = tokio::sync::watch::channel(0);
if index_initial_content {
Self::index_initial_content(&mut content, &tx)?;
}
@ -43,35 +41,16 @@ impl DocumentWatcher {
})
}
fn attach_listener(document: &mut Document, notifier: Sender<DashMap<String, DeltaType>>) {
document.subscribe_block_changed(move |blocks, _| {
let changes: Vec<_> = blocks
.iter()
.flat_map(|block| {
block
.iter()
.map(|payload| (payload.id.clone(), payload.command.clone()))
})
.collect();
notifier.send_modify(|map| {
for (id, command) in changes {
map.insert(id, command);
}
})
fn attach_listener(document: &mut Document, notifier: Sender<u64>) {
document.subscribe_block_changed(move |_, _| {
notifier.send_modify(|i| *i += 1);
});
}
fn index_initial_content(
document: &mut Document,
notifier: &Sender<DashMap<String, DeltaType>>,
) -> Result<()> {
fn index_initial_content(document: &mut Document, notifier: &Sender<u64>) -> Result<()> {
let data = document.get_document_data()?;
if let Some(text_map) = data.meta.text_map.as_ref() {
notifier.send_modify(|map| {
for text_id in text_map.keys() {
map.insert(text_id.clone(), DeltaType::Inserted);
}
});
if let Some(_) = data.meta.text_map.as_ref() {
notifier.send_modify(|i| *i += 1);
}
Ok(())
}
@ -83,7 +62,6 @@ impl DocumentWatcher {
Box::pin(stream! {
while let Ok(()) = receiver.changed().await {
if let Some(collab) = collab.upgrade() {
receiver.borrow().clear();
match Self::get_document_content(collab) {
Ok(content) => {
yield FragmentUpdate::Update(Fragment {
@ -169,7 +147,7 @@ mod test {
collab_type: CollabType::Document,
content_type: EmbeddingContentType::PlainText,
object_id: "o-1".to_string(),
content: "A\n".to_string(),
content: "A ".to_string(),
})
);
@ -186,7 +164,7 @@ mod test {
collab_type: CollabType::Document,
content_type: EmbeddingContentType::PlainText,
object_id: "o-1".to_string(),
content: "BA\n".to_string(),
content: "BA ".to_string(),
})
);
}