diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index 14068570..4533f3cc 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -6,7 +6,7 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use crate::collab_sync::pending_msg::{MessageState, PendingMsgQueue}; -use crate::collab_sync::{SyncError, DEFAULT_SYNC_TIMEOUT}; +use crate::collab_sync::{SyncError, SyncObject, DEFAULT_SYNC_TIMEOUT}; use futures_util::SinkExt; use realtime_entity::collab_msg::{CollabSinkMessage, MsgId}; @@ -41,7 +41,7 @@ pub struct CollabSink { /// The [PendingMsgQueue] is used to queue the messages that are waiting to be sent to the /// remote. It will merge the messages if possible. pending_msg_queue: Arc>>, - msg_id_counter: Arc, + msg_id_counter: Arc, /// The [watch::Sender] is used to notify the [CollabSinkRunner] to process the pending messages. /// Sending `false` will stop the [CollabSinkRunner]. @@ -49,7 +49,6 @@ pub struct CollabSink { config: SinkConfig, /// Stop the [IntervalRunner] if the sink strategy is [SinkStrategy::FixInterval]. - #[allow(dead_code)] interval_runner_stop_tx: Option>, /// Used to calculate the time interval between two messages. Only used when the sink strategy @@ -57,10 +56,17 @@ pub struct CollabSink { instant: Mutex, state_notifier: Arc>, pause: AtomicBool, + object: SyncObject, } impl Drop for CollabSink { fn drop(&mut self) { + trace!("Drop CollabSink {}", self.object.object_id); + if let Some(stop_tx) = self.interval_runner_stop_tx.take() { + spawn(async move { + let _ = stop_tx.send(()).await; + }); + } let _ = self.notifier.send(true); } } @@ -71,18 +77,16 @@ where Sink: SinkExt + Send + Sync + Unpin + 'static, Msg: CollabSinkMessage, { - pub fn new( + pub fn new( uid: i64, + object: SyncObject, sink: Sink, notifier: watch::Sender, sync_state_tx: watch::Sender, - msg_id_counter: C, config: SinkConfig, pause: bool, - ) -> Self - where - C: MsgIdCounter, - { + ) -> Self { + let msg_id_counter = DefaultMsgIdCounter::new(); let notifier = Arc::new(notifier); let state_notifier = Arc::new(sync_state_tx); let sender = Arc::new(Mutex::new(sink)); @@ -109,6 +113,7 @@ where instant, interval_runner_stop_tx, pause: AtomicBool::new(pause), + object, } } @@ -312,7 +317,7 @@ where match self.sender.try_lock() { Ok(mut sender) => { - debug!("ending {}", collab_msg); + debug!("Sending {}", collab_msg); sender.send(collab_msg).await.ok()?; }, Err(_) => { @@ -362,12 +367,6 @@ where pub(crate) fn notify(&self) { let _ = self.notifier.send(false); } - - /// Stop the sink. - #[allow(dead_code)] - fn stop(&self) { - let _ = self.notifier.send(true); - } } fn retry_later(weak_notifier: Weak>) { @@ -491,9 +490,6 @@ impl DefaultMsgIdCounter { pub fn new() -> Self { Self::default() } -} - -impl MsgIdCounter for DefaultMsgIdCounter { fn next(&self) -> MsgId { self.0.fetch_add(1, Ordering::SeqCst) } diff --git a/libs/client-api/src/collab_sync/sync.rs b/libs/client-api/src/collab_sync/sync.rs index f6a7d0da..93759a0c 100644 --- a/libs/client-api/src/collab_sync/sync.rs +++ b/libs/client-api/src/collab_sync/sync.rs @@ -1,5 +1,5 @@ use crate::collab_sync::{ - CollabSink, CollabSinkRunner, DefaultMsgIdCounter, SinkConfig, SinkState, SyncError, SyncObject, + CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject, }; use bytes::Bytes; use collab::core::collab::MutexCollab; @@ -67,10 +67,10 @@ where let sink = Arc::new(CollabSink::new( origin.client_user_id().unwrap_or(0), + object.clone(), sink, notifier, sync_state_tx, - DefaultMsgIdCounter::new(), config, pause, )); diff --git a/src/api/workspace.rs b/src/api/workspace.rs index c48e5e89..6846e263 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -41,6 +41,9 @@ pub fn workspace_scope() -> Scope { ) .service( web::resource("{workspace_id}/collab/{object_id}") + .app_data( + PayloadConfig::new(5 * 1024 * 1024), // 10 MB + ) .route(web::post().to(create_collab_handler)) .route(web::get().to(get_collab_handler)) .route(web::put().to(update_collab_handler)) @@ -68,7 +71,7 @@ pub fn collab_scope() -> Scope { web::scope("/api/realtime").service( web::resource("post") .app_data( - PayloadConfig::new(10 * 1024 * 1024), // 10 MB + PayloadConfig::new(5 * 1024 * 1024), // 10 MB ) .route(web::post().to(post_realtime_message_handler)), )