diff --git a/libs/realtime/src/server/rt_server.rs b/libs/realtime/src/server/rt_server.rs index 8d2470ca..39afba9b 100644 --- a/libs/realtime/src/server/rt_server.rs +++ b/libs/realtime/src/server/rt_server.rs @@ -20,6 +20,7 @@ use crate::server::collaborate::all_group::AllGroup; use crate::server::collaborate::group_cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender}; use std::sync::Arc; use std::time::Duration; + use tokio::time::interval; use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::StreamExt; @@ -296,7 +297,7 @@ where remove_user(&groups, &editing_collab_by_user, &msg.user).await; if client_stream_by_user.remove(&msg.user).is_some() { - trace!("remove client stream: {}", &msg.user); + info!("remove client stream: {}", &msg.user); } Ok(()) }) @@ -423,11 +424,17 @@ pub async fn broadcast_client_collab_message( { if let Some(client_stream) = client_streams.get(user) { trace!("[realtime]: receive:{}", collab_message); + let object_id = collab_message.object_id().to_string(); let err = client_stream .stream_tx .send(Ok(RealtimeMessage::ClientCollabV1(vec![collab_message]))); if let Err(err) = err { - warn!("Send message to client error: {}", err); + warn!( + "Send user:{} message to group:{} error: {}", + user.uid(), + err, + object_id, + ); } } } @@ -473,7 +480,7 @@ pub struct CollabClientStream { /// /// The message flow: /// ClientSession(websocket) -> [RealtimeServer] -> [CollabClientStream] -> [CollabBroadcast] 1->* websocket(client) - pub(crate) stream_tx: tokio::sync::broadcast::Sender>, + stream_tx: tokio::sync::broadcast::Sender>, } impl CollabClientStream { @@ -527,8 +534,7 @@ impl CollabClientStream { }); let client_sink = UnboundedSenderSink::::new(client_sink_tx); - // forward the message to the stream that was subscribed by the broadcast group, which will - // send the messages to all connected clients using the client_forward_sink + // forward the message to the stream that was subscribed by the broadcast group let cloned_object_id = object_id.to_string(); let (tx, rx) = tokio::sync::mpsc::channel(100); tokio::spawn(async move {