feat: send email and create user if adding member but member not exist

This commit is contained in:
Zack Fu Zi Xiang 2024-02-19 15:40:36 +08:00
parent f713ebedaa
commit 32fe1cc24d
No known key found for this signature in database
GPG Key ID: 39DE600AFEEED522
19 changed files with 197 additions and 74 deletions

View File

@ -146,7 +146,7 @@ pub async fn admin_users_handler(
) -> Result<Html<String>, WebAppError> {
let users = state
.gotrue_client
.admin_list_user(&session.token.access_token)
.admin_list_user(&session.token.access_token, None)
.await
.map_or_else(
|err| {

View File

@ -318,7 +318,7 @@ pub async fn should_create_snapshot<'a, E: Executor<'a, Database = Postgres>>(
) -> Result<bool, sqlx::Error> {
let hours = Utc::now() - Duration::hours(SNAPSHOT_PER_HOUR);
let latest_snapshot_time: Option<chrono::DateTime<Utc>> = sqlx::query_scalar(
"SELECT created_at FROM af_collab_snapshot
"SELECT created_at FROM af_collab_snapshot
WHERE oid = $1 ORDER BY created_at DESC LIMIT 1",
)
.bind(oid)
@ -344,7 +344,7 @@ pub(crate) async fn create_snapshot_and_maintain_limit<'a>(
let snapshot_meta = sqlx::query_as!(
AFSnapshotMeta,
r#"
INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id)
INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id)
VALUES ($1, $2, $3, $4, $5)
RETURNING sid AS snapshot_id, oid AS object_id, created_at
"#,
@ -360,7 +360,7 @@ pub(crate) async fn create_snapshot_and_maintain_limit<'a>(
// When a new snapshot is created that surpasses the preset limit, older snapshots will be deleted to maintain the limit
sqlx::query(
r#"
DELETE FROM af_collab_snapshot
DELETE FROM af_collab_snapshot
WHERE oid = $1 AND sid NOT IN ( SELECT sid FROM af_collab_snapshot WHERE oid = $1 ORDER BY created_at DESC LIMIT $2)
"#,
)
@ -403,7 +403,7 @@ pub async fn get_all_collab_snapshot_meta(
AFSnapshotMeta,
r#"
SELECT sid as "snapshot_id", oid as "object_id", created_at
FROM af_collab_snapshot
FROM af_collab_snapshot
WHERE oid = $1 AND deleted_at IS NULL
ORDER BY created_at DESC;
"#,

View File

@ -366,6 +366,7 @@ pub async fn select_workspace<'a, E: Executor<'a, Database = Postgres>>(
.await?;
Ok(workspace)
}
#[inline]
pub async fn update_updated_at_of_workspace<'a, E: Executor<'a, Database = Postgres>>(
executor: E,

View File

@ -1,7 +1,7 @@
use super::grant::Grant;
use crate::params::{
AdminDeleteUserParams, AdminUserParams, CreateSSOProviderParams, GenerateLinkParams,
GenerateLinkResponse, MagicLinkParams,
GenerateLinkResponse, InviteUserParams, MagicLinkParams,
};
use anyhow::Context;
use gotrue_entity::dto::{
@ -139,12 +139,14 @@ impl Client {
pub async fn admin_list_user(
&self,
access_token: &str,
filter: Option<&str>,
) -> Result<AdminListUsersResponse, GoTrueError> {
let url = format!("{}/admin/users", self.base_url);
let resp = self
.http_client_with_auth(Method::GET, &url, access_token)
.send()
.await?;
let mut req = self.http_client_with_auth(Method::GET, &url, access_token);
if let Some(filter) = filter {
req = req.query(&[("filter", filter)]);
}
let resp = req.send().await?;
to_gotrue_result(resp).await
}
@ -191,6 +193,20 @@ impl Client {
to_gotrue_result(resp).await
}
pub async fn admin_invite_user(
&self,
access_token: &str,
admin_user_params: &InviteUserParams,
) -> Result<User, GoTrueError> {
let url = format!("{}/invite", self.base_url);
let resp = self
.http_client_with_auth(Method::POST, &url, access_token)
.json(&admin_user_params)
.send()
.await?;
to_gotrue_result(resp).await
}
pub async fn admin_add_user(
&self,
access_token: &str,

View File

@ -8,6 +8,12 @@ pub struct AdminDeleteUserParams {
pub should_soft_delete: bool,
}
#[derive(Default, Serialize)]
pub struct InviteUserParams {
pub email: String,
pub data: serde_json::Value,
}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct AdminUserParams {
pub aud: String,

View File

@ -176,15 +176,13 @@ async fn add_workspace_members_handler(
state: Data<AppState>,
) -> Result<JsonAppResponse<()>> {
let create_members = payload.into_inner();
let role_by_uid = workspace::ops::add_workspace_members(
&state.pg_pool,
&user_uuid,
&workspace_id,
create_members.0,
)
.await?;
let role_by_uid =
workspace::ops::add_workspace_members(&state, &user_uuid, &workspace_id, create_members.0)
.await?;
for (uid, role) in role_by_uid {
// TODO: use one single query to insert all roles
state
.workspace_access_control
.insert_workspace_role(&uid, &workspace_id, role)

View File

@ -13,6 +13,8 @@ use actix_web::{dev::Server, web, web::Data, App, HttpServer};
use actix::Actor;
use anyhow::{Context, Error};
use app_error::AppError;
use gotrue::grant::{Grant, PasswordGrant};
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
use openssl::x509::X509;
use secrecy::{ExposeSecret, Secret};
@ -176,6 +178,12 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
let gotrue_client = get_gotrue_client(&config.gotrue).await?;
setup_admin_account(&gotrue_client, &pg_pool, &config.gotrue).await?;
// Gotrue Admin
let gotrue_admin = GoTrueAdmin::new(
config.gotrue.admin_email.clone(),
config.gotrue.admin_password.clone(),
);
// Redis
info!("Connecting to Redis...");
let redis_client = get_redis_client(config.redis_uri.expose_secret()).await?;
@ -215,6 +223,7 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
users: Arc::new(users),
id_gen: Arc::new(RwLock::new(Snowflake::new(1))),
gotrue_client,
gotrue_admin,
redis_client,
collab_storage,
collab_access_control,
@ -225,6 +234,31 @@ pub async fn init_state(config: &Config) -> Result<AppState, Error> {
})
}
#[derive(Debug, Clone)]
pub struct GoTrueAdmin {
pub admin_email: String,
pub password: Secret<String>,
}
impl GoTrueAdmin {
pub fn new(admin_email: String, password: String) -> Self {
Self {
admin_email,
password: password.into(),
}
}
pub async fn token(&self, client: &gotrue::api::Client) -> Result<String, AppError> {
let token = client
.token(&Grant::Password(PasswordGrant {
email: self.admin_email.clone(),
password: self.password.expose_secret().clone(),
}))
.await?;
Ok(token.access_token)
}
}
async fn setup_admin_account(
gotrue_client: &gotrue::api::Client,
pg_pool: &PgPool,

View File

@ -36,7 +36,7 @@ where
}
#[instrument(level = "debug", skip_all, err)]
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::blocks_in_conditions)]
async fn check_collab_permission(
&self,
oid: &str,

View File

@ -177,7 +177,7 @@ where
}
#[instrument(level = "trace", skip(self, params), oid = %params.oid, err)]
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::blocks_in_conditions)]
async fn upsert_collab_with_transaction(
&self,
workspace_id: &str,

View File

@ -1,4 +1,5 @@
use anyhow::{Context, Result};
use gotrue_entity::dto::User;
use crate::biz::workspace::access_control::WorkspaceAccessControl;
use crate::state::AppState;
@ -26,15 +27,30 @@ use workspace_template::{WorkspaceTemplate, WorkspaceTemplateBuilder};
#[instrument(skip_all, err)]
pub async fn verify_token(access_token: &str, state: &AppState) -> Result<bool, AppError> {
let user = state.gotrue_client.user_info(access_token).await?;
let user_uuid = uuid::Uuid::parse_str(&user.id)?;
let name = name_from_user_metadata(&user.user_metadata);
let mut txn = state
.pg_pool
.begin()
.await
.context("acquire transaction to verify token")?;
let is_new = create_user_if_not_exists(&user, &mut txn, state).await;
txn
.commit()
.await
.context("fail to commit transaction to verify token")?;
is_new
}
/// Returns if user is new
pub async fn create_user_if_not_exists(
user: &User,
txn: &mut Transaction<'_, sqlx::Postgres>,
state: &AppState,
) -> Result<bool, AppError> {
let user_uuid = uuid::Uuid::parse_str(&user.id)?;
let name = name_from_user_metadata(&user.user_metadata);
// To prevent concurrent creation of the same user with the same workspace resources, we lock
// the user row when `verify_token` is called. This means that if multiple requests try to
// create the same user simultaneously, the first request will acquire the lock, create the user,
@ -67,16 +83,12 @@ pub async fn verify_token(access_token: &str, state: &AppState) -> Result<bool,
create_workspace_for_user(
new_uid,
&workspace_id,
&mut txn,
txn,
vec![GetStartedDocumentTemplate],
state,
)
.await?;
}
txn
.commit()
.await
.context("fail to commit transaction to verify token")?;
Ok(is_new)
}

View File

@ -1,3 +1,4 @@
use app_error::AppError;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead, ReadBuf};
@ -42,3 +43,19 @@ where
&self.reader
}
}
pub async fn check_user_exists(
admin_token: &str,
gotrue_client: &gotrue::api::Client,
email: &str,
) -> Result<bool, AppError> {
let users = gotrue_client
.admin_list_user(admin_token, Some(email))
.await?;
for user in users.users {
if user.email == email {
return Ok(true);
}
}
Ok(false)
}

View File

@ -54,7 +54,7 @@ where
}
#[instrument(level = "trace", skip_all, err)]
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::blocks_in_conditions)]
async fn check_workspace_permission(
&self,
workspace_id: &Uuid,

View File

@ -9,6 +9,7 @@ use database::workspace::{
select_workspace_member_list, update_updated_at_of_workspace, upsert_workspace_member,
};
use database_entity::dto::{AFAccessLevel, AFRole, AFWorkspace};
use gotrue::params::InviteUserParams;
use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset};
use shared_entity::response::AppResponseError;
use sqlx::{types::uuid, PgPool};
@ -17,6 +18,10 @@ use std::ops::DerefMut;
use tracing::instrument;
use uuid::Uuid;
use crate::biz::user::create_user_if_not_exists;
use crate::biz::utils::check_user_exists;
use crate::state::AppState;
pub async fn delete_workspace_for_user(
pg_pool: &PgPool,
workspace_id: &Uuid,
@ -83,16 +88,52 @@ pub async fn open_workspace(
///
#[instrument(level = "debug", skip_all, err)]
pub async fn add_workspace_members(
pg_pool: &PgPool,
state: &AppState,
_user_uuid: &Uuid,
workspace_id: &Uuid,
members: Vec<CreateWorkspaceMember>,
) -> Result<HashMap<i64, AFRole>, AppError> {
let mut txn = pg_pool
// Invite user to workspace if user is not registered
let admin_token = state.gotrue_admin.token(&state.gotrue_client).await?;
let mut txn = state
.pg_pool
.begin()
.await
.context("Begin transaction to insert workspace members")?;
let gotrue_client = &state.gotrue_client;
for member in members.iter() {
let user_exists = check_user_exists(&admin_token, gotrue_client, &member.email).await?;
if !user_exists {
let user = gotrue_client
.admin_invite_user(
&admin_token,
&InviteUserParams {
email: member.email.clone(),
..Default::default()
},
)
.await?;
let is_new = create_user_if_not_exists(&user, &mut txn, state).await?;
if !is_new {
tracing::error!("User should be new but is not new. User: {:?}", user);
}
}
}
let role_by_uid = add_workspace_members_db(workspace_id, members, &mut txn).await?;
txn
.commit()
.await
.context("Commit transaction to insert workspace members")?;
Ok(role_by_uid)
}
pub async fn add_workspace_members_db(
workspace_id: &Uuid,
members: Vec<CreateWorkspaceMember>,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<HashMap<i64, AFRole>, AppError> {
let mut role_by_uid = HashMap::new();
for member in members.into_iter() {
let access_level = match &member.role {
@ -106,16 +147,11 @@ pub async fn add_workspace_members(
// "Failed to get uid from email {} when adding workspace members",
// member.email
// ))?;
insert_workspace_member_with_txn(&mut txn, workspace_id, &member.email, member.role.clone())
.await?;
upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, &mut txn).await?;
insert_workspace_member_with_txn(txn, workspace_id, &member.email, member.role.clone()).await?;
upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, txn).await?;
role_by_uid.insert(uid, member.role);
}
txn
.commit()
.await
.context("Commit transaction to insert workspace members")?;
Ok(role_by_uid)
}

View File

@ -1,3 +1,4 @@
use crate::application::GoTrueAdmin;
use crate::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl};
use crate::biz::collab::storage::CollabPostgresDBStorage;
use crate::biz::pg_listener::PgListeners;
@ -24,6 +25,7 @@ pub struct AppState {
pub users: Arc<UserCache>,
pub id_gen: Arc<RwLock<Snowflake>>,
pub gotrue_client: gotrue::api::Client,
pub gotrue_admin: GoTrueAdmin,
pub redis_client: RedisClient,
pub collab_storage: Arc<CollabPostgresDBStorage>,
pub collab_access_control: CollabAccessControlImpl,

View File

@ -1,7 +1,6 @@
use crate::access_control::*;
use actix_http::Method;
use anyhow::{anyhow, Context};
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::access_control::{AccessControl, Action, ActionType, ObjectType};
use appflowy_cloud::biz::pg_listener::PgListeners;
@ -52,10 +51,7 @@ async fn test_collab_access_control(pool: PgPool) -> anyhow::Result<()> {
role: AFRole::Guest,
},
];
let _ =
biz::workspace::ops::add_workspace_members(&pool, &user.uuid, &workspace.workspace_id, members)
.await
.context("adding users to workspace")?;
let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await;
// user that created the workspace should have full access
assert_access_level(
@ -174,17 +170,15 @@ async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
let _ = add_workspace_members_in_tx(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
.await;
for method in [Method::GET, Method::POST, Method::PUT, Method::DELETE] {
assert_can_access_http_method(
@ -274,17 +268,15 @@ async fn test_collab_access_control_send_receive_collab_update(pool: PgPool) ->
.next()
.ok_or(anyhow!("workspace should be created"))?;
let _ = biz::workspace::ops::add_workspace_members(
let _ = add_workspace_members_in_tx(
&pool,
&guest.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: guest.email,
role: AFRole::Guest,
}],
)
.await
.context("adding users to workspace")?;
.await;
// Need to wait for the listener(spawn_listen_on_workspace_member_change) to receive the event
//

View File

@ -1,5 +1,6 @@
use crate::access_control::{
assert_workspace_role, assert_workspace_role_error, create_user, setup_db,
add_workspace_members_in_tx, assert_workspace_role, assert_workspace_role_error, create_user,
setup_db,
};
use anyhow::{anyhow, Context};
use app_error::ErrorCode;
@ -44,17 +45,16 @@ async fn test_workspace_access_control_get_role(pool: PgPool) -> anyhow::Result<
.await;
let member = create_user(&pool).await?;
let _ = biz::workspace::ops::add_workspace_members(
let _ = add_workspace_members_in_tx(
&pool,
&member.uuid,
&workspace.workspace_id,
vec![CreateWorkspaceMember {
email: member.email.clone(),
role: AFRole::Member,
}],
)
.await
.context("adding users to workspace")?;
.await;
assert_workspace_role(
&access_control,

View File

@ -1,15 +1,18 @@
use actix_http::Method;
use anyhow::Context;
use app_error::ErrorCode;
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl};
use appflowy_cloud::biz::workspace::access_control::WorkspaceAccessControl;
use client_api_test_util::setup_log;
use database_entity::dto::{AFAccessLevel, AFRole};
use lazy_static::lazy_static;
use realtime::collaborate::CollabAccessControl;
use shared_entity::dto::workspace_dto::CreateWorkspaceMember;
use snowflake::Snowflake;
use sqlx::PgPool;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::{interval, timeout};
@ -267,3 +270,22 @@ pub async fn assert_can_access_http_method(
.await
.expect("Operation timed out");
}
pub async fn add_workspace_members_in_tx(
pool: &PgPool,
workspace_id: &Uuid,
members: Vec<CreateWorkspaceMember>,
) -> HashMap<i64, AFRole> {
let mut txn = pool
.begin()
.await
.expect("acquire transaction to add workspace members");
let res = biz::workspace::ops::add_workspace_members_db(workspace_id, members, &mut txn)
.await
.expect("add workspace members");
txn
.commit()
.await
.expect("commit transaction to add workspace members");
res
}

View File

@ -1,6 +1,5 @@
use crate::access_control::*;
use anyhow::anyhow;
use appflowy_cloud::biz;
use appflowy_cloud::biz::casbin::access_control::{Action, ObjectType, ToCasbinAction, MODEL_CONF};
use appflowy_cloud::biz::casbin::adapter::PgAdapter;
use casbin::{CoreApi, DefaultModel, Enforcer};
@ -103,14 +102,8 @@ async fn test_add_users_to_workspace(pool: PgPool) -> anyhow::Result<()> {
role: AFRole::Guest,
},
];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_main.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await;
let model = DefaultModel::from_str(MODEL_CONF).await?;
let enforcer = Enforcer::new(
@ -251,14 +244,8 @@ async fn test_reload_policy_after_adding_user_to_workspace(pool: PgPool) -> anyh
email: user_member.email.clone(),
role: AFRole::Member,
}];
let _ = biz::workspace::ops::add_workspace_members(
&pool,
&user_owner.uuid,
&workspace.workspace_id,
members,
)
.await
.context("adding users to workspace")?;
let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await;
assert!(!enforcer
.enforce((

View File

@ -47,7 +47,7 @@ async fn admin_user_create_list_edit_delete() {
// list users
let users = gotrue_client
.admin_list_user(&admin_token.access_token)
.admin_list_user(&admin_token.access_token, None)
.await
.unwrap()
.users;
@ -94,7 +94,7 @@ async fn admin_user_create_list_edit_delete() {
.unwrap();
let users = gotrue_client
.admin_list_user(&admin_token.access_token)
.admin_list_user(&admin_token.access_token, None)
.await
.unwrap()
.users;