diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index 6d67ed31..e962a0d2 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -58,12 +58,19 @@ impl StreamGroup { /// Ensures the consumer group exists, creating it if necessary. pub async fn ensure_consumer_group(&mut self) -> Result<(), StreamError> { - let _: RedisResult<()> = self - .connection_manager - //Use '$' if you want new messages or '0' to read from the beginning. - .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") - .await; - + let result: RedisResult<()> = self + .connection_manager + //Use '$' if you want new messages or '0' to read from the beginning. + .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") + .await; + if let Err(e) = result { + tracing::warn!( + "error when creating consumer group `{}` `{}`: {:?}", + self.stream_key, + self.group_name, + e + ); + } Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index 3f0c2e6d..951fc151 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -17,8 +17,8 @@ use app_error::AppError; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; use crate::collab::queue_redis_ops::{ - get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, - WritePriority, PENDING_WRITE_META_EXPIRE_SECS, + get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority, + PENDING_WRITE_META_EXPIRE_SECS, }; use crate::collab::RedisSortedSet; use crate::metrics::CollabMetrics; @@ -170,7 +170,8 @@ impl StorageQueue { #[cfg(debug_assertions)] pub async fn clear(&self) -> Result<(), AppError> { self.pending_write_set.clear().await?; - remove_all_pending_meta(self.connection_manager.clone()).await?; + crate::collab::queue_redis_ops::remove_all_pending_meta(self.connection_manager.clone()) + .await?; Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs index 9477e371..e214768c 100644 --- a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs +++ b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs @@ -8,6 +8,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; pub(crate) const PENDING_WRITE_META_EXPIRE_SECS: u64 = 604800; // 7 days in seconds +#[allow(dead_code)] pub(crate) async fn remove_all_pending_meta( mut connection_manager: RedisConnectionManager, ) -> Result<(), AppError> { diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index 122d0b3a..c33b1017 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -178,8 +178,9 @@ impl CollabHandle { Err(err) => tracing::error!("failed to decode update event: {}", err), } } - txn.commit(); - } + } else { + tracing::warn!("failed to obtain a collab lock"); + }; update_stream.ack_messages(&messages).await?; Ok(()) diff --git a/services/appflowy-indexer/src/consumer.rs b/services/appflowy-indexer/src/consumer.rs index d4f0d415..75bbf00b 100644 --- a/services/appflowy-indexer/src/consumer.rs +++ b/services/appflowy-indexer/src/consumer.rs @@ -123,12 +123,6 @@ impl OpenCollabConsumer { let fragment = { match &collab.collab_type { CollabType::Document => { - tracing::trace!( - "indexing document {}/{}", - collab.workspace_id, - collab.object_id - ); - let document = Document::from_doc_state( CollabOrigin::Empty, DataSource::DocStateV1(collab.collab.doc_state.to_vec()), @@ -137,6 +131,9 @@ impl OpenCollabConsumer { )?; let data = document.get_document_data()?; let content = crate::extract::document_to_plain_text(&data); + if content.is_empty() { + return Ok(()); + } Fragment { fragment_id: collab.object_id.clone(), object_id: collab.object_id.clone(), @@ -156,6 +153,11 @@ impl OpenCollabConsumer { }, } }; + tracing::trace!( + "indexing collab {}/{}", + collab.workspace_id, + collab.object_id + ); indexer .update_index(&collab.workspace_id, vec![fragment]) .await?; @@ -397,7 +399,7 @@ mod test { assert_ne!(contents.len(), 0); let content: Option = contents[0].get(0); - assert_eq!(content.as_deref(), Some("test-value\n")); + assert_eq!(content.as_deref(), Some("test-value ")); } #[ignore] diff --git a/services/appflowy-indexer/src/extract.rs b/services/appflowy-indexer/src/extract.rs index b703a4bb..407c7ce9 100644 --- a/services/appflowy-indexer/src/extract.rs +++ b/services/appflowy-indexer/src/extract.rs @@ -8,24 +8,10 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { // do a depth-first scan of the document blocks while let Some(block_id) = stack.pop() { if let Some(block) = document.blocks.get(block_id) { - if block.external_type.as_deref() == Some("text") { - if let Some(text_id) = block.external_id.as_deref() { - if let Some(json) = text_map.get(text_id) { - match serde_json::from_str::>(json) { - Ok(deltas) => { - for delta in deltas { - if let TextDelta::Inserted(text, _) = delta { - buf.push_str(&text); - } - } - }, - Err(err) => { - tracing::error!("text_id `{}` is not a valid delta array: {}", text_id, err) - }, - } - } - buf.push('\n'); - } + if let Some(deltas) = get_delta_from_block_data(block) { + push_deltas_to_str(&mut buf, deltas); + } else if let Some(deltas) = get_delta_from_external_text_id(block, text_map) { + push_deltas_to_str(&mut buf, deltas); } if let Some(children) = document.meta.children_map.get(&block.children) { // we want to process children blocks in the same order they are given in children_map @@ -36,9 +22,49 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { } } } + //tracing::trace!("Document plain text: `{}`", buf); buf } +/// Try to retrieve deltas from `block.data.delta`. +fn get_delta_from_block_data(block: &collab_document::blocks::Block) -> Option> { + if let Some(delta) = block.data.get("delta") { + if let Ok(deltas) = serde_json::from_value::>(delta.clone()) { + return Some(deltas); + } + } + None +} + +/// Try to retrieve deltas from text_map's text associated with `block.external_id`. +fn get_delta_from_external_text_id( + block: &collab_document::blocks::Block, + text_map: &std::collections::HashMap, +) -> Option> { + if block.external_type.as_deref() == Some("text") { + if let Some(text_id) = block.external_id.as_deref() { + if let Some(json) = text_map.get(text_id) { + if let Ok(deltas) = serde_json::from_str::>(json) { + return Some(deltas); + } + } + } + } + None +} + +fn push_deltas_to_str(buf: &mut String, deltas: Vec) { + for delta in deltas { + if let TextDelta::Inserted(text, _) = delta { + let trimmed = text.trim(); + if !trimmed.is_empty() { + buf.push_str(&trimmed); + buf.push(' '); + } + } + } +} + #[cfg(test)] mod test { use crate::extract::document_to_plain_text; @@ -50,7 +76,7 @@ mod test { fn document_plain_text() { let doc = get_started_document_data().unwrap(); let text = document_to_plain_text(&doc); - let expected = "\nWelcome to AppFlowy!\nHere are the basics\nClick anywhere and just start typing.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nHave a question❓\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n"; + let expected = "Welcome to AppFlowy! Here are the basics Click anywhere and just start typing. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} Have a question❓ Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter "; assert_eq!(&text, expected); } @@ -58,7 +84,7 @@ mod test { fn document_plain_text_with_nested_blocks() { let doc = get_initial_document_data().unwrap(); let text = document_to_plain_text(&doc); - let expected = "Welcome to AppFlowy!\nHere are the basics\nHere is H3\nClick anywhere and just start typing.\nClick Enter to create a new line.\nHighlight any text, and use the editing menu to style your writing however you like.\nAs soon as you type / a menu will pop up. Select different types of content blocks you can add.\nType / followed by /bullet or /num to create a list.\nClick + New Page button at the bottom of your sidebar to add a new page.\nClick + next to any page title in the sidebar to quickly add a new subpage, Document, Grid, or Kanban Board.\n\n\nKeyboard shortcuts, markdown, and code block\nKeyboard shortcuts guide\nMarkdown reference\nType /code to insert a code block\n// This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n}\n\nThis is a paragraph\nThis is a paragraph\nHave a question❓\nClick ? at the bottom right for help and support.\nThis is a paragraph\nThis is a paragraph\nClick ? at the bottom right for help and support.\n\n\nLike AppFlowy? Follow us:\nGitHub\nTwitter: @appflowy\nNewsletter\n\n\n\n\n"; + let expected = "Welcome to AppFlowy! Here are the basics Here is H3 Click anywhere and just start typing. Click Enter to create a new line. Highlight any text, and use the editing menu to style your writing however you like. As soon as you type / a menu will pop up. Select different types of content blocks you can add. Type / followed by /bullet or /num to create a list. Click + New Page button at the bottom of your sidebar to add a new page. Click + next to any page title in the sidebar to quickly add a new subpage, Document , Grid , or Kanban Board . Keyboard shortcuts, markdown, and code block Keyboard shortcuts guide Markdown reference Type /code to insert a code block // This is the main function.\nfn main() {\n // Print text to the console.\n println!(\"Hello World!\");\n} This is a paragraph This is a paragraph Have a question❓ Click ? at the bottom right for help and support. This is a paragraph This is a paragraph Click ? at the bottom right for help and support. Like AppFlowy? Follow us: GitHub Twitter : @appflowy Newsletter "; assert_eq!(&text, expected); } } diff --git a/services/appflowy-indexer/src/watchers/document_watcher.rs b/services/appflowy-indexer/src/watchers/document_watcher.rs index 17c72f8b..88580f31 100644 --- a/services/appflowy-indexer/src/watchers/document_watcher.rs +++ b/services/appflowy-indexer/src/watchers/document_watcher.rs @@ -3,10 +3,8 @@ use std::sync::Arc; use async_stream::stream; use collab::core::collab::MutexCollab; -use collab_document::blocks::DeltaType; use collab_document::document::Document; use collab_entity::CollabType; -use dashmap::DashMap; use database_entity::dto::EmbeddingContentType; use futures::Stream; use tokio::sync::watch::Sender; @@ -19,7 +17,7 @@ use crate::indexer::Fragment; pub struct DocumentWatcher { object_id: String, content: Document, - receiver: tokio::sync::watch::Receiver>, + receiver: tokio::sync::watch::Receiver, } unsafe impl Send for DocumentWatcher {} @@ -31,7 +29,7 @@ impl DocumentWatcher { mut content: Document, index_initial_content: bool, ) -> Result { - let (tx, receiver) = tokio::sync::watch::channel(DashMap::new()); + let (tx, receiver) = tokio::sync::watch::channel(0); if index_initial_content { Self::index_initial_content(&mut content, &tx)?; } @@ -43,35 +41,16 @@ impl DocumentWatcher { }) } - fn attach_listener(document: &mut Document, notifier: Sender>) { - document.subscribe_block_changed(move |blocks, _| { - let changes: Vec<_> = blocks - .iter() - .flat_map(|block| { - block - .iter() - .map(|payload| (payload.id.clone(), payload.command.clone())) - }) - .collect(); - notifier.send_modify(|map| { - for (id, command) in changes { - map.insert(id, command); - } - }) + fn attach_listener(document: &mut Document, notifier: Sender) { + document.subscribe_block_changed(move |_, _| { + notifier.send_modify(|i| *i += 1); }); } - fn index_initial_content( - document: &mut Document, - notifier: &Sender>, - ) -> Result<()> { + fn index_initial_content(document: &mut Document, notifier: &Sender) -> Result<()> { let data = document.get_document_data()?; - if let Some(text_map) = data.meta.text_map.as_ref() { - notifier.send_modify(|map| { - for text_id in text_map.keys() { - map.insert(text_id.clone(), DeltaType::Inserted); - } - }); + if let Some(_) = data.meta.text_map.as_ref() { + notifier.send_modify(|i| *i += 1); } Ok(()) } @@ -83,7 +62,6 @@ impl DocumentWatcher { Box::pin(stream! { while let Ok(()) = receiver.changed().await { if let Some(collab) = collab.upgrade() { - receiver.borrow().clear(); match Self::get_document_content(collab) { Ok(content) => { yield FragmentUpdate::Update(Fragment { @@ -169,7 +147,7 @@ mod test { collab_type: CollabType::Document, content_type: EmbeddingContentType::PlainText, object_id: "o-1".to_string(), - content: "A\n".to_string(), + content: "A ".to_string(), }) ); @@ -186,7 +164,7 @@ mod test { collab_type: CollabType::Document, content_type: EmbeddingContentType::PlainText, object_id: "o-1".to_string(), - content: "BA\n".to_string(), + content: "BA ".to_string(), }) ); }