chore: release lock asap (#312)
This commit is contained in:
parent
8769ae62f7
commit
e999999694
|
|
@ -75,7 +75,9 @@ where
|
|||
if let Some(group) = group_by_object_id.get(object_id) {
|
||||
if let Some(mut subscriber) = group.subscribers.try_write()?.remove(user) {
|
||||
trace!("Remove subscriber: {}", subscriber.origin);
|
||||
subscriber.stop().await;
|
||||
tokio::spawn(async move {
|
||||
subscriber.stop().await;
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
|
@ -104,10 +106,11 @@ where
|
|||
return;
|
||||
},
|
||||
};
|
||||
let group = group_by_object_id.remove(object_id);
|
||||
drop(group_by_object_id);
|
||||
|
||||
if let Some(group) = group_by_object_id.remove(object_id) {
|
||||
if let Some(group) = group {
|
||||
group.flush_collab().await;
|
||||
|
||||
// As we've already removed the group, we directly operate on the removed group's subscribers.
|
||||
if let Ok(mut subscribers) = group.subscribers.try_write() {
|
||||
for (_, subscriber) in subscribers.iter_mut() {
|
||||
|
|
@ -118,7 +121,6 @@ where
|
|||
// Log error if the group doesn't exist
|
||||
error!("Group for object_id:{} not found", object_id);
|
||||
}
|
||||
drop(group_by_object_id);
|
||||
|
||||
self.storage.remove_collab_cache(object_id).await;
|
||||
}
|
||||
|
|
@ -186,9 +188,9 @@ where
|
|||
group
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub async fn number_of_groups(&self) -> usize {
|
||||
self.group_by_object_id.read().await.keys().len()
|
||||
pub async fn number_of_groups(&self) -> Option<usize> {
|
||||
let read_guard = self.group_by_object_id.try_read().ok()?;
|
||||
Some(read_guard.keys().len())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -85,15 +85,13 @@ where
|
|||
interval.tick().await;
|
||||
if let Some(groups) = weak_groups.upgrade() {
|
||||
// Perform operations that require awaiting outside of the synchronous code block
|
||||
let groups_operation = groups.number_of_groups().await;
|
||||
cloned_metrics.record_opening_collab_count(groups_operation);
|
||||
if let Some(groups_operation) = groups.number_of_groups().await {
|
||||
cloned_metrics.record_opening_collab_count(groups_operation);
|
||||
}
|
||||
|
||||
// Minimize the scope of the async lock for connected users
|
||||
let connected_user_count = {
|
||||
let read_guard = cloned_client_stream_by_user.read().await;
|
||||
read_guard.keys().len()
|
||||
};
|
||||
cloned_metrics.record_connected_users(connected_user_count);
|
||||
if let Ok(read_guard) = cloned_client_stream_by_user.try_read() {
|
||||
cloned_metrics.record_connected_users(read_guard.keys().len());
|
||||
}
|
||||
|
||||
// Assuming mem_usage() is synchronous and quick to execute
|
||||
let mem_usage = cloned_storage.mem_usage();
|
||||
|
|
|
|||
Loading…
Reference in New Issue