use authentication::jwt::OptionalUserUuid; use collab::core::collab::DataSource; use collab::preclude::Collab; use collab_folder::CollabOrigin; use collab_rt_entity::{ClientCollabMessage, UpdateSync}; use collab_rt_protocol::{Message, SyncMessage}; use database_entity::dto::AFWorkspaceSettingsChange; use std::collections::HashMap; use std::ops::DerefMut; use std::sync::Arc; use anyhow::{anyhow, Context}; use redis::AsyncCommands; use serde_json::json; use sqlx::{types::uuid, PgPool}; use tracing::instrument; use uuid::Uuid; use yrs::updates::encoder::Encode; use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use database::collab::{upsert_collab_member_with_txn, CollabStorage}; use database::file::s3_client_impl::S3BucketStorage; use database::pg_row::AFWorkspaceMemberRow; use database::user::select_uid_from_email; use database::workspace::*; use database_entity::dto::{ AFAccessLevel, AFRole, AFWorkspace, AFWorkspaceInvitation, AFWorkspaceInvitationStatus, AFWorkspaceSettings, GlobalComment, Reaction, WorkspaceUsage, }; use gotrue::params::{GenerateLinkParams, GenerateLinkType}; use shared_entity::dto::workspace_dto::{ CreateWorkspaceMember, WorkspaceMemberChangeset, WorkspaceMemberInvitation, }; use shared_entity::response::AppResponseError; use workspace_template::document::getting_started::GettingStartedTemplate; use crate::biz::user::user_init::{ create_user_awareness, create_workspace_collab, create_workspace_database_collab, initialize_workspace_for_user, }; use crate::mailer::{AFCloudMailer, WorkspaceInviteMailerParam}; use crate::state::{GoTrueAdmin, RedisConnectionManager}; const MAX_COMMENT_LENGTH: usize = 5000; pub async fn delete_workspace_for_user( pg_pool: PgPool, workspace_id: Uuid, bucket_storage: Arc, ) -> Result<(), AppResponseError> { // remove files from s3 bucket_storage .remove_dir(workspace_id.to_string().as_str()) .await?; // remove from postgres delete_from_workspace(&pg_pool, &workspace_id).await?; // TODO: There can be a rare case where user uploads while workspace is being deleted. // We need some routine job to clean up these orphaned files. Ok(()) } /// Create an empty workspace with default folder, workspace database and user awareness collab /// object. pub async fn create_empty_workspace( pg_pool: &PgPool, workspace_access_control: Arc, collab_storage: &Arc, user_uuid: &Uuid, user_uid: i64, workspace_name: &str, ) -> Result { let mut txn = pg_pool.begin().await?; let new_workspace_row = insert_user_workspace(&mut txn, user_uuid, workspace_name, false).await?; workspace_access_control .insert_role(&user_uid, &new_workspace_row.workspace_id, AFRole::Owner) .await?; let workspace_id = new_workspace_row.workspace_id.to_string(); // create CollabType::Folder create_workspace_collab( user_uid, &workspace_id, workspace_name, collab_storage, &mut txn, ) .await?; // create CollabType::WorkspaceDatabase if let Some(database_storage_id) = new_workspace_row.database_storage_id.as_ref() { let workspace_database_object_id = database_storage_id.to_string(); create_workspace_database_collab( &workspace_id, &user_uid, &workspace_database_object_id, collab_storage, &mut txn, vec![], ) .await?; } // create CollabType::UserAwareness create_user_awareness( &user_uid, user_uuid, &workspace_id, collab_storage, &mut txn, ) .await?; let new_workspace = AFWorkspace::try_from(new_workspace_row)?; txn.commit().await?; Ok(new_workspace) } pub async fn create_workspace_for_user( pg_pool: &PgPool, workspace_access_control: Arc, collab_storage: &Arc, user_uuid: &Uuid, user_uid: i64, workspace_name: &str, ) -> Result { let mut txn = pg_pool.begin().await?; let new_workspace_row = insert_user_workspace(&mut txn, user_uuid, workspace_name, true).await?; workspace_access_control .insert_role(&user_uid, &new_workspace_row.workspace_id, AFRole::Owner) .await?; // add create initial collab for user initialize_workspace_for_user( user_uid, user_uuid, &new_workspace_row, &mut txn, vec![GettingStartedTemplate], collab_storage, ) .await?; let new_workspace = AFWorkspace::try_from(new_workspace_row)?; txn.commit().await?; Ok(new_workspace) } pub async fn patch_workspace( pg_pool: &PgPool, workspace_id: &Uuid, workspace_name: Option<&str>, workspace_icon: Option<&str>, ) -> Result<(), AppResponseError> { let mut tx = pg_pool.begin().await?; if let Some(workspace_name) = workspace_name { rename_workspace(&mut tx, workspace_id, workspace_name).await?; } if let Some(workspace_icon) = workspace_icon { change_workspace_icon(&mut tx, workspace_id, workspace_icon).await?; } tx.commit().await?; Ok(()) } pub async fn get_comments_on_published_view( pg_pool: &PgPool, view_id: &Uuid, optional_user_uuid: &OptionalUserUuid, ) -> Result, AppError> { let page_owner_uuid = select_owner_of_published_collab(pg_pool, view_id).await?; let comments = select_comments_for_published_view_ordered_by_recency( pg_pool, view_id, &optional_user_uuid.as_uuid(), &page_owner_uuid, ) .await?; Ok(comments) } pub async fn create_comment_on_published_view( pg_pool: &PgPool, view_id: &Uuid, reply_comment_id: &Option, content: &str, user_uuid: &Uuid, ) -> Result<(), AppError> { if content.len() > MAX_COMMENT_LENGTH { return Err(AppError::StringLengthLimitReached( "comment content exceed limit".to_string(), )); } insert_comment_to_published_view(pg_pool, view_id, user_uuid, content, reply_comment_id).await?; Ok(()) } pub async fn remove_comment_on_published_view( pg_pool: &PgPool, view_id: &Uuid, comment_id: &Uuid, user_uuid: &Uuid, ) -> Result<(), AppError> { check_if_user_is_allowed_to_delete_comment(pg_pool, user_uuid, view_id, comment_id).await?; update_comment_deletion_status(pg_pool, comment_id).await?; Ok(()) } pub async fn get_reactions_on_published_view( pg_pool: &PgPool, view_id: &Uuid, comment_id: &Option, ) -> Result, AppError> { let reaction = match comment_id { Some(comment_id) => { select_reactions_for_comment_ordered_by_reaction_type_creation_time(pg_pool, comment_id) .await? }, None => { select_reactions_for_published_view_ordered_by_reaction_type_creation_time(pg_pool, view_id) .await? }, }; Ok(reaction) } pub async fn create_reaction_on_comment( pg_pool: &PgPool, comment_id: &Uuid, view_id: &Uuid, reaction_type: &str, user_uuid: &Uuid, ) -> Result<(), AppError> { insert_reaction_on_comment(pg_pool, comment_id, view_id, user_uuid, reaction_type).await?; Ok(()) } pub async fn remove_reaction_on_comment( pg_pool: &PgPool, comment_id: &Uuid, reaction_type: &str, user_uuid: &Uuid, ) -> Result<(), AppError> { delete_reaction_from_comment(pg_pool, comment_id, user_uuid, reaction_type).await?; Ok(()) } pub async fn get_all_user_workspaces( pg_pool: &PgPool, user_uuid: &Uuid, include_member_count: bool, ) -> Result, AppResponseError> { let workspaces = select_all_user_workspaces(pg_pool, user_uuid).await?; let mut workspaces = workspaces .into_iter() .flat_map(|row| { let result = AFWorkspace::try_from(row); if let Err(err) = &result { tracing::error!("Failed to convert workspace row to AFWorkspace: {:?}", err); } result }) .collect::>(); if include_member_count { let ids = workspaces .iter() .map(|row| row.workspace_id) .collect::>(); let member_count_by_workspace_id = select_member_count_for_workspaces(pg_pool, &ids).await?; for workspace in workspaces.iter_mut() { if let Some(member_count) = member_count_by_workspace_id.get(&workspace.workspace_id) { workspace.member_count = Some(*member_count); } } } Ok(workspaces) } /// Returns the workspace with the given workspace_id and update the updated_at field of the /// workspace. pub async fn open_workspace( pg_pool: &PgPool, user_uuid: &Uuid, workspace_id: &Uuid, ) -> Result { let mut txn = pg_pool .begin() .await .context("Begin transaction to open workspace")?; let row = select_workspace(txn.deref_mut(), workspace_id).await?; update_updated_at_of_workspace(txn.deref_mut(), user_uuid, workspace_id).await?; txn .commit() .await .context("Commit transaction to open workspace")?; let workspace = AFWorkspace::try_from(row)?; Ok(workspace) } pub async fn accept_workspace_invite( pg_pool: &PgPool, workspace_access_control: Arc, user_uid: i64, user_uuid: &Uuid, invite_id: &Uuid, ) -> Result<(), AppError> { let mut txn = pg_pool.begin().await?; let inv = get_invitation_by_id(&mut txn, invite_id).await?; if let Some(invitee_uid) = inv.invitee_uid { if invitee_uid != user_uid { return Err(AppError::NotInviteeOfWorkspaceInvitation(format!( "User with uid {} is not the invitee for invite_id {}", user_uid, invite_id ))); } } update_workspace_invitation_set_status_accepted(&mut txn, user_uuid, invite_id).await?; let invited_uid = inv .invitee_uid .ok_or_else(|| AppError::Internal(anyhow::anyhow!("Invitee uid is missing for {:?}", inv)))?; workspace_access_control .insert_role(&invited_uid, &inv.workspace_id, inv.role) .await?; txn.commit().await?; Ok(()) } #[instrument(level = "debug", skip_all, err)] #[allow(clippy::too_many_arguments)] pub async fn invite_workspace_members( mailer: &AFCloudMailer, gotrue_admin: &GoTrueAdmin, pg_pool: &PgPool, gotrue_client: &gotrue::api::Client, inviter: &Uuid, workspace_id: &Uuid, invitations: Vec, appflowy_web_url: Option<&str>, ) -> Result<(), AppError> { let mut txn = pg_pool .begin() .await .context("Begin transaction to invite workspace members")?; let admin_token = gotrue_admin.token().await?; let inviter_name = database::user::select_name_from_uuid(pg_pool, inviter).await?; let workspace_name = database::workspace::select_workspace_name_from_workspace_id(pg_pool, workspace_id) .await? .unwrap_or_default(); let workspace_member_count = database::workspace::select_workspace_member_count_from_workspace_id(pg_pool, workspace_id) .await? .unwrap_or_default(); let workspace_members_by_email: HashMap<_, _> = database::workspace::select_workspace_member_list(pg_pool, workspace_id) .await? .into_iter() .map(|row| (row.email, row.role)) .collect(); let pending_invitations = database::workspace::select_workspace_pending_invitations(pg_pool, workspace_id).await?; // check if any of the invited users are already members of the workspace for invitation in &invitations { if workspace_members_by_email.contains_key(&invitation.email) { return Err(AppError::InvalidRequest(format!( "User with email {} is already a member of the workspace", invitation.email ))); } } for invitation in invitations { let inviter_name = inviter_name.clone(); let workspace_name = workspace_name.clone(); let workspace_member_count = workspace_member_count.to_string(); // use default icon until we have workspace icon let workspace_icon_url = "https://miro.medium.com/v2/resize:fit:2400/1*mTPfm7CwU31-tLhtLNkyJw.png".to_string(); let user_icon_url = "https://cdn.pixabay.com/photo/2015/10/05/22/37/blank-profile-picture-973460_1280.png" .to_string(); let invite_id = match pending_invitations.get(&invitation.email) { None => { // user is not invited yet let invite_id = uuid::Uuid::new_v4(); insert_workspace_invitation( &mut txn, &invite_id, workspace_id, inviter, invitation.email.as_str(), invitation.role, ) .await?; invite_id }, Some(invite_id) => { tracing::warn!("User already invited: {}", invitation.email); *invite_id }, }; // Generate a link such that when clicked, the user is added to the workspace. let accept_url = { match appflowy_web_url { Some(appflowy_web_url) => format!("{}/accept-invitation?invited_id={}", appflowy_web_url, invite_id), None => { gotrue_client .admin_generate_link( &admin_token, &GenerateLinkParams { type_: GenerateLinkType::MagicLink, email: invitation.email.clone(), redirect_to: format!( "/web/login-callback?action=accept_workspace_invite&workspace_invitation_id={}&workspace_name={}&workspace_icon={}&user_name={}&user_icon={}&workspace_member_count={}", invite_id, workspace_name, workspace_icon_url, inviter_name, user_icon_url, workspace_member_count, ), ..Default::default() }, ) .await? .action_link }, } }; // send email can be slow, so send email in background let cloned_mailer = mailer.clone(); tokio::spawn(async move { if let Err(err) = cloned_mailer .send_workspace_invite( &invitation.email, WorkspaceInviteMailerParam { user_icon_url, username: inviter_name, workspace_name, workspace_icon_url, workspace_member_count, accept_url, }, ) .await { tracing::error!("Failed to send workspace invite email: {:?}", err); }; }); } txn .commit() .await .context("Commit transaction to invite workspace members")?; Ok(()) } #[instrument(level = "debug", skip_all, err)] pub async fn list_workspace_invitations_for_user( pg_pool: &PgPool, user_uuid: &Uuid, status: Option, ) -> Result, AppError> { let invis = select_workspace_invitations_for_user(pg_pool, user_uuid, status).await?; Ok(invis) } pub async fn get_workspace_invitations_for_user( pg_pool: &PgPool, user_uuid: &Uuid, invite_id: &Uuid, ) -> Result { let user_is_invitee = select_user_is_invitee_for_workspace_invitation(pg_pool, user_uuid, invite_id).await?; if !user_is_invitee { return Err(AppError::NotInviteeOfWorkspaceInvitation(format!( "User with uuid {} is not the invitee for invite_id {}", user_uuid, invite_id ))); } let invitation = select_workspace_invitation_for_user(pg_pool, user_uuid, invite_id).await?; Ok(invitation) } // use in tests only pub async fn add_workspace_members_db_only( pg_pool: &PgPool, _user_uuid: &Uuid, workspace_id: &Uuid, members: Vec, ) -> Result<(), AppError> { let mut txn = pg_pool .begin() .await .context("Begin transaction to insert workspace members")?; for member in members.into_iter() { let access_level = match &member.role { AFRole::Owner => AFAccessLevel::FullAccess, AFRole::Member => AFAccessLevel::ReadAndWrite, AFRole::Guest => AFAccessLevel::ReadOnly, }; let uid = select_uid_from_email(txn.deref_mut(), &member.email).await?; upsert_workspace_member_with_txn(&mut txn, workspace_id, &member.email, member.role.clone()) .await?; upsert_collab_member_with_txn(uid, workspace_id.to_string(), &access_level, &mut txn).await?; } txn .commit() .await .context("Commit transaction to insert workspace members")?; Ok(()) } pub async fn leave_workspace( pg_pool: &PgPool, workspace_id: &Uuid, user_uuid: &Uuid, workspace_access_control: Arc, ) -> Result<(), AppResponseError> { let email = database::user::select_email_from_user_uuid(pg_pool, user_uuid).await?; remove_workspace_members(pg_pool, workspace_id, &[email], workspace_access_control).await } pub async fn remove_workspace_members( pg_pool: &PgPool, workspace_id: &Uuid, member_emails: &[String], workspace_access_control: Arc, ) -> Result<(), AppResponseError> { let mut txn = pg_pool .begin() .await .context("Begin transaction to delete workspace members")?; for email in member_emails { delete_workspace_members(&mut txn, workspace_id, email.as_str()).await?; if let Ok(uid) = select_uid_from_email(txn.deref_mut(), email) .await .map_err(AppResponseError::from) { workspace_access_control .remove_user_from_workspace(&uid, workspace_id) .await?; } } txn .commit() .await .context("Commit transaction to delete workspace members")?; Ok(()) } pub async fn get_workspace_members( pg_pool: &PgPool, workspace_id: &Uuid, ) -> Result, AppResponseError> { Ok(select_workspace_member_list(pg_pool, workspace_id).await?) } pub async fn get_workspace_member( uid: &i64, pg_pool: &PgPool, workspace_id: &Uuid, ) -> Result { Ok(select_workspace_member(pg_pool, uid, workspace_id).await?) } pub async fn update_workspace_member( uid: &i64, pg_pool: &PgPool, workspace_id: &Uuid, changeset: &WorkspaceMemberChangeset, workspace_access_control: Arc, ) -> Result<(), AppError> { if let Some(role) = &changeset.role { upsert_workspace_member(pg_pool, workspace_id, &changeset.email, role.clone()).await?; workspace_access_control .insert_role(uid, workspace_id, role.clone()) .await?; } Ok(()) } pub async fn get_workspace_document_total_bytes( pg_pool: &PgPool, user_uuid: &Uuid, workspace_id: &Uuid, ) -> Result { check_workspace_owner(pg_pool, user_uuid, workspace_id).await?; let byte_count = select_workspace_total_collab_bytes(pg_pool, workspace_id).await?; Ok(WorkspaceUsage { total_document_size: byte_count, }) } pub async fn get_workspace_settings( pg_pool: &PgPool, workspace_id: &Uuid, ) -> Result { let settings = select_workspace_settings(pg_pool, workspace_id).await?; Ok(settings.unwrap_or_default()) } pub async fn update_workspace_settings( pg_pool: &PgPool, workspace_id: &Uuid, change: AFWorkspaceSettingsChange, ) -> Result { let mut tx = pg_pool.begin().await?; let mut setting = select_workspace_settings(tx.deref_mut(), workspace_id) .await? .unwrap_or_default(); if let Some(disable_search_indexing) = change.disable_search_indexing { setting.disable_search_indexing = disable_search_indexing; } if let Some(ai_model) = change.ai_model { setting.ai_model = ai_model; } // Update the workspace settings in the database upsert_workspace_settings(&mut tx, workspace_id, &setting).await?; tx.commit().await?; Ok(setting) } pub async fn check_workspace_owner( pg_pool: &PgPool, user_uuid: &Uuid, workspace_id: &Uuid, ) -> Result<(), AppError> { match select_user_is_workspace_owner(pg_pool, user_uuid, workspace_id).await? { true => Ok(()), false => Err(AppError::UserUnAuthorized( "User is not the owner of the workspace".to_string(), )), } } async fn check_if_user_is_allowed_to_delete_comment( pg_pool: &PgPool, user_uuid: &Uuid, view_id: &Uuid, comment_id: &Uuid, ) -> Result<(), AppError> { let is_allowed = select_user_is_allowed_to_delete_comment(pg_pool, user_uuid, view_id, comment_id).await?; if !is_allowed { return Err(AppError::UserUnAuthorized( "User is not allowed to delete this comment".to_string(), )); } Ok(()) } #[allow(clippy::too_many_arguments)] pub async fn create_upload_task( uid: i64, user_name: &str, user_email: &str, workspace_id: &str, workspace_name: &str, file_size: usize, host: &str, redis_client: &RedisConnectionManager, pg_pool: &PgPool, ) -> Result<(), AppError> { let task_id = Uuid::new_v4(); // Insert the task into the database insert_import_task( task_id, file_size as i64, workspace_id.to_string(), uid, Some(json!({"host": host})), pg_pool, ) .await?; // This task will be deserialized into ImportTask let task = json!({ "notion": { "uid": uid, "user_name": user_name, "user_email": user_email, "task_id": task_id, "workspace_id": workspace_id, "s3_key": workspace_id, "host": host, "workspace_name": workspace_name, } }); let _: () = redis_client .clone() .xadd("import_task_stream", "*", &[("task", task.to_string())]) .await .map_err(|err| AppError::Internal(anyhow!("Failed to push task to Redis stream: {}", err)))?; Ok(()) } /// broadcast updates to collab group if exists pub async fn broadcast_update( collab_storage: &Arc, oid: &str, encoded_update: Vec, ) -> Result<(), AppError> { tracing::info!("broadcasting update to group: {}", oid); let payload = Message::Sync(SyncMessage::Update(encoded_update)).encode_v1(); let msg = ClientCollabMessage::ClientUpdateSync { data: UpdateSync { origin: CollabOrigin::Server, object_id: oid.to_string(), msg_id: chrono::Utc::now().timestamp_millis() as u64, payload: payload.into(), }, }; collab_storage .broadcast_encode_collab(oid.to_string(), vec![msg]) .await?; Ok(()) } pub fn collab_from_doc_state(doc_state: Vec, object_id: &str) -> Result { let collab = Collab::new_with_source( CollabOrigin::Server, object_id, DataSource::DocStateV1(doc_state), vec![], false, ) .map_err(|e| AppError::Unhandled(e.to_string()))?; Ok(collab) }