use crate::collab_sync::{ CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject, }; use bytes::Bytes; use collab::core::collab::MutexCollab; use collab::core::collab_state::SyncState; use collab::core::origin::CollabOrigin; use collab::sync_protocol::awareness::Awareness; use collab::sync_protocol::message::{Message, MessageReader, SyncMessage}; use collab::sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol}; use futures_util::{SinkExt, StreamExt}; use lib0::decoding::Cursor; use realtime_entity::collab_msg::{ClientCollabInit, CollabMessage, ServerCollabInit, UpdateSync}; use std::marker::PhantomData; use std::ops::Deref; use std::sync::{Arc, Weak}; use tokio::spawn; use tokio::sync::watch; use tokio_stream::wrappers::WatchStream; use tracing::{error, trace, warn, Level}; use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encoder, EncoderV1}; pub const DEFAULT_SYNC_TIMEOUT: u64 = 4; pub struct SyncQueue { object: SyncObject, origin: CollabOrigin, /// The [CollabSink] is used to send the updates to the remote. It will send the current /// update periodically if the timeout is reached or it will send the next update if /// it receive previous ack from the remote. sink: Arc>, /// The [SyncStream] will be spawned in a separate task It continuously receive /// the updates from the remote. #[allow(dead_code)] stream: SyncStream, protocol: ClientSyncProtocol, sync_state: Arc>, } impl Drop for SyncQueue { fn drop(&mut self) { trace!("Drop SyncQueue {}", self.object.object_id); } } impl SyncQueue where E: Into + Send + Sync + 'static, Sink: SinkExt + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, { pub fn new( object: SyncObject, origin: CollabOrigin, sink: Sink, stream: Stream, collab: Weak, config: SinkConfig, pause: bool, ) -> Self { let protocol = ClientSyncProtocol; let (notifier, notifier_rx) = watch::channel(false); let sync_state = Arc::new(watch::channel(SyncState::InitSyncBegin).0); let (sync_state_tx, sink_state_rx) = watch::channel(SinkState::Init); debug_assert!(origin.client_user_id().is_some()); let sink = Arc::new(CollabSink::new( origin.client_user_id().unwrap_or(0), object.clone(), sink, notifier, sync_state_tx, config, pause, )); spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx)); let cloned_protocol = protocol.clone(); let object_id = object.object_id.clone(); let stream = SyncStream::new( origin.clone(), object_id, stream, protocol, collab, Arc::downgrade(&sink), ); let weak_sync_state = Arc::downgrade(&sync_state); let mut sink_state_stream = WatchStream::new(sink_state_rx); // Subscribe the sink state stream and update the sync state in the background. spawn(async move { while let Some(collab_state) = sink_state_stream.next().await { if let Some(sync_state) = weak_sync_state.upgrade() { match collab_state { SinkState::Syncing => { let _ = sync_state.send(SyncState::Syncing); }, SinkState::Finished => { let _ = sync_state.send(SyncState::SyncFinished); }, SinkState::Init => { let _ = sync_state.send(SyncState::InitSyncBegin); }, SinkState::Pause => {}, } } } }); Self { object, origin, sink, stream, protocol: cloned_protocol, sync_state, } } pub fn pause(&self) { self.sink.pause(); } pub fn resume(&self) { self.sink.resume(); } pub fn subscribe_sync_state(&self) -> watch::Receiver { self.sync_state.subscribe() } pub fn init_sync(&self, awareness: &Awareness) { if let Some(payload) = doc_init_state(awareness, &self.protocol) { self.sink.queue_init_sync(|msg_id| { ClientCollabInit::new( self.origin.clone(), self.object.object_id.clone(), self.object.collab_type.clone(), self.object.workspace_id.clone(), msg_id, payload, ) .into() }); } else { self.sink.notify(); } } /// Remove all the messages in the sink queue pub fn clear(&self) { self.sink.clear(); } } fn doc_init_state(awareness: &Awareness, protocol: &P) -> Option> { let payload = { let mut encoder = EncoderV1::new(); protocol.start(awareness, &mut encoder).ok()?; encoder.to_vec() }; if payload.is_empty() { None } else { Some(payload) } } impl Deref for SyncQueue { type Target = Arc>; fn deref(&self) -> &Self::Target { &self.sink } } /// Use to continuously receive updates from remote. struct SyncStream { object_id: String, #[allow(dead_code)] weak_collab: Weak, phantom_sink: PhantomData, phantom_stream: PhantomData, } impl Drop for SyncStream { fn drop(&mut self) { trace!("Drop SyncStream {}", self.object_id); } } impl SyncStream where E: Into + Send + Sync + 'static, Sink: SinkExt + Send + Sync + Unpin + 'static, Stream: StreamExt> + Send + Sync + Unpin + 'static, { pub fn new

( origin: CollabOrigin, object_id: String, stream: Stream, protocol: P, weak_collab: Weak, sink: Weak>, ) -> Self where P: CollabSyncProtocol + Send + Sync + 'static, { let cloned_weak_collab = weak_collab.clone(); spawn(SyncStream::::spawn_doc_stream::

( origin, object_id.clone(), stream, cloned_weak_collab, sink, protocol, )); Self { object_id, weak_collab, phantom_sink: Default::default(), phantom_stream: Default::default(), } } // Spawn the stream that continuously reads the doc's updates from remote. async fn spawn_doc_stream

( origin: CollabOrigin, object_id: String, mut stream: Stream, weak_collab: Weak, weak_sink: Weak>, protocol: P, ) where P: CollabSyncProtocol + Send + Sync + 'static, { while let Some(collab_message) = stream.next().await { match collab_message { Ok(msg) => match (weak_collab.upgrade(), weak_sink.upgrade()) { (Some(collab), Some(sink)) => { let span = tracing::span!(Level::TRACE, "doc_stream", object_id = %msg.object_id()); let _enter = span.enter(); if let Err(error) = SyncStream::::process_message::

( &origin, &object_id, &protocol, &collab, &sink, msg, ) .await { error!("Error while processing message: {}", error); } }, _ => { // The collab or sink is dropped, stop the stream. warn!("Stop receive doc incoming changes."); break; }, }, Err(e) => { warn!("Stream error: {},stop receive incoming changes", e.into()); break; }, } } } /// Continuously handle messages from the remote doc async fn process_message

( origin: &CollabOrigin, object_id: &str, protocol: &P, collab: &Arc, sink: &Arc>, msg: CollabMessage, ) -> Result<(), SyncError> where P: CollabSyncProtocol + Send + Sync + 'static, { let should_process = match msg.msg_id() { // The msg_id is None if the message is [ServerBroadcast] or [ServerAwareness] None => true, Some(msg_id) => sink.ack_msg(msg.origin(), msg.object_id(), msg_id).await, }; if should_process { if let Some(payload) = msg.payload() { if !payload.is_empty() { trace!("start process message:{:?}", msg.msg_id()); SyncStream::::process_payload( origin, payload, object_id, protocol, collab, sink, ) .await?; trace!("end process message: {:?}", msg.msg_id()); } } } Ok(()) } async fn process_payload

( origin: &CollabOrigin, payload: &Bytes, object_id: &str, protocol: &P, collab: &Arc, sink: &Arc>, ) -> Result<(), SyncError> where P: CollabSyncProtocol + Send + Sync + 'static, { let mut decoder = DecoderV1::new(Cursor::new(payload)); let reader = MessageReader::new(&mut decoder); for msg in reader { let msg = msg?; trace!(" {}", msg); let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_))); if let Some(payload) = handle_msg(&Some(origin), protocol, collab, msg)? { if is_sync_step_1 { // flush match collab.try_lock() { None => warn!("Failed to acquire lock for flushing collab"), Some(collab_guard) => collab_guard.flush(), } } let object_id = object_id.to_string(); sink.queue_msg(|msg_id| { if is_sync_step_1 { ServerCollabInit::new(origin.clone(), object_id, payload, msg_id).into() } else { UpdateSync::new(origin.clone(), object_id, payload, msg_id).into() } }); } } Ok(()) } }