chore: Adjust gen history interval (#812)

* chore: adjust generate history interval

* chore: disable verbose log
This commit is contained in:
Nathan.fooo 2024-09-11 13:45:53 +08:00 committed by GitHub
parent 7cd88b7502
commit fff93e6083
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 134 additions and 88 deletions

View File

@ -200,7 +200,7 @@ impl CollabGroup {
modified_at.elapsed().as_secs(),
self.subscribers.len()
);
modified_at.elapsed().as_secs() > 60 * 2
modified_at.elapsed().as_secs() > 60 * 3
} else {
let elapsed_secs = modified_at.elapsed().as_secs();
if elapsed_secs > self.timeout_secs() {
@ -246,7 +246,7 @@ impl CollabGroup {
#[inline]
fn timeout_secs(&self) -> u64 {
match self.collab_type {
CollabType::Document => 10 * 60, // 10 minutes
CollabType::Document => 30 * 60, // 30 minutes
CollabType::Database | CollabType::DatabaseRow => 30 * 60, // 30 minutes
CollabType::WorkspaceDatabase | CollabType::Folder | CollabType::UserAwareness => 6 * 60 * 60, // 6 hours,
CollabType::Unknown => {

View File

@ -10,7 +10,9 @@ use std::sync::Arc;
use database::history::ops::get_snapshot_meta_list;
use tonic_proto::history::{RepeatedSnapshotMetaPb, SnapshotMetaPb};
use crate::biz::snapshot::{CollabSnapshot, CollabSnapshotState, SnapshotGenerator};
use crate::biz::snapshot::{
calculate_edit_count, CollabSnapshot, CollabSnapshotState, SnapshotGenerator,
};
use crate::error::HistoryError;
pub struct CollabHistory {
@ -22,8 +24,25 @@ pub struct CollabHistory {
impl CollabHistory {
pub async fn new(object_id: &str, collab: Arc<RwLock<Collab>>, collab_type: CollabType) -> Self {
let snapshot_generator =
SnapshotGenerator::new(object_id, Arc::downgrade(&collab), collab_type.clone());
let current_edit_count = {
let read_guard = collab.read().await;
let txn = read_guard.transact();
calculate_edit_count(&txn)
};
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History] object:{} init edit count: {}",
object_id,
current_edit_count
);
let snapshot_generator = SnapshotGenerator::new(
object_id,
Arc::downgrade(&collab),
collab_type.clone(),
current_edit_count as u32,
);
collab.read().await.add_plugin(Box::new(CountUpdatePlugin {
snapshot_generator: snapshot_generator.clone(),
}));
@ -43,7 +62,24 @@ impl CollabHistory {
}
}
pub async fn gen_snapshot_context(&self) -> Result<Option<SnapshotContext>, HistoryError> {
pub async fn gen_history(
&self,
min_snapshot_required: Option<usize>,
) -> Result<Option<HistoryContext>, HistoryError> {
if let Some(min_snapshot_required) = min_snapshot_required {
let num_snapshot = self.snapshot_generator.num_pending_snapshots().await;
if num_snapshot < min_snapshot_required {
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History]: {} current snapshot:{}, minimum required:{}",
self.object_id,
num_snapshot,
min_snapshot_required
);
return Ok(None);
}
}
let collab = self.collab.clone();
let timestamp = chrono::Utc::now().timestamp();
let snapshots: Vec<CollabSnapshot> = self.snapshot_generator.consume_pending_snapshots().await
@ -54,6 +90,8 @@ impl CollabHistory {
// If there are no snapshots, we don't need to generate a new snapshot
if snapshots.is_empty() {
#[cfg(feature = "verbose_log")]
tracing::trace!("[History]: {} has no snapshots", self.object_id,);
return Ok(None);
}
let collab_type = self.collab_type.clone();
@ -88,7 +126,7 @@ impl CollabHistory {
state_vector,
chrono::Utc::now().timestamp(),
);
Ok(Some(SnapshotContext {
Ok(Some(HistoryContext {
collab_type,
state,
snapshots,
@ -116,7 +154,7 @@ impl CollabHistory {
}
}
pub struct SnapshotContext {
pub struct HistoryContext {
pub collab_type: CollabType,
pub state: CollabSnapshotState,
pub snapshots: Vec<CollabSnapshot>,

View File

@ -19,7 +19,7 @@ impl HistoryPersistence {
pg_pool,
}
}
pub async fn save_snapshot(
pub async fn insert_history(
&self,
state: CollabSnapshotState,
snapshots: Vec<CollabSnapshot>,

View File

@ -22,13 +22,18 @@ pub struct SnapshotGenerator {
}
impl SnapshotGenerator {
pub fn new(object_id: &str, mutex_collab: Weak<RwLock<Collab>>, collab_type: CollabType) -> Self {
pub fn new(
object_id: &str,
mutex_collab: Weak<RwLock<Collab>>,
collab_type: CollabType,
edit_count: u32,
) -> Self {
Self {
object_id: object_id.to_string(),
mutex_collab,
collab_type,
current_update_count: Default::default(),
prev_edit_count: Default::default(),
current_update_count: Arc::new(ArcSwap::new(Arc::new(edit_count))),
prev_edit_count: Arc::new(ArcSwap::new(Arc::new(edit_count))),
pending_snapshots: Default::default(),
}
}
@ -43,11 +48,20 @@ impl SnapshotGenerator {
!self.pending_snapshots.lock().await.is_empty()
}
pub async fn num_pending_snapshots(&self) -> usize {
self.pending_snapshots.lock().await.len()
}
/// Generate a snapshot if the current edit count is not zero.
pub async fn generate(&self) {
if let Some(collab) = self.mutex_collab.upgrade() {
let is_change = self.current_update_count.load_full() != self.prev_edit_count.load_full();
if is_change {
let current = self.current_update_count.load_full();
let prev = self.prev_edit_count.load_full();
if current < prev {
return;
}
let threshold_count = *current - *prev;
if threshold_count > snapshot_min_edit_threshold(&self.collab_type) {
self
.prev_edit_count
.store(self.current_update_count.load_full());
@ -60,12 +74,6 @@ impl SnapshotGenerator {
"generate snapshot by periodic tick",
);
self.pending_snapshots.lock().await.push(snapshot);
} else {
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History]: object:{} no change, skip generating snapshot",
self.object_id
);
}
} else {
warn!("collab is dropped. cannot generate snapshot")
@ -74,28 +82,10 @@ impl SnapshotGenerator {
pub fn did_apply_update<T: ReadTxn>(&self, txn: &T) {
let txn_edit_count = calculate_edit_count(txn);
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History] object:{} edit count: {}",
self.object_id,
txn_edit_count
);
self
.current_update_count
.store(Arc::new(txn_edit_count as u32));
// keep it simple for now. we just compare the update count to determine if we need to generate a snapshot.
// in the future, we can use a more sophisticated algorithm to determine when to generate a snapshot.
let threshold = gen_snapshot_threshold(&self.collab_type);
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History] object_id:{}, update count:{}, threshold={}",
self.object_id,
txn_edit_count,
threshold,
);
let current = self.current_update_count.load_full();
let prev = self.prev_edit_count.load_full();
if current < prev {
@ -105,8 +95,16 @@ impl SnapshotGenerator {
);
return;
}
let threshold_count = *current - *prev;
let threshold = snapshot_max_edit_threshold(&self.collab_type);
#[cfg(feature = "verbose_log")]
tracing::trace!(
"[History] object_id:{}, update count:{}, threshold={}",
self.object_id,
threshold_count,
threshold,
);
if threshold_count + 1 >= threshold {
self.prev_edit_count.store(Arc::new(txn_edit_count as u32));
let pending_snapshots = self.pending_snapshots.clone();
@ -129,14 +127,14 @@ impl SnapshotGenerator {
}
#[inline]
fn gen_snapshot_threshold(collab_type: &CollabType) -> u32 {
fn snapshot_max_edit_threshold(collab_type: &CollabType) -> u32 {
match collab_type {
CollabType::Document => 500,
CollabType::Database => 50,
CollabType::WorkspaceDatabase => 50,
CollabType::Folder => 50,
CollabType::DatabaseRow => 50,
CollabType::UserAwareness => 50,
CollabType::Database => 30,
CollabType::WorkspaceDatabase => 10,
CollabType::Folder => 10,
CollabType::DatabaseRow => 10,
CollabType::UserAwareness => 20,
CollabType::Unknown => {
if cfg!(debug_assertions) {
5
@ -147,6 +145,18 @@ fn gen_snapshot_threshold(collab_type: &CollabType) -> u32 {
}
}
#[inline]
fn snapshot_min_edit_threshold(collab_type: &CollabType) -> u32 {
match collab_type {
CollabType::Document => 50,
CollabType::Database => 10,
CollabType::WorkspaceDatabase => 10,
CollabType::Folder => 10,
CollabType::DatabaseRow => 10,
CollabType::UserAwareness => 10,
CollabType::Unknown => 5,
}
}
#[inline]
pub fn gen_snapshot(collab: &Collab, object_id: &str, reason: &str) -> CollabSnapshot {
tracing::trace!(
@ -245,7 +255,7 @@ impl From<CollabSnapshot> for SnapshotMetaPb {
}
#[inline]
fn calculate_edit_count<T: ReadTxn>(txn: &T) -> u64 {
pub(crate) fn calculate_edit_count<T: ReadTxn>(txn: &T) -> u64 {
let snapshot = txn.snapshot();
let mut insert_count = 0;
for (_, &clock) in snapshot.state_map.iter() {

View File

@ -82,15 +82,6 @@ impl OpenCollabHandle {
doc_state_version: 1,
})
}
pub async fn generate_history(&self) -> Result<(), HistoryError> {
if let Some(history_persistence) = &self.history_persistence {
// Generate at least one snapshot if the history is empty.
self.history.generate_snapshot_if_empty().await;
save_history_if_snapshot_exists(self.history.clone(), history_persistence.clone()).await;
}
Ok(())
}
}
/// Spawns an asynchronous task to continuously receive and process updates from a given update stream.
@ -218,49 +209,56 @@ fn apply_updates(
}
Ok(())
}
fn spawn_save_history(history: Weak<CollabHistory>, history_persistence: Weak<HistoryPersistence>) {
tokio::spawn(async move {
let mut interval = if cfg!(debug_assertions) {
// In debug mode, save the history every 10 seconds.
interval(Duration::from_secs(10))
} else {
// In release mode, save the history every 10 minutes.
interval(Duration::from_secs(10 * 60))
interval(Duration::from_secs(5 * 60))
};
interval.tick().await;
interval.tick().await; // Initial delay
let mut tick_count = 1;
loop {
interval.tick().await;
if let (Some(history), Some(history_persistence)) =
(history.upgrade(), history_persistence.upgrade())
interval.tick().await; // Wait for the next interval tick
if let (Some(history), Some(persistence)) = (history.upgrade(), history_persistence.upgrade())
{
history.generate_snapshot_if_empty().await;
save_history_if_snapshot_exists(history, history_persistence).await;
} else {
let min_snapshot_required = if tick_count % 10 == 0 {
history.generate_snapshot_if_empty().await;
None // No limit on snapshots every 3 ticks
} else {
Some(3)
};
#[cfg(feature = "verbose_log")]
tracing::trace!("[History]: exit periodically save history task.");
tracing::trace!(
"[History]: {} periodic save history task. tick count: {}, min_snapshot_required:{:?}",
&history.object_id,
tick_count,
min_snapshot_required
);
// Generate history and attempt to insert it into persistence
match history.gen_history(min_snapshot_required).await {
Ok(Some(ctx)) => {
if let Err(err) = persistence
.insert_history(ctx.state, ctx.snapshots, ctx.collab_type)
.await
{
error!("Failed to save snapshot: {:?}", err);
}
},
Ok(None) => {}, // No history to save
Err(err) => error!("Error generating history: {:?}", err),
}
tick_count += 1;
} else {
// Exit loop if history or persistence has been dropped
#[cfg(feature = "verbose_log")]
tracing::trace!("[History]: exiting periodic save history task");
break;
}
}
});
}
#[inline]
async fn save_history_if_snapshot_exists(
history: Arc<CollabHistory>,
history_persistence: Arc<HistoryPersistence>,
) {
match history.gen_snapshot_context().await {
Ok(Some(ctx)) => {
if let Err(err) = history_persistence
.save_snapshot(ctx.state, ctx.snapshots, ctx.collab_type)
.await
{
error!("Failed to save snapshot: {:?}", err);
}
},
Ok(None) => {},
Err(err) => error!("{:?}", err),
}
}

View File

@ -25,8 +25,8 @@ async fn main() -> Result<()> {
let mut appflowy_history_cmd = Command::new("cargo")
.args([
"run",
"--features",
"verbose_log",
// "--features",
// "verbose_log",
"--manifest-path",
"./services/appflowy-history/Cargo.toml",
])