From fff93e6083d8eb860ed3ea66261d5de04bdedb32 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 11 Sep 2024 13:45:53 +0800 Subject: [PATCH] chore: Adjust gen history interval (#812) * chore: adjust generate history interval * chore: disable verbose log --- .../src/group/group_init.rs | 4 +- services/appflowy-history/src/biz/history.rs | 50 +++++++++-- .../appflowy-history/src/biz/persistence.rs | 2 +- services/appflowy-history/src/biz/snapshot.rs | 84 +++++++++++-------- .../appflowy-history/src/core/open_handle.rs | 78 +++++++++-------- xtask/src/main.rs | 4 +- 6 files changed, 134 insertions(+), 88 deletions(-) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 09c8b556..5c316a1b 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -200,7 +200,7 @@ impl CollabGroup { modified_at.elapsed().as_secs(), self.subscribers.len() ); - modified_at.elapsed().as_secs() > 60 * 2 + modified_at.elapsed().as_secs() > 60 * 3 } else { let elapsed_secs = modified_at.elapsed().as_secs(); if elapsed_secs > self.timeout_secs() { @@ -246,7 +246,7 @@ impl CollabGroup { #[inline] fn timeout_secs(&self) -> u64 { match self.collab_type { - CollabType::Document => 10 * 60, // 10 minutes + CollabType::Document => 30 * 60, // 30 minutes CollabType::Database | CollabType::DatabaseRow => 30 * 60, // 30 minutes CollabType::WorkspaceDatabase | CollabType::Folder | CollabType::UserAwareness => 6 * 60 * 60, // 6 hours, CollabType::Unknown => { diff --git a/services/appflowy-history/src/biz/history.rs b/services/appflowy-history/src/biz/history.rs index b566e07b..76c0c4d8 100644 --- a/services/appflowy-history/src/biz/history.rs +++ b/services/appflowy-history/src/biz/history.rs @@ -10,7 +10,9 @@ use std::sync::Arc; use database::history::ops::get_snapshot_meta_list; use 
tonic_proto::history::{RepeatedSnapshotMetaPb, SnapshotMetaPb}; -use crate::biz::snapshot::{CollabSnapshot, CollabSnapshotState, SnapshotGenerator}; +use crate::biz::snapshot::{ + calculate_edit_count, CollabSnapshot, CollabSnapshotState, SnapshotGenerator, +}; use crate::error::HistoryError; pub struct CollabHistory { @@ -22,8 +24,25 @@ pub struct CollabHistory { impl CollabHistory { pub async fn new(object_id: &str, collab: Arc>, collab_type: CollabType) -> Self { - let snapshot_generator = - SnapshotGenerator::new(object_id, Arc::downgrade(&collab), collab_type.clone()); + let current_edit_count = { + let read_guard = collab.read().await; + let txn = read_guard.transact(); + calculate_edit_count(&txn) + }; + + #[cfg(feature = "verbose_log")] + tracing::trace!( + "[History] object:{} init edit count: {}", + object_id, + current_edit_count + ); + + let snapshot_generator = SnapshotGenerator::new( + object_id, + Arc::downgrade(&collab), + collab_type.clone(), + current_edit_count as u32, + ); collab.read().await.add_plugin(Box::new(CountUpdatePlugin { snapshot_generator: snapshot_generator.clone(), })); @@ -43,7 +62,24 @@ impl CollabHistory { } } - pub async fn gen_snapshot_context(&self) -> Result, HistoryError> { + pub async fn gen_history( + &self, + min_snapshot_required: Option, + ) -> Result, HistoryError> { + if let Some(min_snapshot_required) = min_snapshot_required { + let num_snapshot = self.snapshot_generator.num_pending_snapshots().await; + if num_snapshot < min_snapshot_required { + #[cfg(feature = "verbose_log")] + tracing::trace!( + "[History]: {} current snapshot:{}, minimum required:{}", + self.object_id, + num_snapshot, + min_snapshot_required + ); + return Ok(None); + } + } + let collab = self.collab.clone(); let timestamp = chrono::Utc::now().timestamp(); let snapshots: Vec = self.snapshot_generator.consume_pending_snapshots().await @@ -54,6 +90,8 @@ impl CollabHistory { // If there are no snapshots, we don't need to generate a new snapshot if 
snapshots.is_empty() { + #[cfg(feature = "verbose_log")] + tracing::trace!("[History]: {} has no snapshots", self.object_id,); return Ok(None); } let collab_type = self.collab_type.clone(); @@ -88,7 +126,7 @@ impl CollabHistory { state_vector, chrono::Utc::now().timestamp(), ); - Ok(Some(SnapshotContext { + Ok(Some(HistoryContext { collab_type, state, snapshots, @@ -116,7 +154,7 @@ impl CollabHistory { } } -pub struct SnapshotContext { +pub struct HistoryContext { pub collab_type: CollabType, pub state: CollabSnapshotState, pub snapshots: Vec, diff --git a/services/appflowy-history/src/biz/persistence.rs b/services/appflowy-history/src/biz/persistence.rs index 756bfb94..f3ca0fec 100644 --- a/services/appflowy-history/src/biz/persistence.rs +++ b/services/appflowy-history/src/biz/persistence.rs @@ -19,7 +19,7 @@ impl HistoryPersistence { pg_pool, } } - pub async fn save_snapshot( + pub async fn insert_history( &self, state: CollabSnapshotState, snapshots: Vec, diff --git a/services/appflowy-history/src/biz/snapshot.rs b/services/appflowy-history/src/biz/snapshot.rs index 4139f8b5..4b4a9c43 100644 --- a/services/appflowy-history/src/biz/snapshot.rs +++ b/services/appflowy-history/src/biz/snapshot.rs @@ -22,13 +22,18 @@ pub struct SnapshotGenerator { } impl SnapshotGenerator { - pub fn new(object_id: &str, mutex_collab: Weak>, collab_type: CollabType) -> Self { + pub fn new( + object_id: &str, + mutex_collab: Weak>, + collab_type: CollabType, + edit_count: u32, + ) -> Self { Self { object_id: object_id.to_string(), mutex_collab, collab_type, - current_update_count: Default::default(), - prev_edit_count: Default::default(), + current_update_count: Arc::new(ArcSwap::new(Arc::new(edit_count))), + prev_edit_count: Arc::new(ArcSwap::new(Arc::new(edit_count))), pending_snapshots: Default::default(), } } @@ -43,11 +48,20 @@ impl SnapshotGenerator { !self.pending_snapshots.lock().await.is_empty() } + pub async fn num_pending_snapshots(&self) -> usize { + 
self.pending_snapshots.lock().await.len() + } + /// Generate a snapshot if the current edit count is not zero. pub async fn generate(&self) { if let Some(collab) = self.mutex_collab.upgrade() { - let is_change = self.current_update_count.load_full() != self.prev_edit_count.load_full(); - if is_change { + let current = self.current_update_count.load_full(); + let prev = self.prev_edit_count.load_full(); + if current < prev { + return; + } + let threshold_count = *current - *prev; + if threshold_count > snapshot_min_edit_threshold(&self.collab_type) { self .prev_edit_count .store(self.current_update_count.load_full()); @@ -60,12 +74,6 @@ impl SnapshotGenerator { "generate snapshot by periodic tick", ); self.pending_snapshots.lock().await.push(snapshot); - } else { - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History]: object:{} no change, skip generating snapshot", - self.object_id - ); } } else { warn!("collab is dropped. cannot generate snapshot") @@ -74,28 +82,10 @@ impl SnapshotGenerator { pub fn did_apply_update(&self, txn: &T) { let txn_edit_count = calculate_edit_count(txn); - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History] object:{} edit count: {}", - self.object_id, - txn_edit_count - ); - self .current_update_count .store(Arc::new(txn_edit_count as u32)); - // keep it simple for now. we just compare the update count to determine if we need to generate a snapshot. - // in the future, we can use a more sophisticated algorithm to determine when to generate a snapshot. 
- let threshold = gen_snapshot_threshold(&self.collab_type); - #[cfg(feature = "verbose_log")] - tracing::trace!( - "[History] object_id:{}, update count:{}, threshold={}", - self.object_id, - txn_edit_count, - threshold, - ); - let current = self.current_update_count.load_full(); let prev = self.prev_edit_count.load_full(); if current < prev { @@ -105,8 +95,16 @@ impl SnapshotGenerator { ); return; } - let threshold_count = *current - *prev; + let threshold = snapshot_max_edit_threshold(&self.collab_type); + #[cfg(feature = "verbose_log")] + tracing::trace!( + "[History] object_id:{}, update count:{}, threshold={}", + self.object_id, + threshold_count, + threshold, + ); + if threshold_count + 1 >= threshold { self.prev_edit_count.store(Arc::new(txn_edit_count as u32)); let pending_snapshots = self.pending_snapshots.clone(); @@ -129,14 +127,14 @@ impl SnapshotGenerator { } #[inline] -fn gen_snapshot_threshold(collab_type: &CollabType) -> u32 { +fn snapshot_max_edit_threshold(collab_type: &CollabType) -> u32 { match collab_type { CollabType::Document => 500, - CollabType::Database => 50, - CollabType::WorkspaceDatabase => 50, - CollabType::Folder => 50, - CollabType::DatabaseRow => 50, - CollabType::UserAwareness => 50, + CollabType::Database => 30, + CollabType::WorkspaceDatabase => 10, + CollabType::Folder => 10, + CollabType::DatabaseRow => 10, + CollabType::UserAwareness => 20, CollabType::Unknown => { if cfg!(debug_assertions) { 5 @@ -147,6 +145,18 @@ fn gen_snapshot_threshold(collab_type: &CollabType) -> u32 { } } +#[inline] +fn snapshot_min_edit_threshold(collab_type: &CollabType) -> u32 { + match collab_type { + CollabType::Document => 50, + CollabType::Database => 10, + CollabType::WorkspaceDatabase => 10, + CollabType::Folder => 10, + CollabType::DatabaseRow => 10, + CollabType::UserAwareness => 10, + CollabType::Unknown => 5, + } +} #[inline] pub fn gen_snapshot(collab: &Collab, object_id: &str, reason: &str) -> CollabSnapshot { tracing::trace!( @@ -245,7 
+255,7 @@ impl From for SnapshotMetaPb { } #[inline] -fn calculate_edit_count(txn: &T) -> u64 { +pub(crate) fn calculate_edit_count(txn: &T) -> u64 { let snapshot = txn.snapshot(); let mut insert_count = 0; for (_, &clock) in snapshot.state_map.iter() { diff --git a/services/appflowy-history/src/core/open_handle.rs b/services/appflowy-history/src/core/open_handle.rs index 15272b42..f8573561 100644 --- a/services/appflowy-history/src/core/open_handle.rs +++ b/services/appflowy-history/src/core/open_handle.rs @@ -82,15 +82,6 @@ impl OpenCollabHandle { doc_state_version: 1, }) } - - pub async fn generate_history(&self) -> Result<(), HistoryError> { - if let Some(history_persistence) = &self.history_persistence { - // Generate at least one snapshot if the history is empty. - self.history.generate_snapshot_if_empty().await; - save_history_if_snapshot_exists(self.history.clone(), history_persistence.clone()).await; - } - Ok(()) - } } /// Spawns an asynchronous task to continuously receive and process updates from a given update stream. @@ -218,49 +209,56 @@ fn apply_updates( } Ok(()) } - fn spawn_save_history(history: Weak, history_persistence: Weak) { tokio::spawn(async move { let mut interval = if cfg!(debug_assertions) { - // In debug mode, save the history every 10 seconds. interval(Duration::from_secs(10)) } else { - // In release mode, save the history every 10 minutes. 
- interval(Duration::from_secs(10 * 60)) + interval(Duration::from_secs(5 * 60)) }; - interval.tick().await; + interval.tick().await; // Consume the first tick, which completes immediately + let mut tick_count = 1; loop { - interval.tick().await; - if let (Some(history), Some(history_persistence)) = - (history.upgrade(), history_persistence.upgrade()) + interval.tick().await; // Wait for the next interval tick + if let (Some(history), Some(persistence)) = (history.upgrade(), history_persistence.upgrade()) { - history.generate_snapshot_if_empty().await; - save_history_if_snapshot_exists(history, history_persistence).await; - } else { + let min_snapshot_required = if tick_count % 10 == 0 { + history.generate_snapshot_if_empty().await; + None // No minimum snapshot count on every 10th tick + } else { + Some(3) + }; + #[cfg(feature = "verbose_log")] - tracing::trace!("[History]: exit periodically save history task."); + tracing::trace!( + "[History]: {} periodic save history task. tick count: {}, min_snapshot_required:{:?}", + &history.object_id, + tick_count, + min_snapshot_required + ); + + // Generate history and attempt to insert it into persistence + match history.gen_history(min_snapshot_required).await { + Ok(Some(ctx)) => { + if let Err(err) = persistence + .insert_history(ctx.state, ctx.snapshots, ctx.collab_type) + .await + { + error!("Failed to save snapshot: {:?}", err); + } + }, + Ok(None) => {}, // No history to save + Err(err) => error!("Error generating history: {:?}", err), + } + + tick_count += 1; + } else { + // Exit loop if history or persistence has been dropped + #[cfg(feature = "verbose_log")] + tracing::trace!("[History]: exiting periodic save history task"); break; } } }); } - -#[inline] -async fn save_history_if_snapshot_exists( - history: Arc, - history_persistence: Arc, -) { - match history.gen_snapshot_context().await { - Ok(Some(ctx)) => { - if let Err(err) = history_persistence - .save_snapshot(ctx.state, ctx.snapshots, ctx.collab_type) - .await - { - error!("Failed to save snapshot: 
{:?}", err); - } - }, - Ok(None) => {}, - Err(err) => error!("{:?}", err), - } -} diff --git a/xtask/src/main.rs b/xtask/src/main.rs index b32d5054..2a3889c8 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -25,8 +25,8 @@ async fn main() -> Result<()> { let mut appflowy_history_cmd = Command::new("cargo") .args([ "run", - "--features", - "verbose_log", + // "--features", + // "verbose_log", "--manifest-path", "./services/appflowy-history/Cargo.toml", ])