diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index 5967c44a..d0b8367e 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -50,7 +50,8 @@ pub trait WSClientHttpSender: Send + Sync { async fn send_ws_msg(&self, device_id: &str, message: Message) -> Result<(), WSError>; } -type ChannelByObjectId = HashMap>>; +type WeakChannel = Weak>; +type ChannelByObjectId = HashMap>; pub type WSConnectStateReceiver = Receiver; pub struct WSClient { @@ -167,30 +168,21 @@ impl WSClient { RealtimeMessage::Collab(collab_msg) => { if let Some(collab_channels) = weak_collab_channels.upgrade() { let object_id = collab_msg.object_id().to_owned(); - let is_channel_dropped = - if let Some(channel) = collab_channels.read().get(&object_id) { + + // Iterate all channels and send the message to them. + if let Some(channels) = collab_channels.read().get(&object_id) { + for channel in channels.iter() { match channel.upgrade() { None => { // when calling [WSClient::subscribe], the caller is responsible for keeping // the channel alive as long as it wants to receive messages from the websocket. warn!("channel is dropped"); - true }, Some(channel) => { trace!("receive remote message: {}", collab_msg); - channel.forward_to_stream(collab_msg); - false + channel.forward_to_stream(collab_msg.clone()); }, } - } else { - false - }; - - // Try to remove the channel if it is dropped. If failed, will try again next time. - if is_channel_dropped { - if let Some(mut w) = collab_channels.try_write() { - trace!("remove channel: {}", object_id); - let _ = w.remove(&object_id); } } } else { @@ -267,10 +259,18 @@ impl WSClient { object_id: String, ) -> Result>, WSError> { let channel = Arc::new(WebSocketChannel::new(&object_id, self.sender.clone())); - self - .collab_channels - .write() - .insert(object_id, Arc::downgrade(&channel)); + let mut collab_channels_guard = self.collab_channels.write(); + + // remove the dropped channels + if let Some(channels) = collab_channels_guard.get_mut(&object_id) { + channels.retain(|channel| channel.upgrade().is_some()); + } + + collab_channels_guard + .entry(object_id) + .or_default() + .push(Arc::downgrade(&channel)); + Ok(channel) } diff --git a/libs/realtime-entity/src/collab_msg.rs b/libs/realtime-entity/src/collab_msg.rs index 4a4190ee..305fd9f3 100644 --- a/libs/realtime-entity/src/collab_msg.rs +++ b/libs/realtime-entity/src/collab_msg.rs @@ -299,8 +299,8 @@ impl From for CollabMessage { impl Display for ServerInit { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( - "server init: [uid:{:?}|oid:{}|msg_id:{:?}|len:{}]", - self.origin.client_user_id(), + "server init: [origin:{}|oid:{}|msg_id:{:?}|len:{}]", + self.origin, self.object_id, self.msg_id, self.payload.len(), @@ -372,8 +372,8 @@ impl From for CollabMessage { impl Display for UpdateSync { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( - "client update sync: [uid:{:?}|oid:{}|msg_id:{:?}|len:{}]", - self.origin.client_user_id(), + "client update sync: [origin:{}|oid:{}|msg_id:{:?}|len:{}]", + self.origin, self.object_id, self.msg_id, self.payload.len(), @@ -424,8 +424,8 @@ impl From for CollabMessage { impl Display for CollabAck { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!( - "ack: [uid:{:?}|oid:{}|msg_id:{:?}|{}|len:{}]", - self.origin.client_user_id(), + "ack: [origin:{}|oid:{}|msg_id:{:?}|{}|len:{}]", + self.origin, self.object_id, self.source.msg_id, self.source.sync_verbose, diff --git a/libs/realtime/src/collaborate/retry.rs b/libs/realtime/src/collaborate/retry.rs index 76469aed..ef5c7649 100644 --- a/libs/realtime/src/collaborate/retry.rs +++ b/libs/realtime/src/collaborate/retry.rs @@ -248,7 +248,7 @@ where .read() .await .values() - .map(|value| value.origin.client_user_id()) + .map(|value| value.origin.to_string()) .collect::>(), ); } diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index 86fff3d1..fe8e177a 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -16,7 +16,7 @@ use tokio::time::interval; use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::StreamExt; -use tracing::{error, info, trace}; +use tracing::{error, event, info, instrument, trace}; use crate::client::ClientWSSink; use crate::collaborate::group::CollabGroupCache; @@ -228,6 +228,7 @@ async fn broadcast_message( } /// Remove the user from the group and remove the group from the cache if the group is empty. +#[instrument(level = "debug", skip_all)] async fn remove_user_from_group( user: &U, groups: &Arc>, @@ -237,15 +238,33 @@ async fn remove_user_from_group( U: RealtimeUser, AC: CollabAccessControl, { + groups.remove_user(&editing.object_id, user).await; if let Some(group) = groups.get_group(&editing.object_id).await { - info!("Remove subscriber: {}", editing.origin); - group.subscribers.write().await.remove(user); + event!( + tracing::Level::INFO, + "Remove group subscriber: {}", + editing.origin + ); + + event!( + tracing::Level::DEBUG, + "{}: Group member: {}. member ids: {:?}", + &editing.object_id, + group.subscribers.read().await.len(), + group + .subscribers + .read() + .await + .values() + .map(|value| value.origin.to_string()) + .collect::>(), + ); // Destroy the group if the group is empty let should_remove = group.is_empty().await; if should_remove { group.flush_collab(); - info!("Remove group: {}", editing.object_id); + event!(tracing::Level::INFO, "Remove group: {}", editing.object_id); groups.remove_group(&editing.object_id).await; } } diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index b28a494b..c4342f82 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -79,7 +79,7 @@ impl CollabAccessControlImpl { } } - #[inline] + #[instrument(level = "trace", skip(self), err)] async fn get_user_collab_access_level( &self, uid: &i64, @@ -314,7 +314,7 @@ where CollabAC: CollabAccessControl, WorkspaceAC: WorkspaceAccessControl, { - #[instrument(level = "trace", skip_all, err)] + #[instrument(level = "trace", skip(self), err)] async fn get_collab_access_level(&self, uid: &i64, oid: &str) -> Result { self .collab_access_control