diff --git a/libs/collab-rt/src/collaborate/group_cmd.rs b/libs/collab-rt/src/collaborate/group_cmd.rs index 8be494f5..34867d81 100644 --- a/libs/collab-rt/src/collaborate/group_cmd.rs +++ b/libs/collab-rt/src/collaborate/group_cmd.rs @@ -28,7 +28,11 @@ pub enum GroupCommand { pub type GroupCommandSender = tokio::sync::mpsc::Sender; pub type GroupCommandReceiver = tokio::sync::mpsc::Receiver; -pub struct GroupCommandRunner { +pub struct GroupCommandRunner +where + AC: RealtimeAccessControl, + S: CollabStorage, +{ pub group_manager: Arc>, pub client_msg_router_by_user: Arc>, pub access_control: Arc, @@ -40,7 +44,7 @@ where S: CollabStorage, AC: RealtimeAccessControl, { - pub async fn run(mut self, object_id: String) { + pub async fn run(mut self, object_id: String, notify: Arc) { let mut receiver = self.recv.take().expect("Only take once"); let stream = stream! { while let Some(msg) = receiver.recv().await { @@ -49,6 +53,7 @@ where trace!("Collab group:{} command runner is stopped", object_id); }; + notify.notify_one(); stream .for_each(|command| async { match command { diff --git a/libs/collab-rt/src/rt_server.rs b/libs/collab-rt/src/rt_server.rs index 873e33ce..26dfc325 100644 --- a/libs/collab-rt/src/rt_server.rs +++ b/libs/collab-rt/src/rt_server.rs @@ -16,6 +16,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; +use tokio::sync::Notify; use tokio::time::interval; use tracing::{error, trace}; @@ -154,6 +155,7 @@ where Entry::Occupied(entry) => entry.get().clone(), Entry::Vacant(entry) => { let (new_sender, recv) = tokio::sync::mpsc::channel(2000); + let notify = Arc::new(Notify::new()); let runner = GroupCommandRunner { group_manager: group_manager.clone(), client_msg_router_by_user: client_msg_router_by_user.clone(), @@ -162,8 +164,12 @@ where }; let object_id = entry.key().clone(); - tokio::task::spawn_local(runner.run(object_id)); + let clone_notify = notify.clone(); + tokio::task::spawn_local(runner.run(object_id, clone_notify)); entry.insert(new_sender.clone()); + + // wait for the runner to be ready to handle the message. + notify.notified().await; new_sender }, }, @@ -177,7 +183,9 @@ where }) .await { - error!("Send message to group error: {}", err); + // it should not happen. Because the receiver is always running before acquiring the sender. + // Otherwise, the GroupCommandRunner might not be ready to handle the message. + error!("Send message to group fail: {}", err); } } diff --git a/src/application.rs b/src/application.rs index 297cca90..31b9508a 100644 --- a/src/application.rs +++ b/src/application.rs @@ -24,7 +24,7 @@ use crate::middleware::metrics_mw::MetricsMiddleware; use crate::middleware::request_id::RequestIdMiddleware; use crate::self_signed::create_self_signed_certificate; use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache}; -use actix::Actor; +use actix::Supervisor; use actix_identity::IdentityMiddleware; use actix_session::storage::RedisSessionStore; use actix_session::SessionMiddleware; @@ -114,8 +114,7 @@ pub async fn run( ) .unwrap(); - let realtime_server_actor = RealtimeServerActor(realtime_server).start(); - + let realtime_server_actor = Supervisor::start(|_| RealtimeServerActor(realtime_server)); let mut server = HttpServer::new(move || { App::new() // Middleware is registered for each App, scope, or Resource and executed in opposite order as registration diff --git a/src/biz/actix_ws/server/rt_actor.rs b/src/biz/actix_ws/server/rt_actor.rs index 25ff6a05..6cd14784 100644 --- a/src/biz/actix_ws/server/rt_actor.rs +++ b/src/biz/actix_ws/server/rt_actor.rs @@ -6,7 +6,7 @@ use collab_rt::{CollabRealtimeServer, RealtimeAccessControl}; use collab_rt_entity::user::UserDevice; use database::collab::CollabStorage; use std::ops::Deref; -use tracing::{error, warn}; +use tracing::{error, info, warn}; #[derive(Clone)] pub struct RealtimeServerActor(pub CollabRealtimeServer); @@ -27,6 +27,7 @@ where type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { + info!("realtime server started"); ctx.set_mailbox_capacity(3000); } } @@ -36,7 +37,7 @@ where AC: RealtimeAccessControl + Unpin, { fn restarting(&mut self, _ctx: &mut Context>) { - warn!("restarting"); + error!("realtime server is restarting"); } }