diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index 6d67ed31..e962a0d2 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -58,12 +58,19 @@ impl StreamGroup { /// Ensures the consumer group exists, creating it if necessary. pub async fn ensure_consumer_group(&mut self) -> Result<(), StreamError> { - let _: RedisResult<()> = self - .connection_manager - //Use '$' if you want new messages or '0' to read from the beginning. - .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") - .await; - + let result: RedisResult<()> = self + .connection_manager + //Use '$' if you want new messages or '0' to read from the beginning. + .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") + .await; + if let Err(e) = result { + tracing::warn!( + "error when creating consumer group `{}` `{}`: {:?}", + self.stream_key, + self.group_name, + e + ); + } Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index 3f0c2e6d..951fc151 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -17,8 +17,8 @@ use app_error::AppError; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; use crate::collab::queue_redis_ops::{ - get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, - WritePriority, PENDING_WRITE_META_EXPIRE_SECS, + get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority, + PENDING_WRITE_META_EXPIRE_SECS, }; use crate::collab::RedisSortedSet; use crate::metrics::CollabMetrics; @@ -170,7 +170,8 @@ impl StorageQueue { #[cfg(debug_assertions)] pub async fn clear(&self) -> Result<(), AppError> { self.pending_write_set.clear().await?; - remove_all_pending_meta(self.connection_manager.clone()).await?; + crate::collab::queue_redis_ops::remove_all_pending_meta(self.connection_manager.clone()) + .await?; Ok(()) } diff --git a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs index 9477e371..e214768c 100644 --- a/services/appflowy-collaborate/src/collab/queue_redis_ops.rs +++ b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs @@ -8,6 +8,7 @@ use serde_repr::{Deserialize_repr, Serialize_repr}; pub(crate) const PENDING_WRITE_META_EXPIRE_SECS: u64 = 604800; // 7 days in seconds +#[allow(dead_code)] pub(crate) async fn remove_all_pending_meta( mut connection_manager: RedisConnectionManager, ) -> Result<(), AppError> { diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index 122d0b3a..c33b1017 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -178,8 +178,9 @@ impl CollabHandle { Err(err) => tracing::error!("failed to decode update event: {}", err), } } - txn.commit(); - } + } else { + tracing::warn!("failed to obtain a collab lock"); + }; update_stream.ack_messages(&messages).await?; Ok(()) diff --git a/services/appflowy-indexer/src/consumer.rs b/services/appflowy-indexer/src/consumer.rs index d4f0d415..c33c2e81 100644 --- a/services/appflowy-indexer/src/consumer.rs +++ b/services/appflowy-indexer/src/consumer.rs @@ -123,12 +123,6 @@ impl OpenCollabConsumer { let fragment = { match &collab.collab_type { CollabType::Document => { - tracing::trace!( - "indexing document {}/{}", - collab.workspace_id, - collab.object_id - ); - let document = Document::from_doc_state( CollabOrigin::Empty, DataSource::DocStateV1(collab.collab.doc_state.to_vec()), @@ -137,6 +131,9 @@ impl OpenCollabConsumer { )?; let data = document.get_document_data()?; let content = crate::extract::document_to_plain_text(&data); + if content.is_empty() { + return Ok(()); + } Fragment { fragment_id: collab.object_id.clone(), object_id: collab.object_id.clone(), @@ -156,6 +153,11 @@ impl OpenCollabConsumer { }, } }; + tracing::trace!( + "indexing collab {}/{}", + collab.workspace_id, + collab.object_id + ); indexer .update_index(&collab.workspace_id, vec![fragment]) .await?; diff --git a/services/appflowy-indexer/src/extract.rs b/services/appflowy-indexer/src/extract.rs index b703a4bb..a4b22f93 100644 --- a/services/appflowy-indexer/src/extract.rs +++ b/services/appflowy-indexer/src/extract.rs @@ -8,23 +8,17 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { // do a depth-first scan of the document blocks while let Some(block_id) = stack.pop() { if let Some(block) = document.blocks.get(block_id) { - if block.external_type.as_deref() == Some("text") { - if let Some(text_id) = block.external_id.as_deref() { - if let Some(json) = text_map.get(text_id) { - match serde_json::from_str::>(json) { - Ok(deltas) => { - for delta in deltas { - if let TextDelta::Inserted(text, _) = delta { - buf.push_str(&text); - } - } - }, - Err(err) => { - tracing::error!("text_id `{}` is not a valid delta array: {}", text_id, err) - }, + if let Some(delta) = block.data.get("delta") { + if let Ok(deltas) = serde_json::from_value::>(delta.clone()) { + for delta in deltas { + if let TextDelta::Inserted(text, _) = delta { + let trimmed = text.trim(); + if !trimmed.is_empty() { + buf.push_str(&trimmed); + buf.push(' '); + } } } - buf.push('\n'); } } if let Some(children) = document.meta.children_map.get(&block.children) { @@ -36,6 +30,7 @@ pub fn document_to_plain_text(document: &DocumentData) -> String { } } } + //tracing::trace!("Document plain text: `{}`", buf); buf } diff --git a/services/appflowy-indexer/src/watchers/document_watcher.rs b/services/appflowy-indexer/src/watchers/document_watcher.rs index 17c72f8b..2ad91968 100644 --- a/services/appflowy-indexer/src/watchers/document_watcher.rs +++ b/services/appflowy-indexer/src/watchers/document_watcher.rs @@ -19,7 +19,7 @@ use crate::indexer::Fragment; pub struct DocumentWatcher { object_id: String, content: Document, - receiver: tokio::sync::watch::Receiver>, + receiver: tokio::sync::watch::Receiver, } unsafe impl Send for DocumentWatcher {} @@ -31,7 +31,7 @@ impl DocumentWatcher { mut content: Document, index_initial_content: bool, ) -> Result { - let (tx, receiver) = tokio::sync::watch::channel(DashMap::new()); + let (tx, receiver) = tokio::sync::watch::channel(0); if index_initial_content { Self::index_initial_content(&mut content, &tx)?; } @@ -43,35 +43,16 @@ impl DocumentWatcher { }) } - fn attach_listener(document: &mut Document, notifier: Sender>) { - document.subscribe_block_changed(move |blocks, _| { - let changes: Vec<_> = blocks - .iter() - .flat_map(|block| { - block - .iter() - .map(|payload| (payload.id.clone(), payload.command.clone())) - }) - .collect(); - notifier.send_modify(|map| { - for (id, command) in changes { - map.insert(id, command); - } - }) + fn attach_listener(document: &mut Document, notifier: Sender) { + document.subscribe_block_changed(move |_, _| { + notifier.send_modify(|i| *i += 1); }); } - fn index_initial_content( - document: &mut Document, - notifier: &Sender>, - ) -> Result<()> { + fn index_initial_content(document: &mut Document, notifier: &Sender) -> Result<()> { let data = document.get_document_data()?; - if let Some(text_map) = data.meta.text_map.as_ref() { - notifier.send_modify(|map| { - for text_id in text_map.keys() { - map.insert(text_id.clone(), DeltaType::Inserted); - } - }); + if let Some(_) = data.meta.text_map.as_ref() { + notifier.send_modify(|i| *i += 1); } Ok(()) } @@ -83,7 +64,6 @@ impl DocumentWatcher { Box::pin(stream! { while let Ok(()) = receiver.changed().await { if let Some(collab) = collab.upgrade() { - receiver.borrow().clear(); match Self::get_document_content(collab) { Ok(content) => { yield FragmentUpdate::Update(Fragment {