fix: group might not be ready to receive message (#445)

This commit is contained in:
Nathan.fooo 2024-04-04 15:13:55 +08:00 committed by GitHub
parent aa4df32f6d
commit 40f88a6231
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 22 additions and 9 deletions

View File

@ -28,7 +28,11 @@ pub enum GroupCommand {
pub type GroupCommandSender = tokio::sync::mpsc::Sender<GroupCommand>;
pub type GroupCommandReceiver = tokio::sync::mpsc::Receiver<GroupCommand>;
pub struct GroupCommandRunner<S, AC> {
pub struct GroupCommandRunner<S, AC>
where
AC: RealtimeAccessControl,
S: CollabStorage,
{
pub group_manager: Arc<GroupManager<S, AC>>,
pub client_msg_router_by_user: Arc<DashMap<RealtimeUser, ClientMessageRouter>>,
pub access_control: Arc<AC>,
@ -40,7 +44,7 @@ where
S: CollabStorage,
AC: RealtimeAccessControl,
{
pub async fn run(mut self, object_id: String) {
pub async fn run(mut self, object_id: String, notify: Arc<tokio::sync::Notify>) {
let mut receiver = self.recv.take().expect("Only take once");
let stream = stream! {
while let Some(msg) = receiver.recv().await {
@ -49,6 +53,7 @@ where
trace!("Collab group:{} command runner is stopped", object_id);
};
notify.notify_one();
stream
.for_each(|command| async {
match command {

View File

@ -16,6 +16,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::interval;
use tracing::{error, trace};
@ -154,6 +155,7 @@ where
Entry::Occupied(entry) => entry.get().clone(),
Entry::Vacant(entry) => {
let (new_sender, recv) = tokio::sync::mpsc::channel(2000);
let notify = Arc::new(Notify::new());
let runner = GroupCommandRunner {
group_manager: group_manager.clone(),
client_msg_router_by_user: client_msg_router_by_user.clone(),
@ -162,8 +164,12 @@ where
};
let object_id = entry.key().clone();
tokio::task::spawn_local(runner.run(object_id));
let clone_notify = notify.clone();
tokio::task::spawn_local(runner.run(object_id, clone_notify));
entry.insert(new_sender.clone());
// wait for the runner to be ready to handle the message.
notify.notified().await;
new_sender
},
},
@ -177,7 +183,9 @@ where
})
.await
{
error!("Send message to group error: {}", err);
// it should not happen. Because the receiver is always running before acquiring the sender.
// Otherwise, the GroupCommandRunner might not be ready to handle the message.
error!("Send message to group fail: {}", err);
}
}

View File

@ -24,7 +24,7 @@ use crate::middleware::metrics_mw::MetricsMiddleware;
use crate::middleware::request_id::RequestIdMiddleware;
use crate::self_signed::create_self_signed_certificate;
use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache};
use actix::Actor;
use actix::Supervisor;
use actix_identity::IdentityMiddleware;
use actix_session::storage::RedisSessionStore;
use actix_session::SessionMiddleware;
@ -114,8 +114,7 @@ pub async fn run(
)
.unwrap();
let realtime_server_actor = RealtimeServerActor(realtime_server).start();
let realtime_server_actor = Supervisor::start(|_| RealtimeServerActor(realtime_server));
let mut server = HttpServer::new(move || {
App::new()
// Middleware is registered for each App, scope, or Resource and executed in opposite order as registration

View File

@ -6,7 +6,7 @@ use collab_rt::{CollabRealtimeServer, RealtimeAccessControl};
use collab_rt_entity::user::UserDevice;
use database::collab::CollabStorage;
use std::ops::Deref;
use tracing::{error, warn};
use tracing::{error, info, warn};
#[derive(Clone)]
pub struct RealtimeServerActor<S, AC>(pub CollabRealtimeServer<S, AC>);
@ -27,6 +27,7 @@ where
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
info!("realtime server started");
ctx.set_mailbox_capacity(3000);
}
}
@ -36,7 +37,7 @@ where
AC: RealtimeAccessControl + Unpin,
{
fn restarting(&mut self, _ctx: &mut Context<RealtimeServerActor<S, AC>>) {
warn!("restarting");
error!("realtime server is restarting");
}
}