feat: add postgres listeners for collab service (#575)

This commit is contained in:
Khor Shu Heng 2024-05-27 09:12:32 +08:00 committed by GitHub
parent 559d924cd1
commit 93b4a1516c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 111 additions and 43 deletions

View File

@ -2,6 +2,7 @@ pub mod chat;
pub mod collab;
pub mod file;
pub mod history;
pub mod listener;
pub mod pg_row;
pub mod resource_usage;
pub mod user;

View File

@ -0,0 +1,42 @@
use anyhow::Error;
use serde::de::DeserializeOwned;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use tokio::sync::broadcast;
use tracing::{error, trace};
pub struct PostgresDBListener<T: Clone> {
pub notify: broadcast::Sender<T>,
}
impl<T> PostgresDBListener<T>
where
T: Clone + DeserializeOwned + Send + 'static,
{
pub async fn new(pg_pool: &PgPool, channel: &str) -> Result<Self, Error> {
let mut listener = PgListener::connect_with(pg_pool).await?;
// TODO(nathan): using listen_all
listener.listen(channel).await?;
let (tx, _) = broadcast::channel(1000);
let notify = tx.clone();
tokio::spawn(async move {
while let Ok(notification) = listener.recv().await {
trace!("Received notification: {}", notification.payload());
match serde_json::from_str::<T>(notification.payload()) {
Ok(change) => {
let _ = tx.send(change);
},
Err(err) => {
error!(
"Failed to deserialize change: {:?}, payload: {}",
err,
notification.payload()
);
},
}
}
});
Ok(Self { notify })
}
}

View File

@ -312,7 +312,6 @@ impl EditState {
}
}
#[allow(dead_code)]
struct CollabUpdateStreamingImpl {
sender: mpsc::UnboundedSender<Vec<u8>>,
stopped: Arc<AtomicBool>,

View File

@ -7,6 +7,7 @@ pub mod error;
mod group;
pub mod metrics;
mod permission;
mod pg_listener;
mod rt_server;
pub mod shared_state;
pub mod snapshot;

View File

@ -0,0 +1,65 @@
use crate::collab::notification::CollabMemberNotification;
use anyhow::Error;
use database::listener::PostgresDBListener;
use database::pg_row::AFUserNotification;
use sqlx::PgPool;
use tokio::sync::broadcast;
use workspace_access::notification::WorkspaceMemberNotification;
#[allow(dead_code)]
pub struct PgListeners {
user_listener: UserListener,
workspace_member_listener: WorkspaceMemberListener,
collab_member_listener: CollabMemberListener,
}
#[allow(dead_code)]
impl PgListeners {
pub async fn new(pg_pool: &PgPool) -> Result<Self, Error> {
let user_listener = UserListener::new(pg_pool, "af_user_channel").await?;
let workspace_member_listener =
WorkspaceMemberListener::new(pg_pool, "af_workspace_member_channel").await?;
let collab_member_listener =
CollabMemberListener::new(pg_pool, "af_collab_member_channel").await?;
Ok(Self {
user_listener,
workspace_member_listener,
collab_member_listener,
})
}
pub fn subscribe_workspace_member_change(
&self,
) -> broadcast::Receiver<WorkspaceMemberNotification> {
self.workspace_member_listener.notify.subscribe()
}
pub fn subscribe_collab_member_change(&self) -> broadcast::Receiver<CollabMemberNotification> {
self.collab_member_listener.notify.subscribe()
}
pub fn subscribe_user_change(&self, uid: i64) -> tokio::sync::mpsc::Receiver<AFUserNotification> {
let (tx, rx) = tokio::sync::mpsc::channel(100);
let mut user_notify = self.user_listener.notify.subscribe();
tokio::spawn(async move {
while let Ok(notification) = user_notify.recv().await {
if let Some(row) = notification.payload.as_ref() {
if row.uid == uid {
let _ = tx.send(notification).await;
}
}
}
});
rx
}
}
#[allow(dead_code)]
pub type CollabMemberListener = PostgresDBListener<CollabMemberNotification>;
#[allow(dead_code)]
pub type UserListener = PostgresDBListener<AFUserNotification>;
#[allow(dead_code)]
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberNotification>;

View File

@ -1,12 +1,9 @@
use crate::biz::user::user_verify::UserListener;
use anyhow::Error;
use appflowy_collaborate::collab::notification::CollabMemberNotification;
use database::listener::PostgresDBListener;
use database::pg_row::AFUserNotification;
use serde::de::DeserializeOwned;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use tokio::sync::broadcast;
use tracing::{error, trace};
use workspace_access::notification::WorkspaceMemberNotification;
pub struct PgListeners {
@ -58,41 +55,6 @@ impl PgListeners {
}
}
pub struct PostgresDBListener<T: Clone> {
notify: broadcast::Sender<T>,
}
impl<T> PostgresDBListener<T>
where
T: Clone + DeserializeOwned + Send + 'static,
{
pub async fn new(pg_pool: &PgPool, channel: &str) -> Result<Self, Error> {
let mut listener = PgListener::connect_with(pg_pool).await?;
// TODO(nathan): using listen_all
listener.listen(channel).await?;
let (tx, _) = broadcast::channel(1000);
let notify = tx.clone();
tokio::spawn(async move {
while let Ok(notification) = listener.recv().await {
trace!("Received notification: {}", notification.payload());
match serde_json::from_str::<T>(notification.payload()) {
Ok(change) => {
let _ = tx.send(change);
},
Err(err) => {
error!(
"Failed to deserialize change: {:?}, payload: {}",
err,
notification.payload()
);
},
}
}
});
Ok(Self { notify })
}
}
pub type CollabMemberListener = PostgresDBListener<CollabMemberNotification>;
pub type UserListener = PostgresDBListener<AFUserNotification>;
pub type WorkspaceMemberListener = PostgresDBListener<WorkspaceMemberNotification>;

View File

@ -6,7 +6,6 @@ use tracing::{event, instrument, trace};
use access_control::workspace::WorkspaceAccessControl;
use app_error::AppError;
use database::pg_row::AFUserNotification;
use database::user::{create_user, is_user_exist};
use database::workspace::select_workspace;
use database_entity::dto::AFRole;
@ -79,7 +78,6 @@ pub async fn verify_token(access_token: &str, state: &AppState) -> Result<bool,
Ok(is_new)
}
pub type UserListener = crate::biz::pg_listener::PostgresDBListener<AFUserNotification>;
// Best effort to get user's name after oauth
fn name_from_user_metadata(value: &serde_json::Value) -> String {
value