feat: notify user via websocket after profile changed (#164)

* feat: notify user via websocket after profile changed

* chore: update

* chore: update
This commit is contained in:
Nathan.fooo 2023-11-14 12:47:10 +08:00 committed by GitHub
parent 54ef875f5f
commit 79a02edd00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 42 additions and 17 deletions

1
Cargo.lock generated
View File

@ -3579,6 +3579,7 @@ dependencies = [
"bytes",
"collab",
"collab-entity",
"database-entity",
"prost",
"prost-build",
"protoc-bin-vendored",

View File

@ -72,7 +72,7 @@ token = { path = "libs/token" }
snowflake = { path = "libs/snowflake" }
realtime = { path = "libs/realtime" }
database = { path = "libs/database" }
database-entity = { path = "libs/database-entity" }
database-entity.workspace = true
gotrue = { path = "libs/gotrue" }
gotrue-entity = { path = "libs/gotrue-entity" }
infra = { path = "libs/infra" }
@ -123,6 +123,7 @@ members = [
[workspace.dependencies]
realtime-entity = { path = "libs/realtime-entity" }
database-entity = { path = "libs/database-entity" }
app-error = { path = "libs/app_error" }
serde_json = "1.0.108"
serde = { version = "1.0.108", features = ["derive"] }

View File

@ -13,7 +13,7 @@ serde_repr = "0.1.16"
gotrue = { path = "../gotrue" }
gotrue-entity = { path = "../gotrue-entity" }
shared_entity = { path = "../shared-entity" }
database-entity = { path = "../database-entity" }
database-entity.workspace = true
url = "2.4.1"
tokio-stream = { version = "0.1.14" }
parking_lot = "0.12.1"

View File

@ -18,5 +18,6 @@ pub mod error {
pub mod entity {
pub use database_entity::dto::*;
pub use gotrue_entity::dto::*;
pub use realtime_entity::user::*;
pub use shared_entity::dto::*;
}

View File

@ -395,7 +395,7 @@ pub struct AFUserWorkspaceInfo {
pub workspaces: Vec<AFWorkspace>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AFWorkspaceMember {
pub name: String,
pub email: String,

View File

@ -19,7 +19,7 @@ pub struct AFWorkspaceRow {
/// Represent the row of the af_user table
#[derive(Debug, FromRow, Deserialize, Serialize, Clone)]
pub struct AFUserRow {
pub uid: Option<i64>,
pub uid: i64,
pub uuid: Option<Uuid>,
pub email: Option<String>,
pub password: Option<String>,

View File

@ -9,7 +9,7 @@ edition = "2021"
collab = { version = "0.1.0"}
collab-entity = { version = "0.1.0" }
validator = { version = "0.16", features = ["validator_derive", "derive"] }
database-entity = { path = "../database-entity" }
database-entity.workspace = true
app-error = { workspace = true, features = ["sqlx_error", "validation_error", "s3_error"] }
tokio = { version = "1.26", features = ["sync"] }

View File

@ -16,6 +16,7 @@ actix = { version = "0.13", optional = true }
bincode = "1.3.3"
tokio-tungstenite = { version = "0.20.1", optional = true }
prost = "0.12.1"
database-entity.workspace = true
[features]
actix_message = ["actix"]

View File

@ -1,13 +1,23 @@
use database_entity::dto::AFWorkspaceMember;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserMessage {
ProfileChange(AFUserChange),
WorkspaceMemberChange(AFWorkspaceMemberChange),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AFUserChange {
pub uid: i64,
pub name: Option<String>,
pub email: Option<String>,
pub metadata: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AFWorkspaceMemberChange {
added: Vec<AFWorkspaceMember>,
updated: Vec<AFWorkspaceMember>,
removed: Vec<AFWorkspaceMember>,
}

View File

@ -28,7 +28,7 @@ app-error = { workspace = true }
collab = { version = "0.1.0"}
collab-entity = { version = "0.1.0" }
database = { path = "../database" }
database-entity = { path = "../database-entity" }
database-entity.workspace = true
yrs = "0.16.5"
lib0 = "0.16.3"
chrono = "0.4.30"

View File

@ -12,7 +12,6 @@ use database::collab::CollabStorage;
use std::ops::Deref;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::Receiver;
use database_entity::pg_row::AFUserNotification;
use realtime_entity::user::{AFUserChange, UserMessage};
@ -28,7 +27,7 @@ pub struct ClientSession<
pub server: Addr<CollabServer<S, U, AC>>,
heartbeat_interval: Duration,
client_timeout: Duration,
user_change_recv: Option<Receiver<AFUserNotification>>,
user_change_recv: Option<tokio::sync::mpsc::Receiver<AFUserNotification>>,
}
impl<U, S, AC> ClientSession<U, S, AC>
@ -39,7 +38,7 @@ where
{
pub fn new(
user: U,
user_change_recv: Receiver<AFUserNotification>,
user_change_recv: tokio::sync::mpsc::Receiver<AFUserNotification>,
server: Addr<CollabServer<S, U, AC>>,
heartbeat_interval: Duration,
client_timeout: Duration,
@ -97,7 +96,7 @@ where
let recipient = ctx.address().recipient();
if let Some(mut recv) = self.user_change_recv.take() {
actix::spawn(async move {
while let Ok(notification) = recv.recv().await {
while let Some(notification) = recv.recv().await {
if let Some(user) = notification.payload {
trace!("Receive user change: {:?}", user);
@ -105,6 +104,7 @@ where
// deserialize_any method. So it needs to serialize the metadata to json string.
let metadata = serde_json::to_string(&user.metadata).ok();
let msg = UserMessage::ProfileChange(AFUserChange {
uid: user.uid,
name: user.name,
email: user.email,
metadata,

View File

@ -98,17 +98,17 @@ where
});
}
fn init_collab_with_raw_data(
async fn init_collab_with_raw_data(
encoded_collab: EncodedCollabV1,
doc: &Doc,
) -> Result<(), RealtimeError> {
if encoded_collab.doc_state.is_empty() {
return Err(RealtimeError::UnexpectedData("raw data is empty"));
}
let mut txn = doc.transact_mut();
if encoded_collab.doc_state.is_empty() {
return Err(RealtimeError::UnexpectedData("Document state is empty"));
}
let mut txn = doc.transact_mut();
let update = Update::decode_v1(&encoded_collab.doc_state)?;
txn.try_apply_update(update)?;
Ok(())
@ -129,7 +129,7 @@ where
};
match self.storage.get_collab_encoded_v1(&self.uid, params).await {
Ok(encoded_collab) => match init_collab_with_raw_data(encoded_collab, doc) {
Ok(encoded_collab) => match init_collab_with_raw_data(encoded_collab, doc).await {
Ok(_) => {},
Err(e) => error!("🔴Init collab failed: {:?}", e),
},

View File

@ -14,7 +14,7 @@ thiserror = "1.0.47"
reqwest = "0.11.18"
uuid = { version = "1.3.3", features = ["v4"] }
gotrue-entity = { path = "../gotrue-entity" }
database-entity = { path = "../database-entity" }
database-entity.workspace = true
collab-entity = { version = "0.1.0" }
app-error = { workspace = true }

View File

@ -42,7 +42,7 @@ pub async fn establish_ws_connection(
match result {
Ok(uid) => {
let user_change_recv = state.pg_listeners.subscribe_user_change();
let user_change_recv = state.pg_listeners.subscribe_user_change(uid);
let realtime_user = Arc::new(RealtimeUserImpl::new(uid, device_id));
let client = ClientSession::new(
realtime_user,

View File

@ -44,8 +44,19 @@ impl PgListeners {
self.collab_member_listener.notify.subscribe()
}
pub fn subscribe_user_change(&self) -> broadcast::Receiver<AFUserNotification> {
self.user_listener.notify.subscribe()
pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver<AFUserNotification> {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let mut user_notify = self.user_listener.notify.subscribe();
tokio::spawn(async move {
while let Ok(notification) = user_notify.recv().await {
if let Some(row) = notification.payload.as_ref() {
if row.uid == uid {
let _ = tx.send(notification).await;
}
}
}
});
rx
}
}