diff --git a/admin_frontend/src/web_app.rs b/admin_frontend/src/web_app.rs index 4e3796d0..d8873f63 100644 --- a/admin_frontend/src/web_app.rs +++ b/admin_frontend/src/web_app.rs @@ -146,7 +146,7 @@ pub async fn admin_users_handler( ) -> Result, WebAppError> { let users = state .gotrue_client - .admin_list_user(&session.token.access_token) + .admin_list_user(&session.token.access_token, None) .await .map_or_else( |err| { diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index f4acc1d8..a84117a3 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -318,7 +318,7 @@ pub async fn should_create_snapshot<'a, E: Executor<'a, Database = Postgres>>( ) -> Result { let hours = Utc::now() - Duration::hours(SNAPSHOT_PER_HOUR); let latest_snapshot_time: Option> = sqlx::query_scalar( - "SELECT created_at FROM af_collab_snapshot + "SELECT created_at FROM af_collab_snapshot WHERE oid = $1 ORDER BY created_at DESC LIMIT 1", ) .bind(oid) @@ -344,7 +344,7 @@ pub(crate) async fn create_snapshot_and_maintain_limit<'a>( let snapshot_meta = sqlx::query_as!( AFSnapshotMeta, r#" - INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id) + INSERT INTO af_collab_snapshot (oid, blob, len, encrypt, workspace_id) VALUES ($1, $2, $3, $4, $5) RETURNING sid AS snapshot_id, oid AS object_id, created_at "#, @@ -360,7 +360,7 @@ pub(crate) async fn create_snapshot_and_maintain_limit<'a>( // When a new snapshot is created that surpasses the preset limit, older snapshots will be deleted to maintain the limit sqlx::query( r#" - DELETE FROM af_collab_snapshot + DELETE FROM af_collab_snapshot WHERE oid = $1 AND sid NOT IN ( SELECT sid FROM af_collab_snapshot WHERE oid = $1 ORDER BY created_at DESC LIMIT $2) "#, ) @@ -403,7 +403,7 @@ pub async fn get_all_collab_snapshot_meta( AFSnapshotMeta, r#" SELECT sid as "snapshot_id", oid as "object_id", created_at - FROM af_collab_snapshot + FROM af_collab_snapshot WHERE oid = $1 AND deleted_at IS NULL ORDER BY created_at DESC; "#, diff --git a/libs/database/src/workspace.rs b/libs/database/src/workspace.rs index 68c6c851..24594a83 100644 --- a/libs/database/src/workspace.rs +++ b/libs/database/src/workspace.rs @@ -366,6 +366,7 @@ pub async fn select_workspace<'a, E: Executor<'a, Database = Postgres>>( .await?; Ok(workspace) } + #[inline] pub async fn update_updated_at_of_workspace<'a, E: Executor<'a, Database = Postgres>>( executor: E, diff --git a/libs/gotrue/src/api.rs b/libs/gotrue/src/api.rs index 292d2301..d7d487a5 100644 --- a/libs/gotrue/src/api.rs +++ b/libs/gotrue/src/api.rs @@ -1,7 +1,7 @@ use super::grant::Grant; use crate::params::{ AdminDeleteUserParams, AdminUserParams, CreateSSOProviderParams, GenerateLinkParams, - GenerateLinkResponse, MagicLinkParams, + GenerateLinkResponse, InviteUserParams, MagicLinkParams, }; use anyhow::Context; use gotrue_entity::dto::{ @@ -139,12 +139,14 @@ impl Client { pub async fn admin_list_user( &self, access_token: &str, + filter: Option<&str>, ) -> Result { let url = format!("{}/admin/users", self.base_url); - let resp = self - .http_client_with_auth(Method::GET, &url, access_token) - .send() - .await?; + let mut req = self.http_client_with_auth(Method::GET, &url, access_token); + if let Some(filter) = filter { + req = req.query(&[("filter", filter)]); + } + let resp = req.send().await?; to_gotrue_result(resp).await } @@ -191,6 +193,20 @@ impl Client { to_gotrue_result(resp).await } + pub async fn admin_invite_user( + &self, + access_token: &str, + admin_user_params: &InviteUserParams, + ) -> Result { + let url = format!("{}/invite", self.base_url); + let resp = self + .http_client_with_auth(Method::POST, &url, access_token) + .json(&admin_user_params) + .send() + .await?; + to_gotrue_result(resp).await + } + pub async fn admin_add_user( &self, access_token: &str, diff --git a/libs/gotrue/src/params.rs b/libs/gotrue/src/params.rs index 3a1dd390..92988b3d 100644 --- a/libs/gotrue/src/params.rs +++ b/libs/gotrue/src/params.rs @@ -8,6 +8,12 @@ pub struct AdminDeleteUserParams { pub should_soft_delete: bool, } +#[derive(Default, Serialize)] +pub struct InviteUserParams { + pub email: String, + pub data: serde_json::Value, +} + #[derive(Debug, Default, Deserialize, Serialize)] pub struct AdminUserParams { pub aud: String, diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 92e5ac52..a3a37913 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -176,15 +176,13 @@ async fn add_workspace_members_handler( state: Data, ) -> Result> { let create_members = payload.into_inner(); - let role_by_uid = workspace::ops::add_workspace_members( - &state.pg_pool, - &user_uuid, - &workspace_id, - create_members.0, - ) - .await?; + + let role_by_uid = + workspace::ops::add_workspace_members(&state, &user_uuid, &workspace_id, create_members.0) + .await?; for (uid, role) in role_by_uid { + // TODO: use one single query to insert all roles state .workspace_access_control .insert_workspace_role(&uid, &workspace_id, role) diff --git a/src/application.rs b/src/application.rs index 901ae335..d99f5213 100644 --- a/src/application.rs +++ b/src/application.rs @@ -13,6 +13,8 @@ use actix_web::{dev::Server, web, web::Data, App, HttpServer}; use actix::Actor; use anyhow::{Context, Error}; +use app_error::AppError; +use gotrue::grant::{Grant, PasswordGrant}; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; use openssl::x509::X509; use secrecy::{ExposeSecret, Secret}; @@ -176,6 +178,12 @@ pub async fn init_state(config: &Config) -> Result { let gotrue_client = get_gotrue_client(&config.gotrue).await?; setup_admin_account(&gotrue_client, &pg_pool, &config.gotrue).await?; + // Gotrue Admin + let gotrue_admin = GoTrueAdmin::new( + config.gotrue.admin_email.clone(), + config.gotrue.admin_password.clone(), + ); + // Redis info!("Connecting to Redis..."); let redis_client = get_redis_client(config.redis_uri.expose_secret()).await?; @@ -215,6 +223,7 @@ pub async fn init_state(config: &Config) -> Result { users: Arc::new(users), id_gen: Arc::new(RwLock::new(Snowflake::new(1))), gotrue_client, + gotrue_admin, redis_client, collab_storage, collab_access_control, @@ -225,6 +234,31 @@ pub async fn init_state(config: &Config) -> Result { }) } +#[derive(Debug, Clone)] +pub struct GoTrueAdmin { + pub admin_email: String, + pub password: Secret, +} + +impl GoTrueAdmin { + pub fn new(admin_email: String, password: String) -> Self { + Self { + admin_email, + password: password.into(), + } + } + + pub async fn token(&self, client: &gotrue::api::Client) -> Result { + let token = client + .token(&Grant::Password(PasswordGrant { + email: self.admin_email.clone(), + password: self.password.expose_secret().clone(), + })) + .await?; + Ok(token.access_token) + } +} + async fn setup_admin_account( gotrue_client: &gotrue::api::Client, pg_pool: &PgPool, diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index 95470033..59fdb091 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -36,7 +36,7 @@ where } #[instrument(level = "debug", skip_all, err)] - #[allow(clippy::blocks_in_if_conditions)] + #[allow(clippy::blocks_in_conditions)] async fn check_collab_permission( &self, oid: &str, diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 26cb9458..3addc347 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -177,7 +177,7 @@ where } #[instrument(level = "trace", skip(self, params), oid = %params.oid, err)] - #[allow(clippy::blocks_in_if_conditions)] + #[allow(clippy::blocks_in_conditions)] async fn upsert_collab_with_transaction( &self, workspace_id: &str, diff --git a/src/biz/user.rs b/src/biz/user.rs index 743881aa..0f5a501e 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -1,4 +1,5 @@ use anyhow::{Context, Result}; +use gotrue_entity::dto::User; use crate::biz::workspace::access_control::WorkspaceAccessControl; use crate::state::AppState; @@ -26,15 +27,30 @@ use workspace_template::{WorkspaceTemplate, WorkspaceTemplateBuilder}; #[instrument(skip_all, err)] pub async fn verify_token(access_token: &str, state: &AppState) -> Result { let user = state.gotrue_client.user_info(access_token).await?; - let user_uuid = uuid::Uuid::parse_str(&user.id)?; - let name = name_from_user_metadata(&user.user_metadata); - let mut txn = state .pg_pool .begin() .await .context("acquire transaction to verify token")?; + let is_new = create_user_if_not_exists(&user, &mut txn, state).await; + txn + .commit() + .await + .context("fail to commit transaction to verify token")?; + + is_new +} + +/// Returns if user is new +pub async fn create_user_if_not_exists( + user: &User, + txn: &mut Transaction<'_, sqlx::Postgres>, + state: &AppState, +) -> Result { + let user_uuid = uuid::Uuid::parse_str(&user.id)?; + let name = name_from_user_metadata(&user.user_metadata); + // To prevent concurrent creation of the same user with the same workspace resources, we lock // the user row when `verify_token` is called. This means that if multiple requests try to // create the same user simultaneously, the first request will acquire the lock, create the user, @@ -67,16 +83,12 @@ pub async fn verify_token(access_token: &str, state: &AppState) -> Result Result { + let users = gotrue_client + .admin_list_user(admin_token, Some(email)) + .await?; + for user in users.users { + if user.email == email { + return Ok(true); + } + } + Ok(false) +} diff --git a/src/biz/workspace/access_control.rs b/src/biz/workspace/access_control.rs index 1ce78b81..c58c92d9 100644 --- a/src/biz/workspace/access_control.rs +++ b/src/biz/workspace/access_control.rs @@ -54,7 +54,7 @@ where } #[instrument(level = "trace", skip_all, err)] - #[allow(clippy::blocks_in_if_conditions)] + #[allow(clippy::blocks_in_conditions)] async fn check_workspace_permission( &self, workspace_id: &Uuid, diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs index d8a02252..18ba7bc5 100644 --- a/src/biz/workspace/ops.rs +++ b/src/biz/workspace/ops.rs @@ -9,6 +9,7 @@ use database::workspace::{ select_workspace_member_list, update_updated_at_of_workspace, upsert_workspace_member, }; use database_entity::dto::{AFAccessLevel, AFRole, AFWorkspace}; +use gotrue::params::InviteUserParams; use shared_entity::dto::workspace_dto::{CreateWorkspaceMember, WorkspaceMemberChangeset}; use shared_entity::response::AppResponseError; use sqlx::{types::uuid, PgPool}; @@ -17,6 +18,10 @@ use std::ops::DerefMut; use tracing::instrument; use uuid::Uuid; +use crate::biz::user::create_user_if_not_exists; +use crate::biz::utils::check_user_exists; +use crate::state::AppState; + pub async fn delete_workspace_for_user( pg_pool: &PgPool, workspace_id: &Uuid, @@ -83,16 +88,52 @@ pub async fn open_workspace( /// #[instrument(level = "debug", skip_all, err)] pub async fn add_workspace_members( - pg_pool: &PgPool, + state: &AppState, _user_uuid: &Uuid, workspace_id: &Uuid, members: Vec, ) -> Result, AppError> { - let mut txn = pg_pool + // Invite user to workspace if user is not registered + let admin_token = state.gotrue_admin.token(&state.gotrue_client).await?; + let mut txn = state + .pg_pool .begin() .await .context("Begin transaction to insert workspace members")?; + let gotrue_client = &state.gotrue_client; + for member in members.iter() { + let user_exists = check_user_exists(&admin_token, gotrue_client, &member.email).await?; + if !user_exists { + let user = gotrue_client + .admin_invite_user( + &admin_token, + &InviteUserParams { + email: member.email.clone(), + ..Default::default() + }, + ) + .await?; + let is_new = create_user_if_not_exists(&user, &mut txn, state).await?; + if !is_new { + tracing::error!("User should be new but is not new. User: {:?}", user); + } + } + } + let role_by_uid = add_workspace_members_db(workspace_id, members, &mut txn).await?; + + txn + .commit() + .await + .context("Commit transaction to insert workspace members")?; + Ok(role_by_uid) +} + +pub async fn add_workspace_members_db( + workspace_id: &Uuid, + members: Vec, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, +) -> Result, AppError> { let mut role_by_uid = HashMap::new(); for member in members.into_iter() { let access_level = match &member.role { @@ -106,16 +147,11 @@ pub async fn add_workspace_members( // "Failed to get uid from email {} when adding workspace members", // member.email // ))?; - insert_workspace_member_with_txn(&mut txn, workspace_id, &member.email, member.role.clone()) - .await?; - upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, &mut txn).await?; + insert_workspace_member_with_txn(txn, workspace_id, &member.email, member.role.clone()).await?; + upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, txn).await?; role_by_uid.insert(uid, member.role); } - txn - .commit() - .await - .context("Commit transaction to insert workspace members")?; Ok(role_by_uid) } diff --git a/src/state.rs b/src/state.rs index a8e01f23..a34eaf12 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,3 +1,4 @@ +use crate::application::GoTrueAdmin; use crate::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl}; use crate::biz::collab::storage::CollabPostgresDBStorage; use crate::biz::pg_listener::PgListeners; @@ -24,6 +25,7 @@ pub struct AppState { pub users: Arc, pub id_gen: Arc>, pub gotrue_client: gotrue::api::Client, + pub gotrue_admin: GoTrueAdmin, pub redis_client: RedisClient, pub collab_storage: Arc, pub collab_access_control: CollabAccessControlImpl, diff --git a/tests/access_control/collab_ac_test.rs b/tests/access_control/collab_ac_test.rs index 8a394947..1b9be527 100644 --- a/tests/access_control/collab_ac_test.rs +++ b/tests/access_control/collab_ac_test.rs @@ -1,7 +1,6 @@ use crate::access_control::*; use actix_http::Method; use anyhow::{anyhow, Context}; -use appflowy_cloud::biz; use appflowy_cloud::biz::casbin::access_control::{AccessControl, Action, ActionType, ObjectType}; use appflowy_cloud::biz::pg_listener::PgListeners; @@ -52,10 +51,7 @@ async fn test_collab_access_control(pool: PgPool) -> anyhow::Result<()> { role: AFRole::Guest, }, ]; - let _ = - biz::workspace::ops::add_workspace_members(&pool, &user.uuid, &workspace.workspace_id, members) - .await - .context("adding users to workspace")?; + let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await; // user that created the workspace should have full access assert_access_level( @@ -174,17 +170,15 @@ async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow:: .next() .ok_or(anyhow!("workspace should be created"))?; - let _ = biz::workspace::ops::add_workspace_members( + let _ = add_workspace_members_in_tx( &pool, - &guest.uuid, &workspace.workspace_id, vec![CreateWorkspaceMember { email: guest.email, role: AFRole::Guest, }], ) - .await - .context("adding users to workspace")?; + .await; for method in [Method::GET, Method::POST, Method::PUT, Method::DELETE] { assert_can_access_http_method( @@ -274,17 +268,15 @@ async fn test_collab_access_control_send_receive_collab_update(pool: PgPool) -> .next() .ok_or(anyhow!("workspace should be created"))?; - let _ = biz::workspace::ops::add_workspace_members( + let _ = add_workspace_members_in_tx( &pool, - &guest.uuid, &workspace.workspace_id, vec![CreateWorkspaceMember { email: guest.email, role: AFRole::Guest, }], ) - .await - .context("adding users to workspace")?; + .await; // Need to wait for the listener(spawn_listen_on_workspace_member_change) to receive the event // diff --git a/tests/access_control/member_ac_test.rs b/tests/access_control/member_ac_test.rs index 9642213a..698feb5e 100644 --- a/tests/access_control/member_ac_test.rs +++ b/tests/access_control/member_ac_test.rs @@ -1,5 +1,6 @@ use crate::access_control::{ - assert_workspace_role, assert_workspace_role_error, create_user, setup_db, + add_workspace_members_in_tx, assert_workspace_role, assert_workspace_role_error, create_user, + setup_db, }; use anyhow::{anyhow, Context}; use app_error::ErrorCode; @@ -44,17 +45,16 @@ async fn test_workspace_access_control_get_role(pool: PgPool) -> anyhow::Result< .await; let member = create_user(&pool).await?; - let _ = biz::workspace::ops::add_workspace_members( + + let _ = add_workspace_members_in_tx( &pool, - &member.uuid, &workspace.workspace_id, vec![CreateWorkspaceMember { email: member.email.clone(), role: AFRole::Member, }], ) - .await - .context("adding users to workspace")?; + .await; assert_workspace_role( &access_control, diff --git a/tests/access_control/mod.rs b/tests/access_control/mod.rs index 76ec8bea..472a748b 100644 --- a/tests/access_control/mod.rs +++ b/tests/access_control/mod.rs @@ -1,15 +1,18 @@ use actix_http::Method; use anyhow::Context; use app_error::ErrorCode; +use appflowy_cloud::biz; use appflowy_cloud::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl}; use appflowy_cloud::biz::workspace::access_control::WorkspaceAccessControl; use client_api_test_util::setup_log; use database_entity::dto::{AFAccessLevel, AFRole}; use lazy_static::lazy_static; use realtime::collaborate::CollabAccessControl; +use shared_entity::dto::workspace_dto::CreateWorkspaceMember; use snowflake::Snowflake; use sqlx::PgPool; +use std::collections::HashMap; use std::time::Duration; use tokio::sync::RwLock; use tokio::time::{interval, timeout}; @@ -267,3 +270,22 @@ pub async fn assert_can_access_http_method( .await .expect("Operation timed out"); } + +pub async fn add_workspace_members_in_tx( + pool: &PgPool, + workspace_id: &Uuid, + members: Vec, +) -> HashMap { + let mut txn = pool + .begin() + .await + .expect("acquire transaction to add workspace members"); + let res = biz::workspace::ops::add_workspace_members_db(workspace_id, members, &mut txn) + .await + .expect("add workspace members"); + txn + .commit() + .await + .expect("commit transaction to add workspace members"); + res +} diff --git a/tests/access_control/user_ac_test.rs b/tests/access_control/user_ac_test.rs index 5ed801d7..4654cad6 100644 --- a/tests/access_control/user_ac_test.rs +++ b/tests/access_control/user_ac_test.rs @@ -1,6 +1,5 @@ use crate::access_control::*; use anyhow::anyhow; -use appflowy_cloud::biz; use appflowy_cloud::biz::casbin::access_control::{Action, ObjectType, ToCasbinAction, MODEL_CONF}; use appflowy_cloud::biz::casbin::adapter::PgAdapter; use casbin::{CoreApi, DefaultModel, Enforcer}; @@ -103,14 +102,8 @@ async fn test_add_users_to_workspace(pool: PgPool) -> anyhow::Result<()> { role: AFRole::Guest, }, ]; - let _ = biz::workspace::ops::add_workspace_members( - &pool, - &user_main.uuid, - &workspace.workspace_id, - members, - ) - .await - .context("adding users to workspace")?; + + let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await; let model = DefaultModel::from_str(MODEL_CONF).await?; let enforcer = Enforcer::new( @@ -251,14 +244,8 @@ async fn test_reload_policy_after_adding_user_to_workspace(pool: PgPool) -> anyh email: user_member.email.clone(), role: AFRole::Member, }]; - let _ = biz::workspace::ops::add_workspace_members( - &pool, - &user_owner.uuid, - &workspace.workspace_id, - members, - ) - .await - .context("adding users to workspace")?; + + let _ = add_workspace_members_in_tx(&pool, &workspace.workspace_id, members).await; assert!(!enforcer .enforce(( diff --git a/tests/gotrue/admin.rs b/tests/gotrue/admin.rs index 5eda6a46..28e91179 100644 --- a/tests/gotrue/admin.rs +++ b/tests/gotrue/admin.rs @@ -47,7 +47,7 @@ async fn admin_user_create_list_edit_delete() { // list users let users = gotrue_client - .admin_list_user(&admin_token.access_token) + .admin_list_user(&admin_token.access_token, None) .await .unwrap() .users; @@ -94,7 +94,7 @@ async fn admin_user_create_list_edit_delete() { .unwrap(); let users = gotrue_client - .admin_list_user(&admin_token.access_token) + .admin_list_user(&admin_token.access_token, None) .await .unwrap() .users;