fix: client api - channel management (#191)

* chore: add logs

* chore: remove dropped channels
This commit is contained in:
Nathan.fooo 2023-11-30 18:52:54 -08:00 committed by GitHub
parent 474a78832a
commit 208a353fef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 51 additions and 32 deletions

View File

@ -50,7 +50,8 @@ pub trait WSClientHttpSender: Send + Sync {
async fn send_ws_msg(&self, device_id: &str, message: Message) -> Result<(), WSError>;
}
type ChannelByObjectId = HashMap<String, Weak<WebSocketChannel<CollabMessage>>>;
type WeakChannel = Weak<WebSocketChannel<CollabMessage>>;
type ChannelByObjectId = HashMap<String, Vec<WeakChannel>>;
pub type WSConnectStateReceiver = Receiver<ConnectState>;
pub struct WSClient {
@ -167,30 +168,21 @@ impl WSClient {
RealtimeMessage::Collab(collab_msg) => {
if let Some(collab_channels) = weak_collab_channels.upgrade() {
let object_id = collab_msg.object_id().to_owned();
let is_channel_dropped =
if let Some(channel) = collab_channels.read().get(&object_id) {
// Iterate all channels and send the message to them.
if let Some(channels) = collab_channels.read().get(&object_id) {
for channel in channels.iter() {
match channel.upgrade() {
None => {
// when calling [WSClient::subscribe], the caller is responsible for keeping
// the channel alive as long as it wants to receive messages from the websocket.
warn!("channel is dropped");
true
},
Some(channel) => {
trace!("receive remote message: {}", collab_msg);
channel.forward_to_stream(collab_msg);
false
channel.forward_to_stream(collab_msg.clone());
},
}
} else {
false
};
// Try to remove the channel if it is dropped. If failed, will try again next time.
if is_channel_dropped {
if let Some(mut w) = collab_channels.try_write() {
trace!("remove channel: {}", object_id);
let _ = w.remove(&object_id);
}
}
} else {
@ -267,10 +259,18 @@ impl WSClient {
object_id: String,
) -> Result<Arc<WebSocketChannel<CollabMessage>>, WSError> {
let channel = Arc::new(WebSocketChannel::new(&object_id, self.sender.clone()));
self
.collab_channels
.write()
.insert(object_id, Arc::downgrade(&channel));
let mut collab_channels_guard = self.collab_channels.write();
// remove the dropped channels
if let Some(channels) = collab_channels_guard.get_mut(&object_id) {
channels.retain(|channel| channel.upgrade().is_some());
}
collab_channels_guard
.entry(object_id)
.or_default()
.push(Arc::downgrade(&channel));
Ok(channel)
}

View File

@ -299,8 +299,8 @@ impl From<ServerInit> for CollabMessage {
impl Display for ServerInit {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"server init: [uid:{:?}|oid:{}|msg_id:{:?}|len:{}]",
self.origin.client_user_id(),
"server init: [origin:{}|oid:{}|msg_id:{:?}|len:{}]",
self.origin,
self.object_id,
self.msg_id,
self.payload.len(),
@ -372,8 +372,8 @@ impl From<UpdateSync> for CollabMessage {
impl Display for UpdateSync {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"client update sync: [uid:{:?}|oid:{}|msg_id:{:?}|len:{}]",
self.origin.client_user_id(),
"client update sync: [origin:{}|oid:{}|msg_id:{:?}|len:{}]",
self.origin,
self.object_id,
self.msg_id,
self.payload.len(),
@ -424,8 +424,8 @@ impl From<CollabAck> for CollabMessage {
impl Display for CollabAck {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"ack: [uid:{:?}|oid:{}|msg_id:{:?}|{}|len:{}]",
self.origin.client_user_id(),
"ack: [origin:{}|oid:{}|msg_id:{:?}|{}|len:{}]",
self.origin,
self.object_id,
self.source.msg_id,
self.source.sync_verbose,

View File

@ -248,7 +248,7 @@ where
.read()
.await
.values()
.map(|value| value.origin.client_user_id())
.map(|value| value.origin.to_string())
.collect::<Vec<_>>(),
);
}

View File

@ -16,7 +16,7 @@ use tokio::time::interval;
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
use tracing::{error, info, trace};
use tracing::{error, event, info, instrument, trace};
use crate::client::ClientWSSink;
use crate::collaborate::group::CollabGroupCache;
@ -228,6 +228,7 @@ async fn broadcast_message<U>(
}
/// Remove the user from the group and remove the group from the cache if the group is empty.
#[instrument(level = "debug", skip_all)]
async fn remove_user_from_group<S, U, AC>(
user: &U,
groups: &Arc<CollabGroupCache<S, U, AC>>,
@ -237,15 +238,33 @@ async fn remove_user_from_group<S, U, AC>(
U: RealtimeUser,
AC: CollabAccessControl,
{
groups.remove_user(&editing.object_id, user).await;
if let Some(group) = groups.get_group(&editing.object_id).await {
info!("Remove subscriber: {}", editing.origin);
group.subscribers.write().await.remove(user);
event!(
tracing::Level::INFO,
"Remove group subscriber: {}",
editing.origin
);
event!(
tracing::Level::DEBUG,
"{}: Group member: {}. member ids: {:?}",
&editing.object_id,
group.subscribers.read().await.len(),
group
.subscribers
.read()
.await
.values()
.map(|value| value.origin.to_string())
.collect::<Vec<_>>(),
);
// Destroy the group if the group is empty
let should_remove = group.is_empty().await;
if should_remove {
group.flush_collab();
info!("Remove group: {}", editing.object_id);
event!(tracing::Level::INFO, "Remove group: {}", editing.object_id);
groups.remove_group(&editing.object_id).await;
}
}

View File

@ -79,7 +79,7 @@ impl CollabAccessControlImpl {
}
}
#[inline]
#[instrument(level = "trace", skip(self), err)]
async fn get_user_collab_access_level(
&self,
uid: &i64,
@ -314,7 +314,7 @@ where
CollabAC: CollabAccessControl,
WorkspaceAC: WorkspaceAccessControl,
{
#[instrument(level = "trace", skip_all, err)]
#[instrument(level = "trace", skip(self), err)]
async fn get_collab_access_level(&self, uid: &i64, oid: &str) -> Result<AFAccessLevel, AppError> {
self
.collab_access_control