From 93b4a1516cb61ac27fb277e301c362e0fd3447e4 Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Mon, 27 May 2024 09:12:32 +0800 Subject: [PATCH] feat: add postgres listeners for collab service (#575) --- libs/database/src/lib.rs | 1 + libs/database/src/listener.rs | 42 ++++++++++++ .../src/group/group_init.rs | 1 - services/appflowy-collaborate/src/lib.rs | 1 + .../appflowy-collaborate/src/pg_listener.rs | 65 +++++++++++++++++++ src/biz/pg_listener.rs | 42 +----------- src/biz/user/user_verify.rs | 2 - 7 files changed, 111 insertions(+), 43 deletions(-) create mode 100644 libs/database/src/listener.rs create mode 100644 services/appflowy-collaborate/src/pg_listener.rs diff --git a/libs/database/src/lib.rs b/libs/database/src/lib.rs index cdc651fc..d267e6ba 100644 --- a/libs/database/src/lib.rs +++ b/libs/database/src/lib.rs @@ -2,6 +2,7 @@ pub mod chat; pub mod collab; pub mod file; pub mod history; +pub mod listener; pub mod pg_row; pub mod resource_usage; pub mod user; diff --git a/libs/database/src/listener.rs b/libs/database/src/listener.rs new file mode 100644 index 00000000..d7235851 --- /dev/null +++ b/libs/database/src/listener.rs @@ -0,0 +1,42 @@ +use anyhow::Error; +use serde::de::DeserializeOwned; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use tokio::sync::broadcast; +use tracing::{error, trace}; + +pub struct PostgresDBListener { + pub notify: broadcast::Sender, +} + +impl PostgresDBListener +where + T: Clone + DeserializeOwned + Send + 'static, +{ + pub async fn new(pg_pool: &PgPool, channel: &str) -> Result { + let mut listener = PgListener::connect_with(pg_pool).await?; + // TODO(nathan): using listen_all + listener.listen(channel).await?; + + let (tx, _) = broadcast::channel(1000); + let notify = tx.clone(); + tokio::spawn(async move { + while let Ok(notification) = listener.recv().await { + trace!("Received notification: {}", notification.payload()); + match serde_json::from_str::(notification.payload()) { + Ok(change) => { + let _ = tx.send(change); + }, + Err(err) => { + error!( + "Failed to deserialize change: {:?}, payload: {}", + err, + notification.payload() + ); + }, + } + } + }); + Ok(Self { notify }) + } +} diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 0c7d0c70..7ac8220b 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -312,7 +312,6 @@ impl EditState { } } -#[allow(dead_code)] struct CollabUpdateStreamingImpl { sender: mpsc::UnboundedSender>, stopped: Arc, diff --git a/services/appflowy-collaborate/src/lib.rs b/services/appflowy-collaborate/src/lib.rs index eb065dd7..9d6df64b 100644 --- a/services/appflowy-collaborate/src/lib.rs +++ b/services/appflowy-collaborate/src/lib.rs @@ -7,6 +7,7 @@ pub mod error; mod group; pub mod metrics; mod permission; +mod pg_listener; mod rt_server; pub mod shared_state; pub mod snapshot; diff --git a/services/appflowy-collaborate/src/pg_listener.rs b/services/appflowy-collaborate/src/pg_listener.rs new file mode 100644 index 00000000..4043cd99 --- /dev/null +++ b/services/appflowy-collaborate/src/pg_listener.rs @@ -0,0 +1,65 @@ +use crate::collab::notification::CollabMemberNotification; +use anyhow::Error; +use database::listener::PostgresDBListener; +use database::pg_row::AFUserNotification; +use sqlx::PgPool; +use tokio::sync::broadcast; +use workspace_access::notification::WorkspaceMemberNotification; + +#[allow(dead_code)] +pub struct PgListeners { + user_listener: UserListener, + workspace_member_listener: WorkspaceMemberListener, + collab_member_listener: CollabMemberListener, +} + +#[allow(dead_code)] +impl PgListeners { + pub async fn new(pg_pool: &PgPool) -> Result { + let user_listener = UserListener::new(pg_pool, "af_user_channel").await?; + + let workspace_member_listener = + WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?; + + let collab_member_listener = + CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?; + + Ok(Self { + user_listener, + workspace_member_listener, + collab_member_listener, + }) + } + + pub fn subscribe_workspace_member_change( + &self, + ) -> broadcast::Receiver { + self.workspace_member_listener.notify.subscribe() + } + + pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver { + self.collab_member_listener.notify.subscribe() + } + + pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver { + let (tx, rx) = tokio::sync::mpsc::channel(100); + let mut user_notify = self.user_listener.notify.subscribe(); + tokio::spawn(async move { + while let Ok(notification) = user_notify.recv().await { + if let Some(row) = notification.payload.as_ref() { + if row.uid == uid { + let _ = tx.send(notification).await; + } + } + } + }); + rx + } +} + +#[allow(dead_code)] +pub type CollabMemberListener = PostgresDBListener; +#[allow(dead_code)] +pub type UserListener = PostgresDBListener; +#[allow(dead_code)] +pub type WorkspaceMemberListener = PostgresDBListener; diff --git a/src/biz/pg_listener.rs b/src/biz/pg_listener.rs index 8b798559..5ecd4053 100644 --- a/src/biz/pg_listener.rs +++ b/src/biz/pg_listener.rs @@ -1,12 +1,9 @@ -use crate::biz::user::user_verify::UserListener; use anyhow::Error; use appflowy_collaborate::collab::notification::CollabMemberNotification; +use database::listener::PostgresDBListener; use database::pg_row::AFUserNotification; -use serde::de::DeserializeOwned; -use sqlx::postgres::PgListener; use sqlx::PgPool; use tokio::sync::broadcast; -use tracing::{error, trace}; use workspace_access::notification::WorkspaceMemberNotification; pub struct PgListeners { @@ -58,41 +55,6 @@ impl PgListeners { } } -pub struct PostgresDBListener { - notify: broadcast::Sender, -} - -impl PostgresDBListener -where - T: Clone + DeserializeOwned + Send + 'static, -{ - pub async fn new(pg_pool: &PgPool, channel: &str) -> Result { - let mut listener = PgListener::connect_with(pg_pool).await?; - // TODO(nathan): using listen_all - listener.listen(channel).await?; - - let (tx, _) = broadcast::channel(1000); - let notify = tx.clone(); - tokio::spawn(async move { - while let Ok(notification) = listener.recv().await { - trace!("Received notification: {}", notification.payload()); - match serde_json::from_str::(notification.payload()) { - Ok(change) => { - let _ = tx.send(change); - }, - Err(err) => { - error!( - "Failed to deserialize change: {:?}, payload: {}", - err, - notification.payload() - ); - }, - } - } - }); - Ok(Self { notify }) - } -} - pub type CollabMemberListener = PostgresDBListener; +pub type UserListener = PostgresDBListener; pub type WorkspaceMemberListener = PostgresDBListener; diff --git a/src/biz/user/user_verify.rs b/src/biz/user/user_verify.rs index d68765ec..4a35173a 100644 --- a/src/biz/user/user_verify.rs +++ b/src/biz/user/user_verify.rs @@ -6,7 +6,6 @@ use tracing::{event, instrument, trace}; use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; -use database::pg_row::AFUserNotification; use database::user::{create_user, is_user_exist}; use database::workspace::select_workspace; use database_entity::dto::AFRole; @@ -79,7 +78,6 @@ pub async fn verify_token(access_token: &str, state: &AppState) -> Result; // Best effort to get user's name after oauth fn name_from_user_metadata(value: &serde_json::Value) -> String { value