From 3433cad5cf6fb70a2af780ae9d3b084b76db4a7c Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 1 Oct 2024 08:57:02 +0200 Subject: [PATCH] chore: draft of stateless collab group --- Cargo.lock | 1 + Cargo.toml | 1 + libs/client-api/Cargo.toml | 2 +- services/appflowy-collaborate/Cargo.toml | 1 + .../src/group/broadcast.rs | 66 +-- .../src/group/group_init.rs | 405 ++++++++---------- .../appflowy-collaborate/src/group/manager.rs | 3 - .../appflowy-collaborate/src/group/mod.rs | 1 - 8 files changed, 215 insertions(+), 265 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcbec97b..802d7c1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -709,6 +709,7 @@ dependencies = [ "anyhow", "app-error", "appflowy-ai-client", + "arc-swap", "async-stream", "async-trait", "authentication", diff --git a/Cargo.toml b/Cargo.toml index 17438810..ec1a4834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -282,6 +282,7 @@ sanitize-filename = "0.5.0" base64 = "0.22" md5 = "0.7.0" pin-project = "1.1.5" +arc-swap = { version = "1.7" } # collaboration yrs = { version = "0.21.3", features = ["sync"] } diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 758397be..3778ae03 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -38,7 +38,7 @@ serde_json.workspace = true serde.workspace = true app-error = { workspace = true, features = ["tokio_error", "bincode_error"] } scraper = { version = "0.17.1", optional = true } -arc-swap = "1.7" +arc-swap.workspace = true shared-entity = { workspace = true } collab-rt-entity = { workspace = true } diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index 2e3151a8..d1ae9982 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -61,6 +61,7 @@ thiserror = "1.0.56" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } anyhow = "1" bytes.workspace = true +arc-swap.workspace = true collab = { workspace = true } collab-entity = { workspace = true } diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index 4eda7943..954c89b8 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -1,11 +1,14 @@ use std::borrow::BorrowMut; +use std::pin::Pin; use std::sync::{Arc, Weak}; use anyhow::anyhow; +use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use collab::core::origin::CollabOrigin; use collab::lock::RwLock; use collab::preclude::Collab; +use futures::Stream; use futures_util::{SinkExt, StreamExt}; use tokio::select; use tokio::sync::broadcast::{channel, Sender}; @@ -30,8 +33,13 @@ use crate::group::group_init::EditState; use crate::group::protocol::ServerSyncProtocol; use crate::metrics::CollabRealtimeMetrics; +pub type DataStream = + Pin, RealtimeError>> + Send + Sync + 'static>>; + pub trait CollabUpdateStreaming: 'static + Send + Sync { fn send_update(&self, update: Vec) -> Result<(), RealtimeError>; + fn receive_updates(&self) -> DataStream; + fn receive_awareness_updates(&self) -> DataStream; } /// A broadcast can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness] /// to subscribes. One broadcast can be used to propagate updates for a single document with @@ -40,19 +48,12 @@ pub trait CollabUpdateStreaming: 'static + Send + Sync { pub struct CollabBroadcast { object_id: String, broadcast_sender: Sender, - awareness_sub: Option, - /// Keep the lifetime of the document observer subscription. The subscription will be stopped - /// when the broadcast is dropped. - doc_subscription: Option, edit_state: Arc, /// The last modified time of the document. - pub modified_at: Arc>, + modified_at: Arc>, update_streaming: Arc, } -unsafe impl Send for CollabBroadcast {} -unsafe impl Sync for CollabBroadcast {} - impl Drop for CollabBroadcast { fn drop(&mut self) { trace!("Drop collab broadcast:{}", self.object_id); @@ -70,27 +71,25 @@ impl CollabBroadcast { object_id: &str, buffer_capacity: usize, edit_state: Arc, - collab: &Collab, - update_streaming: impl CollabUpdateStreaming, + update_streaming: Arc, ) -> Self { - let update_streaming = Arc::new(update_streaming); + let update_stream = update_streaming.receive_updates(); + let awareness_update_stream = update_streaming.receive_awareness_updates(); let object_id = object_id.to_owned(); // broadcast channel let (sender, _) = channel(buffer_capacity); let mut this = CollabBroadcast { object_id, broadcast_sender: sender, - awareness_sub: Default::default(), - doc_subscription: Default::default(), edit_state, - modified_at: Arc::new(parking_lot::Mutex::new(Instant::now())), + modified_at: Arc::new(ArcSwap::new(Arc::new(Instant::now()))), update_streaming, }; - this.observe_collab_changes(collab); + this.observe_collab_changes(update_stream, awareness_update_stream); this } - fn observe_collab_changes(&mut self, collab: &Collab) { + fn observe_collab_changes(&mut self, mut updates: DataStream, mut awareness_updates: DataStream) { let (doc_sub, awareness_sub) = { // Observer the document's update and broadcast it to all subscribers. let cloned_oid = self.object_id.clone(); @@ -145,9 +144,10 @@ impl CollabBroadcast { }); (doc_sub, awareness_sub) }; + } - self.doc_subscription = Some(doc_sub); - self.awareness_sub = Some(awareness_sub); + pub fn modified_at(&self) -> Instant { + *self.modified_at.load_full() } /// Subscribes a new connection to a broadcast group @@ -184,7 +184,6 @@ impl CollabBroadcast { subscriber_origin: CollabOrigin, mut sink: Sink, mut stream: Stream, - collab: Weak>, metrics_calculate: Arc, ) -> Subscription where @@ -252,16 +251,7 @@ impl CollabBroadcast { break } let message_map = result.unwrap(); - match collab.upgrade() { - None => { - trace!("{} stop receiving user:{} messages because of collab is drop", user.user_device(), object_id); - // break the loop if the collab is dropped - break - }, - Some(collab) => { - handle_client_messages(&object_id, message_map, &mut sink, collab, &metrics_calculate, &edit_state).await; - } - } + handle_client_messages(&object_id, message_map, &mut sink, &metrics_calculate, &edit_state).await; } } } @@ -281,7 +271,6 @@ async fn handle_client_messages( object_id: &str, message_map: MessageByObjectId, sink: &mut Sink, - collab: Arc + Send + Sync + 'static>>, metrics_calculate: &Arc, edit_state: &Arc, ) where @@ -304,14 +293,8 @@ async fn handle_client_messages( } for collab_message in collab_messages { - match handle_one_client_message( - object_id, - &collab_message, - &collab, - metrics_calculate, - edit_state, - ) - .await + match handle_one_client_message(object_id, &collab_message, metrics_calculate, edit_state) + .await { Ok(response) => { trace!("[realtime]: sending response: {}", response); @@ -339,7 +322,6 @@ async fn handle_client_messages( async fn handle_one_client_message( object_id: &str, collab_msg: &ClientCollabMessage, - collab: &Arc + Send + Sync + 'static>>, metrics_calculate: &Arc, edit_state: &Arc, ) -> Result { @@ -347,7 +329,7 @@ async fn handle_one_client_message( let message_origin = collab_msg.origin().clone(); // If the payload is empty, we don't need to apply any updates . - // Currently, only the ping message should has an empty payload. + // Currently, only the ping message should have an empty payload. if collab_msg.payload().is_empty() { if !matches!(collab_msg, ClientCollabMessage::ClientCollabStateCheck(_)) { error!("receive unexpected empty payload message:{}", collab_msg); @@ -371,7 +353,6 @@ async fn handle_one_client_message( message_origin.clone(), msg_id, collab_msg.payload(), - collab, metrics_calculate, edit_state, ) @@ -384,7 +365,6 @@ async fn handle_one_message_payload( message_origin: CollabOrigin, msg_id: MsgId, payload: &Bytes, - collab: &Arc + Send + Sync + 'static>>, metrics_calculate: &Arc, edit_state: &Arc, ) -> Result { @@ -395,7 +375,6 @@ async fn handle_one_message_payload( let result = handle_message( &payload, &message_origin, - collab, metrics_calculate, object_id, msg_id, @@ -418,7 +397,6 @@ async fn handle_one_message_payload( async fn handle_message( payload: &Bytes, message_origin: &CollabOrigin, - collab: &Arc + Send + Sync + 'static>>, metrics_calculate: &Arc, object_id: &str, msg_id: MsgId, diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 04a3aeea..49071eb8 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -1,9 +1,4 @@ -use std::collections::VecDeque; -use std::fmt::Display; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; -use std::sync::Arc; -use std::time::Duration; - +use arc_swap::{ArcSwap, ArcSwapOption}; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use collab::lock::RwLock; @@ -11,7 +6,14 @@ use collab::preclude::Collab; use collab_entity::CollabType; use dashmap::DashMap; use futures_util::{SinkExt, StreamExt}; +use std::collections::VecDeque; +use std::fmt::Display; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; use tokio::sync::mpsc; +use tokio::time::{interval, MissedTickBehavior}; +use tokio_util::sync::CancellationToken; use tracing::{error, event, info, trace}; use yrs::updates::decoder::Decode; use yrs::updates::encoder::Encode; @@ -27,30 +29,28 @@ use collab_stream::stream_group::StreamGroup; use database::collab::CollabStorage; use crate::error::RealtimeError; -use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, Subscription}; -use crate::group::persistence::GroupPersistence; +use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, DataStream, Subscription}; use crate::indexer::Indexer; use crate::metrics::CollabRealtimeMetrics; /// A group used to manage a single [Collab] object pub struct CollabGroup { - pub workspace_id: String, - pub object_id: String, - collab: Arc>, + workspace_id: String, + object_id: String, collab_type: CollabType, - /// A broadcast used to propagate updates produced by yrs [yrs::Doc] and [Awareness] - /// to subscribes. - broadcast: CollabBroadcast, /// A list of subscribers to this group. Each subscriber will receive updates from the /// broadcast. - subscribers: DashMap, + subscribers: Arc>, + persister: Arc, metrics_calculate: Arc, - destroy_group_tx: mpsc::Sender>>, + /// Cancellation token triggered when current collab group is about to be stopped. + /// This will also shut down all subsequent [Subscription]s. + shutdown: CancellationToken, } impl Drop for CollabGroup { fn drop(&mut self) { - trace!("Drop collab group:{}", self.object_id); + self.shutdown.cancel(); } } @@ -61,65 +61,107 @@ impl CollabGroup { workspace_id: String, object_id: String, collab_type: CollabType, - collab: Arc>, metrics_calculate: Arc, storage: Arc, is_new_collab: bool, collab_redis_stream: Arc, persistence_interval: Duration, - edit_state_max_count: u32, - edit_state_max_secs: i64, indexer: Option>, ) -> Result where S: CollabStorage, { - let edit_state = Arc::new(EditState::new( - edit_state_max_count, - edit_state_max_secs, - is_new_collab, + let persister = Arc::new(CollabPersister::new( + workspace_id.clone(), + object_id.clone(), + collab_type.clone(), + storage, + collab_redis_stream, + indexer, )); - let broadcast = { - let lock = collab.read().await; - CollabBroadcast::new( - &object_id, - 10, - edit_state.clone(), - &lock, - CollabUpdateStreamingImpl::new(&workspace_id, &object_id, &collab_redis_stream).await?, - ) - }; - let (destroy_group_tx, rx) = mpsc::channel(1); - tokio::spawn( - GroupPersistence::new( - workspace_id.clone(), - object_id.clone(), - uid, - storage, - edit_state.clone(), - Arc::downgrade(&collab), - collab_type.clone(), + let shutdown = CancellationToken::new(); + let subscribers = Arc::new(DashMap::new()); + + // setup task used to receive messages from Redis + { + let oid = object_id.clone(); + let shutdown = shutdown.clone(); + tokio::spawn(async move { + if let Err(err) = Self::inbound_task(shutdown).await { + tracing::warn!("`{}` failed to receive message: {}", oid, err); + } + }); + } + + // setup task used to send messages to Redis + { + let oid = object_id.clone(); + let shutdown = shutdown.clone(); + tokio::spawn(async move { + if let Err(err) = Self::outbound_task(shutdown).await { + tracing::warn!("`{}` failed to send message: {}", oid, err); + } + }); + } + + // setup periodic snapshot + { + let shutdown = shutdown.clone(); + tokio::spawn(Self::snapshot_task( persistence_interval, - indexer, - ) - .run(rx), - ); + persister.clone(), + shutdown, + )); + } Ok(Self { workspace_id, object_id, collab_type, - collab, - broadcast, - subscribers: Default::default(), + subscribers, metrics_calculate, - destroy_group_tx, + shutdown, + persister, }) } + /// Task used to receive messages from Redis. + async fn inbound_task(shutdown: CancellationToken) -> Result<(), RealtimeError> { + todo!() + } + + /// Task used to send messages to Redis. + async fn outbound_task(shutdown: CancellationToken) -> Result<(), RealtimeError> { + todo!() + } + + async fn snapshot_task( + interval: Duration, + persister: Arc, + shutdown: CancellationToken, + ) { + let mut snapshot_tick = tokio::time::interval(interval); + snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = snapshot_tick.tick() => { + if let Err(err) = persister.save().await { + tracing::warn!("failed to persist document `{}`: {}", persister.object_id, err); + } + }, + _ = shutdown.cancelled() => { + if let Err(err) = persister.save().await { + tracing::warn!("failed to persist document `{}`: {}", persister.object_id, err); + } + } + } + } + } + pub async fn encode_collab(&self) -> Result { - let lock = self.collab.read().await; + let lock = self.load().await?; let encode_collab = lock.encode_collab_v1(|collab| { self .collab_type @@ -158,14 +200,10 @@ impl CollabGroup { >::Error: std::error::Error + Send + Sync, { // create new subscription for new subscriber - let sub = self.broadcast.subscribe( - user, - subscriber_origin, - sink, - stream, - Arc::downgrade(&self.collab), - self.metrics_calculate.clone(), - ); + let metrics = self.metrics_calculate.clone(); + let sub = self + .broadcast + .subscribe(user, subscriber_origin, sink, stream, metrics); if let Some(mut old) = self.subscribers.insert((*user).clone(), sub) { tracing::warn!("{}: remove old subscriber: {}", &self.object_id, user); @@ -173,8 +211,7 @@ impl CollabGroup { } if cfg!(debug_assertions) { - event!( - tracing::Level::TRACE, + trace!( "{}: add new subscriber, current group member: {}", &self.object_id, self.user_count(), @@ -193,7 +230,7 @@ impl CollabGroup { /// Check if the group is active. A group is considered active if it has at least one /// subscriber pub async fn is_inactive(&self) -> bool { - let modified_at = self.broadcast.modified_at.lock(); + let modified_at = self.modified_at(); // In debug mode, we set the timeout to 60 seconds if cfg!(debug_assertions) { @@ -235,7 +272,7 @@ impl CollabGroup { for mut entry in self.subscribers.iter_mut() { entry.value_mut().stop().await; } - let _ = self.destroy_group_tx.send(self.collab.clone()).await; + self.shutdown.cancel(); } /// Returns the timeout duration in seconds for different collaboration types. @@ -260,131 +297,8 @@ impl CollabGroup { } } -pub(crate) struct EditState { - /// Clients rely on `edit_count` to verify message ordering. A non-continuous sequence suggests - /// missing updates, prompting the client to request an initial synchronization. - /// Continuous sequence numbers ensure the client receives and displays updates in the correct order. - /// - edit_counter: AtomicU32, - prev_edit_count: AtomicU32, - prev_flush_timestamp: AtomicI64, - - max_edit_count: u32, - max_secs: i64, - /// Indicate the collab object is just created in the client and not exist in server database. - is_new: AtomicBool, - /// Indicate the collab is ready to save to disk. - /// If is_ready_to_save is true, which means the collab contains the requirement data and ready to save to disk. - is_ready_to_save: AtomicBool, -} - -impl Display for EditState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "EditState {{ edit_counter: {}, prev_edit_count: {}, max_edit_count: {}, max_secs: {}, is_new: {}, is_ready_to_save: {}", - self.edit_counter.load(Ordering::SeqCst), - self.prev_edit_count.load(Ordering::SeqCst), - self.max_edit_count, - self.max_secs, - self.is_new.load(Ordering::SeqCst), - self.is_ready_to_save.load(Ordering::SeqCst), - ) - } -} - -impl EditState { - fn new(max_edit_count: u32, max_secs: i64, is_new: bool) -> Self { - Self { - edit_counter: AtomicU32::new(0), - prev_edit_count: Default::default(), - prev_flush_timestamp: AtomicI64::new(chrono::Utc::now().timestamp()), - max_edit_count, - max_secs, - is_new: AtomicBool::new(is_new), - is_ready_to_save: AtomicBool::new(false), - } - } - - pub(crate) fn edit_count(&self) -> u32 { - self.edit_counter.load(Ordering::SeqCst) - } - - /// Increments the edit count and returns the old value - pub(crate) fn increment_edit_count(&self) -> u32 { - self - .edit_counter - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| { - Some(current + 1) - }) - // safety: unwrap when returning the new value - .unwrap() - } - - pub(crate) fn tick(&self) { - self - .prev_edit_count - .store(self.edit_counter.load(Ordering::SeqCst), Ordering::SeqCst); - self - .prev_flush_timestamp - .store(chrono::Utc::now().timestamp(), Ordering::SeqCst); - } - - pub(crate) fn is_edit(&self) -> bool { - self.edit_counter.load(Ordering::SeqCst) != self.prev_edit_count.load(Ordering::SeqCst) - } - - pub(crate) fn is_new(&self) -> bool { - self.is_new.load(Ordering::SeqCst) - } - - pub(crate) fn set_is_new(&self, is_new: bool) { - self.is_new.store(is_new, Ordering::SeqCst); - } - - pub(crate) fn set_ready_to_save(&self) { - self.is_ready_to_save.store(true, Ordering::Relaxed); - } - - pub(crate) fn should_save_to_disk(&self) -> bool { - if !self.is_ready_to_save.load(Ordering::Relaxed) { - return false; - } - - if self.is_new.load(Ordering::Relaxed) { - return true; - } - - let current_edit_count = self.edit_counter.load(Ordering::SeqCst); - let prev_edit_count = self.prev_edit_count.load(Ordering::SeqCst); - - // If the collab is new, save it to disk and reset the flag - if self.is_new.load(Ordering::SeqCst) { - return true; - } - - if current_edit_count == prev_edit_count { - return false; - } - - // Check if the edit count exceeds the maximum allowed since the last save - let edit_count_exceeded = (current_edit_count > prev_edit_count) - && ((current_edit_count - prev_edit_count) >= self.max_edit_count); - - // Calculate the time since the last flush and check if it exceeds the maximum allowed - let now = chrono::Utc::now().timestamp(); - let prev_flush_timestamp = self.prev_flush_timestamp.load(Ordering::SeqCst); - let time_exceeded = - (now > prev_flush_timestamp) && (now - prev_flush_timestamp >= self.max_secs); - - // Determine if we should save based on either condition being met - edit_count_exceeded || (current_edit_count != prev_edit_count && time_exceeded) - } -} - struct CollabUpdateStreamingImpl { sender: mpsc::UnboundedSender>, - stopped: Arc, } impl CollabUpdateStreamingImpl { @@ -396,16 +310,13 @@ impl CollabUpdateStreamingImpl { let stream = collab_redis_stream .collab_update_stream(workspace_id, object_id, "collaborate_update_producer") .await?; - let stopped = Arc::new(AtomicBool::new(false)); let (sender, receiver) = mpsc::unbounded_channel(); - let cloned_stopped = stopped.clone(); tokio::spawn(async move { if let Err(err) = Self::consume_messages(receiver, stream).await { error!("Failed to consume incoming updates: {}", err); } - cloned_stopped.store(true, Ordering::SeqCst); }); - Ok(Self { sender, stopped }) + Ok(Self { sender }) } async fn consume_messages( @@ -438,42 +349,104 @@ impl CollabUpdateStreamingImpl { } Ok(()) } - - pub fn is_stopped(&self) -> bool { - self.stopped.load(Ordering::SeqCst) - } } impl CollabUpdateStreaming for CollabUpdateStreamingImpl { fn send_update(&self, update: Vec) -> Result<(), RealtimeError> { - if self.is_stopped() { + if let Err(_) = self.sender.send(update) { Err(RealtimeError::Internal(anyhow::anyhow!( "stream stopped processing incoming updates" ))) - } else if let Err(err) = self.sender.send(update) { - Err(RealtimeError::Internal(err.into())) } else { Ok(()) } } -} -#[cfg(test)] -mod tests { - use crate::group::group_init::EditState; + fn receive_updates(&self) -> DataStream { + todo!() + } - #[test] - fn edit_state_test() { - let edit_state = EditState::new(10, 10, false); - edit_state.set_ready_to_save(); - edit_state.increment_edit_count(); - - for _ in 0..10 { - edit_state.increment_edit_count(); - } - assert!(edit_state.should_save_to_disk()); - assert!(edit_state.should_save_to_disk()); - edit_state.tick(); - assert!(!edit_state.should_save_to_disk()); + fn receive_awareness_updates(&self) -> DataStream { + todo!() } } + +struct CollabPersister { + workspace_id: String, + object_id: String, + collab_type: CollabType, + storage: Arc, + collab_redis_stream: Arc, + indexer: Option>, + /// Collab stored temporarily. + temp_collab: ArcSwapOption, +} + +impl CollabPersister { + pub fn new( + workspace_id: String, + object_id: String, + collab_type: CollabType, + storage: Arc, + collab_redis_stream: Arc, + indexer: Option>, + ) -> Self { + Self { + workspace_id, + object_id, + collab_type, + storage, + collab_redis_stream, + indexer, + temp_collab: Default::default(), + } + } + + /// Drop temp collab i.e. because it was no longer up to date or was not accessed for too long. + fn reset(&self) { + self.temp_collab.store(None); // cleanup temp collab + } + + async fn send_update(&self, update: Vec) { + // send updates to redis queue + todo!() + } + + async fn send_awareness(&self, awareness_update: Vec) { + // send awareness updates to redis queue: is it needed? What are we using awareness for here? + todo!() + } + + async fn load(&self) -> Result, RealtimeError> { + match self.temp_collab.load_full() { + Some(collab) => Ok(collab), // return cached collab + None => self.force_load().await, + } + } + + async fn force_load(&self) -> Result, RealtimeError> { + // 1. Try to load the latest snapshot from storage + // 2. consume all Redis updates on top of it (keep redis msg id) + todo!() + } + + async fn receive_updates(&self) -> DataStream { + // 1. loop with yield on incoming Redis stream updates + todo!() + } + + async fn save(&self) -> Result<(), RealtimeError> { + // 1. try to acquire lock + // 2. if successful -> self.load() + // 3. if collab has any changes (any redis updates were applied): + // 4. generate embeddings + // 5. store collab + // 6. prune any redis msg ids older than 5 min. since collab snapshot time + todo!() + } +} + +pub struct CollabSnapshot { + pub collab: Collab, + pub last_msg_id: String, +} diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 9f2543ca..8887c3d3 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -229,14 +229,11 @@ where workspace_id.to_string(), object_id.to_string(), collab_type, - collab, self.metrics_calculate.clone(), self.storage.clone(), is_new_collab, self.collab_redis_stream.clone(), self.persistence_interval, - self.edit_state_max_count, - self.edit_state_max_secs, indexer, ) .await?, diff --git a/services/appflowy-collaborate/src/group/mod.rs b/services/appflowy-collaborate/src/group/mod.rs index b93825eb..369ac030 100644 --- a/services/appflowy-collaborate/src/group/mod.rs +++ b/services/appflowy-collaborate/src/group/mod.rs @@ -2,7 +2,6 @@ pub(crate) mod broadcast; pub(crate) mod cmd; pub(crate) mod group_init; pub(crate) mod manager; -mod persistence; mod plugin; pub(crate) mod protocol; mod state;