chore: flush collab base on edit count (#330)

This commit is contained in:
Nathan.fooo 2024-02-20 06:32:19 +08:00 committed by GitHub
parent 5cd16d7544
commit 1841dc21e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 77 additions and 31 deletions

View File

@ -2,6 +2,7 @@ use crate::entities::RealtimeUser;
use crate::error::RealtimeError;
use app_error::AppError;
use async_trait::async_trait;
use std::fmt::Display;
use crate::collaborate::CollabAccessControl;
use anyhow::anyhow;
@ -21,7 +22,7 @@ use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use tracing::{debug, error, event, info, instrument, trace};
use tracing::{debug, error, event, info, instrument, trace, warn};
use crate::collaborate::group_control::CollabGroup;
use yrs::updates::decoder::Decode;
@ -212,23 +213,33 @@ where
self.edit_state.set_did_load()
}
fn receive_update(&self, _object_id: &str, _txn: &TransactionMut, _update: &[u8]) {
let count = self.edit_state.increment_edit_count();
fn receive_update(&self, object_id: &str, _txn: &TransactionMut, _update: &[u8]) {
let _ = self.edit_state.increment_edit_count();
if !self.edit_state.did_load() {
return;
}
if count >= self.storage.config().flush_per_update {
self.edit_state.flush_edit();
trace!("number of updates reach flush_per_update, start flushing");
match self.group.upgrade() {
None => error!("Group is dropped, skip flush collab"),
Some(group) => {
trace!("{} edit state:{}", object_id, self.edit_state);
if self
.edit_state
.should_flush(self.storage.config().flush_per_update, 3 * 60)
{
self.edit_state.tick();
let object_id = object_id.to_string();
let weak_group = self.group.clone();
tokio::spawn(async move {
match weak_group.upgrade() {
None => warn!("{}: Group is dropped, skip flush collab", object_id),
Some(group) => {
info!(
"{}: number of updates reach flush_per_update, start flushing",
object_id
);
group.flush_collab().await;
});
},
}
});
}
}
@ -286,17 +297,30 @@ where
struct CollabEditState {
edit_count: AtomicU32,
flush_edit_count: AtomicU32,
flush_interval: AtomicI64,
prev_edit_count: AtomicU32,
prev_flush_timestamp: AtomicI64,
did_load_collab: AtomicBool,
}
/// One-line textual snapshot of the edit-tracking state, used when tracing.
impl Display for CollabEditState {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Read every atomic once, in field order, before any output is produced.
    let edits = self.edit_count.load(Ordering::SeqCst);
    let edits_at_last_tick = self.prev_edit_count.load(Ordering::SeqCst);
    let last_tick_timestamp = self.prev_flush_timestamp.load(Ordering::SeqCst);
    let loaded = self.did_load_collab.load(Ordering::SeqCst);

    write!(
      f,
      "CollabEditState {{ edit_count: {}, prev_edit_count: {}, prev_flush_timestamp: {}, did_load_collab: {} }}",
      edits, edits_at_last_tick, last_tick_timestamp, loaded
    )
  }
}
impl CollabEditState {
fn new() -> Self {
Self {
edit_count: AtomicU32::new(0),
flush_edit_count: Default::default(),
flush_interval: AtomicI64::new(chrono::Utc::now().timestamp()),
prev_edit_count: Default::default(),
prev_flush_timestamp: AtomicI64::new(chrono::Utc::now().timestamp()),
did_load_collab: AtomicBool::new(false),
}
}
@ -313,12 +337,12 @@ impl CollabEditState {
self.edit_count.fetch_add(1, Ordering::SeqCst)
}
fn flush_edit(&self) {
fn tick(&self) {
self
.flush_edit_count
.prev_edit_count
.store(self.edit_count.load(Ordering::SeqCst), Ordering::SeqCst);
self
.flush_interval
.prev_flush_timestamp
.store(chrono::Utc::now().timestamp(), Ordering::SeqCst);
}
@ -334,21 +358,19 @@ impl CollabEditState {
/// # Arguments
/// * `max_edit_count` - The maximum number of edits allowed before a flush is triggered.
/// * `max_interval` - The maximum time interval (in seconds) allowed before a flush is triggered.
#[allow(dead_code)]
fn should_flush(&self, max_edit_count: u32, max_interval: i64) -> bool {
// compare current time with last flush time
let current = chrono::Utc::now().timestamp();
let prev = self.flush_interval.load(Ordering::SeqCst);
let current_edit_count = self.edit_count.load(Ordering::SeqCst);
let prev_flush_edit_count = self.flush_edit_count.load(Ordering::SeqCst);
if current > prev && current_edit_count != prev_flush_edit_count {
return current - prev >= max_interval;
}
let prev_edit_count = self.prev_edit_count.load(Ordering::SeqCst);
// compare current edit count with last flush edit count
if current_edit_count > prev_flush_edit_count {
return current_edit_count - prev_flush_edit_count >= max_edit_count;
if current_edit_count > prev_edit_count {
return (current_edit_count - prev_edit_count) >= max_edit_count;
}
let now = chrono::Utc::now().timestamp();
let prev = self.prev_flush_timestamp.load(Ordering::SeqCst);
if now > prev && current_edit_count != prev_edit_count {
return now - prev >= max_interval;
}
false

View File

@ -502,3 +502,27 @@ async fn post_realtime_message_test() {
drop(client);
}
}
// Opens a fresh document collab for a new test user and writes 200 key/value
// entries into it, pausing between writes, so that the edit count passes the
// flush-per-update threshold.
//
// NOTE(review): there are no assertions yet (see the TODO at the end), so this
// test only checks that the write loop completes without panicking.
#[tokio::test]
async fn collab_flush_test() {
let mut new_user = TestClient::new_user().await;
let object_id = Uuid::new_v4().to_string();
let workspace_id = new_user.workspace_id().await;
new_user
.open_collab(&workspace_id, &object_id, CollabType::Document)
.await;
// The default `flush_per_update` is 100, as defined in [WriteConfig], so we
// write 200 entries to make sure the flush is triggered.
// NOTE(review): 200 iterations with a 300 ms sleep each add up to roughly
// 60 s of wall-clock time for this test.
for i in 0..200 {
new_user
.collab_by_object_id
.get_mut(&object_id)
.unwrap()
.collab
.lock()
// Key and value are both the decimal string of the loop index.
.insert(&i.to_string(), i.to_string());
sleep(Duration::from_millis(300)).await;
}
// TODO(nathan): assert the collab content in disk
}