From 73127fb886c379a6adc3156cbb965a9376b693a3 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 13 Aug 2024 16:20:06 +0800 Subject: [PATCH] chore: shorten group lifetime (#726) --- libs/collab-stream/src/stream_group.rs | 12 +++--- .../src/group/group_init.rs | 41 +++++++++++++------ .../appflowy-collaborate/src/group/manager.rs | 2 +- .../appflowy-collaborate/src/group/state.rs | 7 +--- .../src/indexer/document_indexer.rs | 14 +++++-- .../appflowy-collaborate/src/rt_server.rs | 3 +- 6 files changed, 50 insertions(+), 29 deletions(-) diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index 3bb4887c..c3b0de59 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -67,12 +67,7 @@ impl StreamGroup { match result { Ok(_) => Ok(()), Err(redis_error) => { - warn!( - "error when creating consumer group `{}` `{}`: {:?}", - self.stream_key, self.group_name, redis_error - ); - - return match redis_error.kind() { + let result = match redis_error.kind() { ErrorKind::ExtensionError => match redis_error.code() { None => Err(StreamError::from(redis_error)), Some(code) => { @@ -85,6 +80,11 @@ impl StreamGroup { }, _ => Err(StreamError::from(redis_error)), }; + + if result.is_err() { + error!("error when creating consumer group: {:?}", result); + } + result }, } } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 16c89786..a1a2c6a4 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -27,7 +27,7 @@ use collab_stream::stream_group::StreamGroup; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use std::time::Duration; use tokio::sync::mpsc; -use tracing::{debug, error, event, trace}; +use tracing::{error, event, info, trace}; use yrs::updates::decoder::Decode; use yrs::updates::encoder::Encode; use yrs::Update; @@ -187,20 +187,35 @@ impl CollabGroup { /// subscriber pub async fn is_inactive(&self) -> bool { let modified_at = self.broadcast.modified_at.lock(); + + // In debug mode, we set the timeout to 60 seconds if cfg!(debug_assertions) { - modified_at.elapsed().as_secs() > 60 && self.subscribers.is_empty() + trace!( + "Group:{} is inactive for {} seconds, subscribers: {}", + self.object_id, + modified_at.elapsed().as_secs(), + self.subscribers.len() + ); + modified_at.elapsed().as_secs() > 120 } else { let elapsed_secs = modified_at.elapsed().as_secs(); - const MAXIMUM_SECS: u64 = 60 * 60 * 12; // 12 hours - if elapsed_secs > MAXIMUM_SECS { - debug!( - "The group:{} is inactive for {} seconds", - self.object_id, elapsed_secs - ); - // If the group is inactive for more than 12 hours, mark it as inactive - true + if elapsed_secs > self.timeout_secs() { + // Mark the group as inactive if it has been inactive for more than 3 hours, even if there are subscribers. + // Otherwise, return true only if there are no subscribers. + const MAXIMUM_SECS: u64 = 3 * 60 * 60; + if elapsed_secs > MAXIMUM_SECS { + info!( + "Group:{} is inactive for {} seconds, subscribers: {}", + self.object_id, + modified_at.elapsed().as_secs(), + self.subscribers.len() + ); + true + } else { + self.subscribers.is_empty() + } } else { - elapsed_secs > self.timeout_secs() && self.subscribers.is_empty() + false } } } @@ -225,8 +240,8 @@ impl CollabGroup { fn timeout_secs(&self) -> u64 { match self.collab_type { CollabType::Document => 10 * 60, // 10 minutes - CollabType::Database | CollabType::DatabaseRow => 60 * 60, // 1 hour - CollabType::WorkspaceDatabase | CollabType::Folder | CollabType::UserAwareness => 2 * 60 * 60, // 2 hours, + CollabType::Database | CollabType::DatabaseRow => 30 * 60, // 30 minutes + CollabType::WorkspaceDatabase | CollabType::Folder | CollabType::UserAwareness => 6 * 60 * 60, // 6 hours, CollabType::Unknown => { 10 * 60 // 10 minutes }, diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 56ddfe87..013fbe1c 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -76,7 +76,7 @@ where }) } - pub async fn inactive_groups(&self) -> Vec { + pub async fn get_inactive_groups(&self) -> Vec { self.state.get_inactive_group_ids().await } diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 240cba16..4ac4dddc 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -45,10 +45,8 @@ impl GroupManagementState { } } - if !inactive_group_ids.is_empty() { - for object_id in &inactive_group_ids { - self.remove_group(object_id).await; - } + for object_id in &inactive_group_ids { + self.remove_group(object_id).await; } inactive_group_ids } @@ -123,7 +121,6 @@ impl GroupManagementState { // Log error if the group doesn't exist error!("Group for object_id:{} not found", object_id); } - self.metrics_calculate.num_of_active_collab.store( self.group_by_object_id.len() as i64, std::sync::atomic::Ordering::Relaxed, diff --git a/services/appflowy-collaborate/src/indexer/document_indexer.rs b/services/appflowy-collaborate/src/indexer/document_indexer.rs index 66c174f2..c1a8a699 100644 --- a/services/appflowy-collaborate/src/indexer/document_indexer.rs +++ b/services/appflowy-collaborate/src/indexer/document_indexer.rs @@ -1,10 +1,10 @@ +use anyhow::anyhow; use async_trait::async_trait; -use std::sync::Arc; - use collab::core::collab::MutexCollab; use collab_document::document::Document; use collab_document::error::DocumentError; use collab_entity::CollabType; +use std::sync::Arc; use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; @@ -27,7 +27,15 @@ impl DocumentIndexer { fn get_document_contents( collab: Arc, ) -> Result<(String, Vec), DocumentError> { - let object_id = collab.try_lock().unwrap().object_id.clone(); + let object_id = collab + .try_lock() + .ok_or_else(|| { + DocumentError::Internal(anyhow!( + "Failed to lock collab when trying to get document content".to_string() + )) + })? + .object_id + .clone(); let document = Document::open(collab)?; let document_data = document.get_document_data()?; let content = document_data.to_plain_text(); diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 7dae0dc7..a4fd4a23 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -270,7 +270,8 @@ fn spawn_period_check_inactive_group( loop { interval.tick().await; if let Some(groups) = weak_groups.upgrade() { - let inactive_group_ids = groups.inactive_groups().await; + let inactive_group_ids = groups.get_inactive_groups().await; + trace!("Inactive group ids: {:?}", inactive_group_ids); for id in inactive_group_ids { cloned_group_sender_by_object_id.remove(&id); }