diff --git a/Cargo.lock b/Cargo.lock index 2a1417c3..81a76e39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3579,6 +3579,7 @@ dependencies = [ "bytes", "collab", "collab-entity", + "database-entity", "prost", "prost-build", "protoc-bin-vendored", diff --git a/Cargo.toml b/Cargo.toml index a2e74b35..0561794e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,7 +72,7 @@ token = { path = "libs/token" } snowflake = { path = "libs/snowflake" } realtime = { path = "libs/realtime" } database = { path = "libs/database" } -database-entity = { path = "libs/database-entity" } +database-entity.workspace = true gotrue = { path = "libs/gotrue" } gotrue-entity = { path = "libs/gotrue-entity" } infra = { path = "libs/infra" } @@ -123,6 +123,7 @@ members = [ [workspace.dependencies] realtime-entity = { path = "libs/realtime-entity" } +database-entity = { path = "libs/database-entity" } app-error = { path = "libs/app_error" } serde_json = "1.0.108" serde = { version = "1.0.108", features = ["derive"] } diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 434ffdd6..ae8d8dd2 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -13,7 +13,7 @@ serde_repr = "0.1.16" gotrue = { path = "../gotrue" } gotrue-entity = { path = "../gotrue-entity" } shared_entity = { path = "../shared-entity" } -database-entity = { path = "../database-entity" } +database-entity.workspace = true url = "2.4.1" tokio-stream = { version = "0.1.14" } parking_lot = "0.12.1" diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index eb18e61d..688646b6 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -18,5 +18,6 @@ pub mod error { pub mod entity { pub use database_entity::dto::*; pub use gotrue_entity::dto::*; + pub use realtime_entity::user::*; pub use shared_entity::dto::*; } diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index 7247f70c..cfe41660 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -395,7 +395,7 @@ pub struct AFUserWorkspaceInfo { pub workspaces: Vec, } -#[derive(Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct AFWorkspaceMember { pub name: String, pub email: String, diff --git a/libs/database-entity/src/pg_row.rs b/libs/database-entity/src/pg_row.rs index f1c85a6d..9d5ab77a 100644 --- a/libs/database-entity/src/pg_row.rs +++ b/libs/database-entity/src/pg_row.rs @@ -19,7 +19,7 @@ pub struct AFWorkspaceRow { /// Represent the row of the af_user table #[derive(Debug, FromRow, Deserialize, Serialize, Clone)] pub struct AFUserRow { - pub uid: Option, + pub uid: i64, pub uuid: Option, pub email: Option, pub password: Option, diff --git a/libs/database/Cargo.toml b/libs/database/Cargo.toml index 3649ee95..3117cf24 100644 --- a/libs/database/Cargo.toml +++ b/libs/database/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" collab = { version = "0.1.0"} collab-entity = { version = "0.1.0" } validator = { version = "0.16", features = ["validator_derive", "derive"] } -database-entity = { path = "../database-entity" } +database-entity.workspace = true app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s3_error"] } tokio = { version = "1.26", features = ["sync"] } diff --git a/libs/realtime-entity/Cargo.toml b/libs/realtime-entity/Cargo.toml index 2bf0c9d1..7e874718 100644 --- a/libs/realtime-entity/Cargo.toml +++ b/libs/realtime-entity/Cargo.toml @@ -16,6 +16,7 @@ actix = { version = "0.13", optional = true } bincode = "1.3.3" tokio-tungstenite = { version = "0.20.1", optional = true } prost = "0.12.1" +database-entity.workspace = true [features] actix_message = ["actix"] diff --git a/libs/realtime-entity/src/user.rs b/libs/realtime-entity/src/user.rs index a6f2ee76..54d10169 100644 --- a/libs/realtime-entity/src/user.rs +++ b/libs/realtime-entity/src/user.rs @@ -1,13 +1,23 @@ +use database_entity::dto::AFWorkspaceMember; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum UserMessage { ProfileChange(AFUserChange), + WorkspaceMemberChange(AFWorkspaceMemberChange), } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AFUserChange { + pub uid: i64, pub name: Option, pub email: Option, pub metadata: Option, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AFWorkspaceMemberChange { + added: Vec, + updated: Vec, + removed: Vec, +} diff --git a/libs/realtime/Cargo.toml b/libs/realtime/Cargo.toml index f884cfa6..f5d5fa00 100644 --- a/libs/realtime/Cargo.toml +++ b/libs/realtime/Cargo.toml @@ -28,7 +28,7 @@ app-error = { workspace = true } collab = { version = "0.1.0"} collab-entity = { version = "0.1.0" } database = { path = "../database" } -database-entity = { path = "../database-entity" } +database-entity.workspace = true yrs = "0.16.5" lib0 = "0.16.3" chrono = "0.4.30" diff --git a/libs/realtime/src/client.rs b/libs/realtime/src/client.rs index 9a21d927..fa32690c 100644 --- a/libs/realtime/src/client.rs +++ b/libs/realtime/src/client.rs @@ -12,7 +12,6 @@ use database::collab::CollabStorage; use std::ops::Deref; use std::time::{Duration, Instant}; -use tokio::sync::broadcast::Receiver; use database_entity::pg_row::AFUserNotification; use realtime_entity::user::{AFUserChange, UserMessage}; @@ -28,7 +27,7 @@ pub struct ClientSession< pub server: Addr>, heartbeat_interval: Duration, client_timeout: Duration, - user_change_recv: Option>, + user_change_recv: Option>, } impl ClientSession @@ -39,7 +38,7 @@ where { pub fn new( user: U, - user_change_recv: Receiver, + user_change_recv: tokio::sync::mpsc::Receiver, server: Addr>, heartbeat_interval: Duration, client_timeout: Duration, @@ -97,7 +96,7 @@ where let recipient = ctx.address().recipient(); if let Some(mut recv) = self.user_change_recv.take() { actix::spawn(async move { - while let Ok(notification) = recv.recv().await { + while let Some(notification) = recv.recv().await { if let Some(user) = notification.payload { trace!("Receive user change: {:?}", user); @@ -105,6 +104,7 @@ where // deserialize_any method. So it needs to serialize the metadata to json string. let metadata = serde_json::to_string(&user.metadata).ok(); let msg = UserMessage::ProfileChange(AFUserChange { + uid: user.uid, name: user.name, email: user.email, metadata, diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 5a1840cd..7992d3e2 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -98,17 +98,17 @@ where }); } -fn init_collab_with_raw_data( +async fn init_collab_with_raw_data( encoded_collab: EncodedCollabV1, doc: &Doc, ) -> Result<(), RealtimeError> { if encoded_collab.doc_state.is_empty() { return Err(RealtimeError::UnexpectedData("raw data is empty")); } - let mut txn = doc.transact_mut(); if encoded_collab.doc_state.is_empty() { return Err(RealtimeError::UnexpectedData("Document state is empty")); } + let mut txn = doc.transact_mut(); let update = Update::decode_v1(&encoded_collab.doc_state)?; txn.try_apply_update(update)?; Ok(()) @@ -129,7 +129,7 @@ where }; match self.storage.get_collab_encoded_v1(&self.uid, params).await { - Ok(encoded_collab) => match init_collab_with_raw_data(encoded_collab, doc) { + Ok(encoded_collab) => match init_collab_with_raw_data(encoded_collab, doc).await { Ok(_) => {}, Err(e) => error!("🔴Init collab failed: {:?}", e), }, diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml index 63c82578..1708d8d5 100644 --- a/libs/shared-entity/Cargo.toml +++ b/libs/shared-entity/Cargo.toml @@ -14,7 +14,7 @@ thiserror = "1.0.47" reqwest = "0.11.18" uuid = { version = "1.3.3", features = ["v4"] } gotrue-entity = { path = "../gotrue-entity" } -database-entity = { path = "../database-entity" } +database-entity.workspace = true collab-entity = { version = "0.1.0" } app-error = { workspace = true } diff --git a/src/api/ws.rs b/src/api/ws.rs index c5470b97..b3b007f5 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -42,7 +42,7 @@ pub async fn establish_ws_connection( match result { Ok(uid) => { - let user_change_recv = state.pg_listeners.subscribe_user_change(); + let user_change_recv = state.pg_listeners.subscribe_user_change(uid); let realtime_user = Arc::new(RealtimeUserImpl::new(uid, device_id)); let client = ClientSession::new( realtime_user, diff --git a/src/biz/pg_listener.rs b/src/biz/pg_listener.rs index 42066de0..7281961a 100644 --- a/src/biz/pg_listener.rs +++ b/src/biz/pg_listener.rs @@ -44,8 +44,19 @@ impl PgListeners { self.collab_member_listener.notify.subscribe() } - pub fn subscribe_user_change(&self) -> broadcast::Receiver { - self.user_listener.notify.subscribe() + pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(1); + let mut user_notify = self.user_listener.notify.subscribe(); + tokio::spawn(async move { + while let Ok(notification) = user_notify.recv().await { + if let Some(row) = notification.payload.as_ref() { + if row.uid == uid { + let _ = tx.send(notification).await; + } + } + } + }); + rx } }