From 1841dc21e2a3b610c4e2603afb7d985ade0343e6 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 20 Feb 2024 06:32:19 +0800 Subject: [PATCH] chore: flush collab base on edit count (#330) --- libs/realtime/src/collaborate/plugin.rs | 84 ++++++++++++++++--------- tests/collab/single_device_edit.rs | 24 +++++++ 2 files changed, 77 insertions(+), 31 deletions(-) diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 6719be8e..4acdc0fc 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -2,6 +2,7 @@ use crate::entities::RealtimeUser; use crate::error::RealtimeError; use app_error::AppError; use async_trait::async_trait; +use std::fmt::Display; use crate::collaborate::CollabAccessControl; use anyhow::anyhow; @@ -21,7 +22,7 @@ use parking_lot::Mutex; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; -use tracing::{debug, error, event, info, instrument, trace}; +use tracing::{debug, error, event, info, instrument, trace, warn}; use crate::collaborate::group_control::CollabGroup; use yrs::updates::decoder::Decode; @@ -212,23 +213,33 @@ where self.edit_state.set_did_load() } - fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, _update: &[u8]) { - let count = self.edit_state.increment_edit_count(); + fn receive_update(&self, object_id: &str, _txn: &TransactionMut, _update: &[u8]) { + let _ = self.edit_state.increment_edit_count(); if !self.edit_state.did_load() { return; } - if count >= self.storage.config().flush_per_update { - self.edit_state.flush_edit(); - trace!("number of updates reach flush_per_update, start flushing"); - match self.group.upgrade() { - None => error!("Group is dropped, skip flush collab"), - Some(group) => { - tokio::spawn(async move { + trace!("{} edit state:{}", object_id, self.edit_state); + if self + .edit_state + 
.should_flush(self.storage.config().flush_per_update, 3 * 60) + { + self.edit_state.tick(); + + let object_id = object_id.to_string(); + let weak_group = self.group.clone(); + tokio::spawn(async move { + match weak_group.upgrade() { + None => warn!("{}: Group is dropped, skip flush collab", object_id), + Some(group) => { + info!( + "{}: number of updates reach flush_per_update, start flushing", + object_id + ); group.flush_collab().await; - }); - }, - } + }, + } + }); } } @@ -286,17 +297,30 @@ where struct CollabEditState { edit_count: AtomicU32, - flush_edit_count: AtomicU32, - flush_interval: AtomicI64, + prev_edit_count: AtomicU32, + prev_flush_timestamp: AtomicI64, did_load_collab: AtomicBool, } +impl Display for CollabEditState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "CollabEditState {{ edit_count: {}, prev_edit_count: {}, prev_flush_timestamp: {}, did_load_collab: {} }}", + self.edit_count.load(Ordering::SeqCst), + self.prev_edit_count.load(Ordering::SeqCst), + self.prev_flush_timestamp.load(Ordering::SeqCst), + self.did_load_collab.load(Ordering::SeqCst) + ) + } +} + impl CollabEditState { fn new() -> Self { Self { edit_count: AtomicU32::new(0), - flush_edit_count: Default::default(), - flush_interval: AtomicI64::new(chrono::Utc::now().timestamp()), + prev_edit_count: Default::default(), + prev_flush_timestamp: AtomicI64::new(chrono::Utc::now().timestamp()), did_load_collab: AtomicBool::new(false), } } @@ -313,12 +337,12 @@ impl CollabEditState { self.edit_count.fetch_add(1, Ordering::SeqCst) } - fn flush_edit(&self) { + fn tick(&self) { self - .flush_edit_count + .prev_edit_count .store(self.edit_count.load(Ordering::SeqCst), Ordering::SeqCst); self - .flush_interval + .prev_flush_timestamp .store(chrono::Utc::now().timestamp(), Ordering::SeqCst); } @@ -334,21 +358,19 @@ impl CollabEditState { /// # Arguments /// * `max_edit_count` - The maximum number of edits allowed before a flush is triggered. 
/// * `max_interval` - The maximum time interval (in seconds) allowed before a flush is triggered. - #[allow(dead_code)] fn should_flush(&self, max_edit_count: u32, max_interval: i64) -> bool { - // compare current time with last flush time - let current = chrono::Utc::now().timestamp(); - let prev = self.flush_interval.load(Ordering::SeqCst); let current_edit_count = self.edit_count.load(Ordering::SeqCst); - let prev_flush_edit_count = self.flush_edit_count.load(Ordering::SeqCst); - - if current > prev && current_edit_count != prev_flush_edit_count { - return current - prev >= max_interval; - } + let prev_edit_count = self.prev_edit_count.load(Ordering::SeqCst); // compare current edit count with last flush edit count - if current_edit_count > prev_flush_edit_count { - return current_edit_count - prev_flush_edit_count >= max_edit_count; + if current_edit_count > prev_edit_count { + return (current_edit_count - prev_edit_count) >= max_edit_count; + } + + let now = chrono::Utc::now().timestamp(); + let prev = self.prev_flush_timestamp.load(Ordering::SeqCst); + if now > prev && current_edit_count != prev_edit_count { + return now - prev >= max_interval; } false diff --git a/tests/collab/single_device_edit.rs b/tests/collab/single_device_edit.rs index 76207c33..6be14115 100644 --- a/tests/collab/single_device_edit.rs +++ b/tests/collab/single_device_edit.rs @@ -502,3 +502,27 @@ async fn post_realtime_message_test() { drop(client); } } + +#[tokio::test] +async fn collab_flush_test() { + let mut new_user = TestClient::new_user().await; + let object_id = Uuid::new_v4().to_string(); + let workspace_id = new_user.workspace_id().await; + new_user + .open_collab(&workspace_id, &object_id, CollabType::Document) + .await; + + // the default flush_per_update is 100 that defined in [WriteConfig] + // so we need to write 200 times to trigger the flush + for i in 0..200 { + new_user + .collab_by_object_id + .get_mut(&object_id) + .unwrap() + .collab + .lock() + 
.insert(&i.to_string(), i.to_string()); + sleep(Duration::from_millis(300)).await; + } + // TODO(nathan): assert that the collab content was flushed to disk +}