From 7ad105ee34c125c3afa763cb73a66cc154638bd9 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 14 Jun 2024 11:23:19 +0200 Subject: [PATCH 1/2] fix: index all of the document blocks --- libs/collab-stream/src/stream_group.rs | 19 ++++++---- .../appflowy-collaborate/src/collab/queue.rs | 7 ++-- .../src/collab/queue_redis_ops.rs | 1 + .../appflowy-indexer/src/collab_handle.rs | 5 +-- services/appflowy-indexer/src/consumer.rs | 14 ++++---- services/appflowy-indexer/src/extract.rs | 25 ++++++------- .../src/watchers/document_watcher.rs | 36 +++++-------------- 7 files changed, 47 insertions(+), 60 deletions(-) diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index 6d67ed31..e962a0d2 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -58,12 +58,19 @@ impl StreamGroup { /// Ensures the consumer group exists, creating it if necessary. pub async fn ensure_consumer_group(&mut self) -> Result<(), StreamError> { - let _: RedisResult<()> = self - .connection_manager - //Use '$' if you want new messages or '0' to read from the beginning. - .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") - .await; - + let result: RedisResult<()> = self + .connection_manager + //Use '$' if you want new messages or '0' to read from the beginning. + .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") + .await; + if let Err(e) = result { + tracing::warn!( + "error when creating consumer group `{}` `{}`: {:?}", + self.stream_key, + self.group_name, + e + ); + } Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index 3f0c2e6d..951fc151 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -17,8 +17,8 @@ use app_error::AppError; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; use crate::collab::queue_redis_ops::{ - get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, - WritePriority, PENDING_WRITE_META_EXPIRE_SECS, + get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority, + PENDING_WRITE_META_EXPIRE_SECS, }; use crate::collab::RedisSortedSet; use crate::metrics::CollabMetrics; @@ -170,7 +170,8 @@ impl StorageQueue { #[cfg(debug_assertions)] pub async fn clear(&self) -> Result<(), AppError> { self.pending_write_set.clear().await?; - remove_all_pending_meta(self.connection_manager.clone()).await?; + crate::collab::queue_redis_ops::remove_all_pending_meta(self.connection_manager.clone()) + .await?; Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs index 9477e371..e214768c 100644 --- a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs +++ b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs @@ -8,6 +8,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; pub(crate) const PENDING_WRITE_META_EXPIRE_SECS: u64 = 604800; // 7 days in seconds +#[allow(dead_code)] pub(crate) async fn remove_all_pending_meta( mut connection_manager: RedisConnectionManager, ) -> Result<(), AppError> { diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index 122d0b3a..c33b1017 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -178,8 +178,9 @@ impl CollabHandle { Err(err) => tracing::error!("failed to decode update event: {}", err), } } - txn.commit(); - } + } else { + tracing::warn!("failed to obtain a collab lock"); + }; update_stream.ack_messages(&messages).await?; Ok(()) diff --git a/services/appflowy-indexer/src/consumer.rs b/services/appflowy-indexer/src/consumer.rs index d4f0d415..c33c2e81 100644 --- a/services/appflowy-indexer/src/consumer.rs +++ b/services/appflowy-indexer/src/consumer.rs @@ -123,12 +123,6 @@ impl OpenCollabConsumer { let fragment = { match &collab.collab_type { CollabType::Document => { - tracing::trace!( - "indexing document {}/{}", - collab.workspace_id, - collab.object_id - ); - let document = Document::from_doc_state( CollabOrigin::Empty, DataSource::DocStateV1(collab.collab.doc_state.to_vec()), @@ -137,6 +131,9 @@ impl OpenCollabConsumer { )?; let data = document.get_document_data()?; let content = crate::extract::document_to_plain_text(&data); + if content.is_empty() { + return Ok(()); + } Fragment { fragment_id: collab.object_id.clone(), object_id: collab.object_id.clone(), @@ -156,6 +153,11 @@ impl OpenCollabConsumer { }, } }; + tracing::trace!( + "indexing collab {}/{}", + collab.workspace_id, + collab.object_id + ); indexer .update_index(&collab.workspace_id, vec![fragment]) .await?; diff --git a/services/appflowy-indexer/src/extract.rs b/services/appflowy-indexer/src/extract.rs index b703a4bb..a4b22f93 100644 --- a/services/appflowy-indexer/src/extract.rs +++ b/services/appflowy-indexer/src/extract.rs @@ -8,23 +8,17 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { // do a depth-first scan of the document blocks while let Some(block_id) = stack.pop() { if let Some(block) = document.blocks.get(block_id) { - if block.external_type.as_deref() == Some("text") { - if let Some(text_id) = block.external_id.as_deref() { - if let Some(json) = text_map.get(text_id) { - match serde_json::from_str::>(json) { - Ok(deltas) => { - for delta in deltas { - if let TextDelta::Inserted(text, _) = delta { - buf.push_str(&text); - } - } - }, - Err(err) => { - tracing::error!("text_id `{}` is not a valid delta array: {}", text_id, err) - }, + if let Some(delta) = block.data.get("delta") { + if let Ok(deltas) = serde_json::from_value::>(delta.clone()) { + for delta in deltas { + if let TextDelta::Inserted(text, _) = delta { + let trimmed = text.trim(); + if !trimmed.is_empty() { + buf.push_str(&trimmed); + buf.push(' '); + } } } - buf.push('\n'); } } if let Some(children) = document.meta.children_map.get(&block.children) { @@ -36,6 +30,7 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { } } } + //tracing::trace!("Document plain text: `{}`", buf); buf } diff --git a/services/appflowy-indexer/src/watchers/document_watcher.rs b/services/appflowy-indexer/src/watchers/document_watcher.rs index 17c72f8b..2ad91968 100644 --- a/services/appflowy-indexer/src/watchers/document_watcher.rs +++ b/services/appflowy-indexer/src/watchers/document_watcher.rs @@ -19,7 +19,7 @@ use crate::indexer::Fragment; pub struct DocumentWatcher { object_id: String, content: Document, - receiver: tokio::sync::watch::Receiver>, + receiver: tokio::sync::watch::Receiver, } unsafe impl Send for DocumentWatcher {} @@ -31,7 +31,7 @@ impl DocumentWatcher { mut content: Document, index_initial_content: bool, ) -> Result { - let (tx, receiver) = tokio::sync::watch::channel(DashMap::new()); + let (tx, receiver) = tokio::sync::watch::channel(0); if index_initial_content { Self::index_initial_content(&mut content, &tx)?; } @@ -43,35 +43,16 @@ impl DocumentWatcher { }) } - fn attach_listener(document: &mut Document, notifier: Sender>) { - document.subscribe_block_changed(move |blocks, _| { - let changes: Vec<_> = blocks - .iter() - .flat_map(|block| { - block - .iter() - .map(|payload| (payload.id.clone(), payload.command.clone())) - }) - .collect(); - notifier.send_modify(|map| { - for (id, command) in changes { - map.insert(id, command); - } - }) + fn attach_listener(document: &mut Document, notifier: Sender) { + document.subscribe_block_changed(move |_, _| { + notifier.send_modify(|i| *i += 1); }); } - fn index_initial_content( - document: &mut Document, - notifier: &Sender>, - ) -> Result<()> { + fn index_initial_content(document: &mut Document, notifier: &Sender) -> Result<()> { let data = document.get_document_data()?; - if let Some(text_map) = data.meta.text_map.as_ref() { - notifier.send_modify(|map| { - for text_id in text_map.keys() { - map.insert(text_id.clone(), DeltaType::Inserted); - } - }); + if let Some(_) = data.meta.text_map.as_ref() { + notifier.send_modify(|i| *i += 1); } Ok(()) } @@ -83,7 +64,6 @@ impl DocumentWatcher { Box::pin(stream! { while let Ok(()) = receiver.changed().await { if let Some(collab) = collab.upgrade() { - receiver.borrow().clear(); match Self::get_document_content(collab) { Ok(content) => { yield FragmentUpdate::Update(Fragment { From 3c72f1292db1cb206126823d2cbdf6066f634ae1 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 14 Jun 2024 12:29:14 +0200 Subject: [PATCH 2/2] chore: fixed tests for document content extraction --- services/appflowy-indexer/src/consumer.rs | 2 +- services/appflowy-indexer/src/extract.rs | 59 ++++++++++++++----- .../src/watchers/document_watcher.rs | 6 +- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/services/appflowy-indexer/src/consumer.rs b/services/appflowy-indexer/src/consumer.rs index c33c2e81..75bbf00b 100644 --- a/services/appflowy-indexer/src/consumer.rs +++ b/services/appflowy-indexer/src/consumer.rs @@ -399,7 +399,7 @@ mod test { assert_ne!(contents.len(), 0); let content: Option = contents[0].get(0); - assert_eq!(content.as_deref(), Some("test-value\n")); + assert_eq!(content.as_deref(), Some("test-value ")); } #[ignore] diff --git a/services/appflowy-indexer/src/extract.rs b/services/appflowy-indexer/src/extract.rs index a4b22f93..407c7ce9 100644 --- a/services/appflowy-indexer/src/extract.rs +++ b/services/appflowy-indexer/src/extract.rs @@ -8,18 +8,10 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { // do a depth-first scan of the document blocks while let Some(block_id) = stack.pop() { if let Some(block) = document.blocks.get(block_id) { - if let Some(delta) = block.data.get("delta") { - if let Ok(deltas) = serde_json::from_value::>(delta.clone()) { - for delta in deltas { - if let TextDelta::Inserted(text, _) = delta { - let trimmed = text.trim(); - if !trimmed.is_empty() { - buf.push_str(&trimmed); - buf.push(' '); - } - } - } - } + if let Some(deltas) = get_delta_from_block_data(block) { + push_deltas_to_str(&mut buf, deltas); + } else if let Some(deltas) = get_delta_from_external_text_id(block, text_map) { + push_deltas_to_str(&mut buf, deltas); } if let Some(children) = document.meta.children_map.get(&block.children) { // we want to process children blocks in the same order they are given in children_map @@ -34,6 +26,45 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { buf } +/// Try to retrieve deltas from `block.data.delta`. +fn get_delta_from_block_data(block: &collab_document::blocks::Block) -> Option> { + if let Some(delta) = block.data.get("delta") { + if let Ok(deltas) = serde_json::from_value::>(delta.clone()) { + return Some(deltas); + } + } + None +} + +/// Try to retrieve deltas from text_map's text associated with `block.external_id`. +fn get_delta_from_external_text_id( + block: &collab_document::blocks::Block, + text_map: &std::collections::HashMap, +) -> Option> { + if block.external_type.as_deref() == Some("text") { + if let Some(text_id) = block.external_id.as_deref() { + if let Some(json) = text_map.get(text_id) { + if let Ok(deltas) = serde_json::from_str::>(json) { + return Some(deltas); + } + } + } + } + None +} + +fn push_deltas_to_str(buf: &mut String, deltas: Vec) { + for delta in deltas { + if let TextDelta::Inserted(text, _) = delta { + let trimmed = text.trim(); + if !trimmed.is_empty() { + buf.push_str(&trimmed); + buf.push(' '); + } + } + } +} + #[cfg(test)] mod test { use crate::extract::document_to_plain_text; @@ -45,7 +76,7 @@ mod test { fn document_plain_text() { let doc = get_started_document_data().unwrap(); let text = document_to_plain_text(&doc); - let expected = "\nWelcome to AppFlowy!\nHere are the basics\nClick anywhere and just start typing.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nHave a question❓\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n"; + let expected = "Welcome to AppFlowy! Here are the basics Click anywhere and just start typing. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} Have a question❓ Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter "; assert_eq!(&text, expected); } @@ -53,7 +84,7 @@ mod test { fn document_plain_text_with_nested_blocks() { let doc = get_initial_document_data().unwrap(); let text = document_to_plain_text(&doc); - let expected = "Welcome to AppFlowy!\nHere are the basics\nHere is H3\nClick anywhere and just start typing.\nClick Enter to create a new line.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nThis is a paragraph\nThis is a paragraph\nHave a question❓\nClick ? at the bottom right for help and support.\nThis is a paragraph\nThis is a paragraph\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n"; + let expected = "Welcome to AppFlowy! Here are the basics Here is H3 Click anywhere and just start typing. Click Enter to create a new line. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} This is a paragraph This is a paragraph Have a question❓ Click ? at the bottom right for help and support. This is a paragraph This is a paragraph Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter "; assert_eq!(&text, expected); } } diff --git a/services/appflowy-indexer/src/watchers/document_watcher.rs b/services/appflowy-indexer/src/watchers/document_watcher.rs index 2ad91968..88580f31 100644 --- a/services/appflowy-indexer/src/watchers/document_watcher.rs +++ b/services/appflowy-indexer/src/watchers/document_watcher.rs @@ -3,10 +3,8 @@ use std::sync::Arc; use async_stream::stream; use collab::core::collab::MutexCollab; -use collab_document::blocks::DeltaType; use collab_document::document::Document; use collab_entity::CollabType; -use dashmap::DashMap; use database_entity::dto::EmbeddingContentType; use futures::Stream; use tokio::sync::watch::Sender; @@ -149,7 +147,7 @@ mod test { collab_type: CollabType::Document, content_type: EmbeddingContentType::PlainText, object_id: "o-1".to_string(), - content: "A\n".to_string(), + content: "A ".to_string(), }) ); @@ -166,7 +164,7 @@ mod test { collab_type: CollabType::Document, content_type: EmbeddingContentType::PlainText, object_id: "o-1".to_string(), - content: "BA\n".to_string(), + content: "BA ".to_string(), }) ); }