diff --git a/libs/realtime/src/collaborate/group.rs b/libs/realtime/src/collaborate/group.rs index 48c5d03b..6510ba66 100644 --- a/libs/realtime/src/collaborate/group.rs +++ b/libs/realtime/src/collaborate/group.rs @@ -75,7 +75,9 @@ where if let Some(group) = group_by_object_id.get(object_id) { if let Some(mut subscriber) = group.subscribers.try_write()?.remove(user) { trace!("Remove subscriber: {}", subscriber.origin); - subscriber.stop().await; + tokio::spawn(async move { + subscriber.stop().await; + }); } } Ok(()) @@ -104,10 +106,11 @@ where return; }, }; + let group = group_by_object_id.remove(object_id); + drop(group_by_object_id); - if let Some(group) = group_by_object_id.remove(object_id) { + if let Some(group) = group { group.flush_collab().await; - // As we've already removed the group, we directly operate on the removed group's subscribers. if let Ok(mut subscribers) = group.subscribers.try_write() { for (_, subscriber) in subscribers.iter_mut() { @@ -118,7 +121,6 @@ where // Log error if the group doesn't exist error!("Group for object_id:{} not found", object_id); } - drop(group_by_object_id); self.storage.remove_collab_cache(object_id).await; } @@ -186,9 +188,9 @@ where group } - #[allow(dead_code)] - pub async fn number_of_groups(&self) -> usize { - self.group_by_object_id.read().await.keys().len() + pub async fn number_of_groups(&self) -> Option { + let read_guard = self.group_by_object_id.try_read().ok()?; + Some(read_guard.keys().len()) } } diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index c7f1127d..77dd3fda 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -85,15 +85,13 @@ where interval.tick().await; if let Some(groups) = weak_groups.upgrade() { // Perform operations that require awaiting outside of the synchronous code block - let groups_operation = groups.number_of_groups().await; - cloned_metrics.record_opening_collab_count(groups_operation); + if let Some(groups_operation) = groups.number_of_groups().await { + cloned_metrics.record_opening_collab_count(groups_operation); + } - // Minimize the scope of the async lock for connected users - let connected_user_count = { - let read_guard = cloned_client_stream_by_user.read().await; - read_guard.keys().len() - }; - cloned_metrics.record_connected_users(connected_user_count); + if let Ok(read_guard) = cloned_client_stream_by_user.try_read() { + cloned_metrics.record_connected_users(read_guard.keys().len()); + } // Assuming mem_usage() is synchronous and quick to execute let mem_usage = cloned_storage.mem_usage();