From cd2c0a31147adaad234bc31cb97c4f402a865618 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sat, 16 Dec 2023 08:27:57 +0800 Subject: [PATCH] fix: casbin access control (#214) * chore: fix access control * fix: create user update collab permissions * chore: select workspace role it's not found in cache * fix: encode buffer error --------- Co-authored-by: Jiraffe7 --- libs/database/src/collab/collab_db_ops.rs | 2 +- libs/realtime/src/collaborate/permission.rs | 2 +- libs/realtime/src/collaborate/plugin.rs | 31 ++-- src/api/user.rs | 2 + src/api/workspace.rs | 15 +- src/biz/casbin/access_control.rs | 150 ++++++++++++++------ src/biz/collab/access_control.rs | 2 +- src/biz/collab/ops.rs | 23 ++- src/biz/user.rs | 53 +++++-- src/biz/workspace/access_control.rs | 23 +++ 10 files changed, 222 insertions(+), 81 deletions(-) diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index 78ff6879..19b17308 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -446,7 +446,7 @@ pub async fn select_collab_member( Ok(member) } -#[instrument(level = "debug", skip(row), err)] +#[instrument(level = "trace", skip(row), err)] fn collab_member_try_from_row(row: PgRow) -> Result { let access_level = AFAccessLevel::from(row.try_get::(4)?); let permission = AFPermission { diff --git a/libs/realtime/src/collaborate/permission.rs b/libs/realtime/src/collaborate/permission.rs index 5d0ec4e2..8b8153d6 100644 --- a/libs/realtime/src/collaborate/permission.rs +++ b/libs/realtime/src/collaborate/permission.rs @@ -71,7 +71,7 @@ impl CollabAccessControl for Arc where T: CollabAccessControl, { - #[instrument(level = "debug", skip_all, err)] + #[instrument(level = "debug", skip_all)] async fn get_collab_access_level( &self, user: CollabUserId<'_>, diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index 2fb4f24f..f6a04cf7 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -17,7 +17,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::time::interval; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; use yrs::updates::decoder::Decode; use yrs::updates::encoder::Encode; use yrs::{ReadTxn, StateVector, Transact, Update}; @@ -135,10 +135,9 @@ where }, Err(err) => match &err { AppError::RecordNotFound(_) => { - trace!( + debug!( "create new collab, cache full access of {} for user:{}", - object_id, - self.uid + object_id, self.uid ); let _ = self .access_control @@ -149,15 +148,8 @@ where ) .await; - let result = { - let txn = doc.transact(); - let doc_state = txn.encode_diff_v1(&StateVector::default()); - let state_vector = txn.state_vector().encode_v1(); - EncodedCollabV1::new(doc_state, state_vector).encode_to_bytes() - }; - // - match result { + match encoded_v1_from_doc(doc).encode_to_bytes() { Ok(encoded_collab_v1) => { let params = InsertCollabParams::from_raw_data( object_id.to_string(), @@ -205,12 +197,8 @@ where let workspace_id = self.workspace_id.clone(); let collab_type = self.collab_type.clone(); let object_id = object_id.to_string(); - if let Ok(encoded_collab_v1) = { - let txn = doc.transact(); - let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); - let state_vector = txn.state_vector().encode_v1(); - EncodedCollabV1::new(doc_state, state_vector).encode_to_bytes() - } { + + if let Ok(encoded_collab_v1) = encoded_v1_from_doc(doc).encode_to_bytes() { let params = InsertCollabParams::from_raw_data(object_id, collab_type, encoded_collab_v1, &workspace_id); let object_id = params.object_id.clone(); @@ -224,6 +212,13 @@ where } } +fn encoded_v1_from_doc(doc: &Doc) -> EncodedCollabV1 { + let txn = doc.transact(); + let doc_state = txn.encode_state_as_update_v1(&StateVector::default()); + let state_vector = txn.state_vector().encode_v1(); + EncodedCollabV1::new(state_vector, doc_state) +} + struct CollabEditState { edit_count: AtomicU32, flush_edit_count: AtomicU32, diff --git a/src/api/user.rs b/src/api/user.rs index e3937e1b..94a222f6 100644 --- a/src/api/user.rs +++ b/src/api/user.rs @@ -47,6 +47,8 @@ async fn verify_user_handler( &state.id_gen, &state.gotrue_client, &access_token, + &state.workspace_access_control, + &state.collab_access_control, ) .await .map_err(AppResponseError::from)?; diff --git a/src/api/workspace.rs b/src/api/workspace.rs index be9d8823..01df6c10 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -2,6 +2,7 @@ use crate::api::ws::CollabServerImpl; use crate::biz; use crate::biz::user::RealtimeUserImpl; use crate::biz::workspace; +use crate::biz::workspace::access_control::WorkspaceAccessControl; use crate::component::auth::jwt::UserUuid; use crate::state::AppState; use actix_web::web::Bytes; @@ -121,7 +122,7 @@ async fn add_workspace_members_handler( state .workspace_access_control .update_member(&uid, &workspace_id, role) - .await; + .await?; } Ok(AppResponse::Ok().into()) } @@ -175,7 +176,7 @@ async fn remove_workspace_member_handler( state .workspace_access_control .remove_member(&uid, &workspace_id) - .await; + .await?; } } @@ -210,7 +211,7 @@ async fn update_workspace_member_handler( state .workspace_access_control .update_member(&uid, &workspace_id, role) - .await; + .await?; } Ok(AppResponse::Ok().into()) @@ -222,7 +223,13 @@ async fn create_collab_handler( payload: Json, state: Data, ) -> Result>> { - biz::collab::ops::create_collab(&state.pg_pool, &user_uuid, &payload.into_inner()).await?; + biz::collab::ops::create_collab( + &state.pg_pool, + &user_uuid, + &payload.into_inner(), + &state.collab_access_control, + ) + .await?; Ok(Json(AppResponse::Ok())) } diff --git a/src/biz/casbin/access_control.rs b/src/biz/casbin/access_control.rs index 37e58c1e..b1524270 100644 --- a/src/biz/casbin/access_control.rs +++ b/src/biz/casbin/access_control.rs @@ -8,6 +8,7 @@ use database::user::select_uid_from_uuid; use sqlx::PgPool; use tokio::sync::{broadcast, RwLock}; use tracing::log::warn; +use tracing::{event, instrument}; use uuid::Uuid; use crate::biz::{ @@ -18,7 +19,8 @@ use crate::biz::{ }, }; use app_error::AppError; -use database_entity::dto::{AFAccessLevel, AFRole}; +use database_entity::dto::{AFAccessLevel, AFCollabMember, AFRole}; + use realtime::collaborate::{CollabAccessControl, CollabUserId}; use super::{ @@ -80,6 +82,7 @@ impl CasbinAccessControl { /// /// [`ObjectType::Workspace`] has to be paired with [`ActionType::Role`], /// [`ObjectType::Collab`] has to be paired with [`ActionType::Level`], + #[instrument(level = "trace", skip(self, obj, act), err)] async fn update( &self, uid: &i64, @@ -114,6 +117,13 @@ impl CasbinAccessControl { .map_err(|e| AppError::Internal(anyhow!("casbin error removing policy: {e:?}")))?; } + event!( + tracing::Level::TRACE, + "updating policy: object={}, user={},action={}", + obj_id, + uid, + action + ); enforcer .add_policy(vec![uid.to_string(), obj_id, action]) .await @@ -145,6 +155,20 @@ impl CasbinAccessControl { }; Ok(uid) } + + async fn get_collab_member(&self, uid: &i64, oid: &str) -> Result { + database::collab::select_collab_member(uid, oid, &self.pg_pool).await + } + + async fn get_workspace_member_role( + &self, + uid: &i64, + workspace_id: &Uuid, + ) -> Result { + database::workspace::select_workspace_member(&self.pg_pool, uid, workspace_id) + .await + .map(|r| r.role) + } } fn spawn_listen_on_collab_member_change( @@ -245,22 +269,44 @@ impl CollabAccessControl for CasbinCollabAccessControl { oid: &str, ) -> Result { let uid = self.casbin_access_control.get_uid(&user).await?; - let enforcer = self.casbin_access_control.enforcer.read().await; let collab_id = ObjectType::Collab(oid).to_string(); - let policies = enforcer.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![collab_id]); + let policies = self + .casbin_access_control + .enforcer + .read() + .await + .get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![collab_id]); + // There should only be one entry per user per object, which is enforced in [CasbinAccessControl], so just take one using next. - let access_level = policies + let mut access_level = policies .into_iter() .find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string()) .map(|p| p[POLICY_FIELD_INDEX_ACTION].clone()) .and_then(|s| i32::from_str(s.as_str()).ok()) - .map(AFAccessLevel::from) - .ok_or(AppError::RecordNotFound(format!( - "user:{} is not a member of collab:{}", - uid, oid - ))); + .map(AFAccessLevel::from); - access_level + if access_level.is_none() { + if let Ok(member) = self + .casbin_access_control + .get_collab_member(&uid, oid) + .await + { + access_level = Some(member.permission.access_level); + self + .casbin_access_control + .update( + &uid, + &ObjectType::Collab(oid), + &ActionType::Level(member.permission.access_level), + ) + .await?; + } + } + + access_level.ok_or(AppError::RecordNotFound(format!( + "collab:{} does not exist or user:{} is not a member", + uid, oid + ))) } async fn cache_collab_access_level( @@ -352,25 +398,6 @@ pub struct CasbinWorkspaceAccessControl { casbin_access_control: CasbinAccessControl, } -impl CasbinWorkspaceAccessControl { - pub async fn update_member(&self, uid: &i64, workspace_id: &Uuid, role: AFRole) { - let _ = self - .casbin_access_control - .update( - uid, - &ObjectType::Workspace(&workspace_id.to_string()), - &ActionType::Role(role), - ) - .await; - } - pub async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) { - let _ = self - .casbin_access_control - .remove(uid, &ObjectType::Workspace(&workspace_id.to_string())) - .await; - } -} - #[async_trait] impl WorkspaceAccessControl for CasbinWorkspaceAccessControl { async fn get_role_from_uuid( @@ -384,27 +411,62 @@ impl WorkspaceAccessControl for CasbinWorkspaceAccessControl { .await?; self.get_role_from_uid(&uid, workspace_id).await } - async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result { - let enforcer = self.casbin_access_control.enforcer.read().await; - let workspace_id = workspace_id.to_string(); - let policies = enforcer.get_filtered_policy( - POLICY_FIELD_INDEX_OBJECT, - vec![ObjectType::Workspace(&workspace_id).to_string()], - ); - // There should only be one entry per user per object, which is enforced in [CasbinAccessControl], so just take one using next. - let role = policies + let policies_future = self + .casbin_access_control + .enforcer + .read() + .await + .get_filtered_policy( + POLICY_FIELD_INDEX_OBJECT, + vec![ObjectType::Workspace(&workspace_id.to_string()).to_string()], + ); + + let role = match policies_future .into_iter() .find(|p| p[POLICY_FIELD_INDEX_USER] == uid.to_string()) - .map(|p| p[POLICY_FIELD_INDEX_ACTION].clone()) - .and_then(|s| i32::from_str(s.as_str()).ok()) - .map(AFRole::from) - .ok_or(AppError::NotEnoughPermissions(format!( + { + Some(policy) => i32::from_str(policy[POLICY_FIELD_INDEX_ACTION].as_str()) + .ok() + .map(AFRole::from), + None => self + .casbin_access_control + .get_workspace_member_role(uid, workspace_id) + .await + .ok(), + }; + + role.ok_or_else(|| { + AppError::NotEnoughPermissions(format!( "user:{} is not a member of workspace:{}", uid, workspace_id - ))); + )) + }) + } - role + async fn update_member( + &self, + uid: &i64, + workspace_id: &Uuid, + role: AFRole, + ) -> Result<(), AppError> { + let _ = self + .casbin_access_control + .update( + uid, + &ObjectType::Workspace(&workspace_id.to_string()), + &ActionType::Role(role), + ) + .await?; + Ok(()) + } + + async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) -> Result<(), AppError> { + let _ = self + .casbin_access_control + .remove(uid, &ObjectType::Workspace(&workspace_id.to_string())) + .await?; + Ok(()) } } diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index c8703b63..dce055a8 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -316,7 +316,7 @@ where CollabAC: CollabAccessControl, WorkspaceAC: WorkspaceAccessControl, { - #[instrument(level = "debug", skip(self), err)] + #[instrument(level = "debug", skip(self))] async fn get_collab_access_level(&self, uid: &i64, oid: &str) -> Result { self .collab_access_control diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index 34584b44..8c149e4f 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -4,28 +4,43 @@ use std::ops::DerefMut; use app_error::AppError; use database_entity::dto::{ - AFCollabMember, AFCollabSnapshots, CollabMemberIdentify, DeleteCollabParams, + AFAccessLevel, AFCollabMember, AFCollabSnapshots, CollabMemberIdentify, DeleteCollabParams, InsertCollabMemberParams, InsertCollabParams, QueryCollabMembers, QueryObjectSnapshotParams, QuerySnapshotParams, UpdateCollabMemberParams, }; +use realtime::collaborate::{CollabAccessControl, CollabUserId}; use sqlx::{types::Uuid, PgPool}; use tracing::{event, trace}; use validator::Validate; -pub async fn create_collab( +pub async fn create_collab( pg_pool: &PgPool, user_uuid: &Uuid, params: &InsertCollabParams, -) -> Result<(), AppError> { + collab_access_control: &C, +) -> Result<(), AppError> +where + C: CollabAccessControl, +{ params.validate()?; if database::collab::collab_exists(pg_pool, ¶ms.object_id).await? { + // When calling this function, the caller should have already checked if the collab exists. return Err(AppError::RecordAlreadyExists(format!( "Collab with object_id {} already exists", params.object_id ))); } - upsert_collab(pg_pool, user_uuid, params).await + + upsert_collab(pg_pool, user_uuid, params).await?; + collab_access_control + .cache_collab_access_level( + CollabUserId::UserUuid(user_uuid), + ¶ms.object_id, + AFAccessLevel::FullAccess, + ) + .await?; + Ok(()) } pub async fn upsert_collab( diff --git a/src/biz/user.rs b/src/biz/user.rs index e4a25173..c8910511 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -1,6 +1,7 @@ use anyhow::{Context, Result}; use gotrue::api::Client; +use realtime::collaborate::CollabAccessControl; use serde_json::json; use shared_entity::response::AppResponseError; use std::fmt::{Display, Formatter}; @@ -9,8 +10,11 @@ use std::sync::Arc; use uuid::Uuid; use database::workspace::{select_user_profile, select_user_workspace, select_workspace}; -use database_entity::dto::{AFUserProfile, AFUserWorkspaceInfo, AFWorkspace, InsertCollabParams}; +use database_entity::dto::{ + AFAccessLevel, AFRole, AFUserProfile, AFUserWorkspaceInfo, AFWorkspace, InsertCollabParams, +}; +use crate::biz::workspace::access_control::WorkspaceAccessControl; use app_error::AppError; use database::collab::insert_into_af_collab; use database::user::{create_user, is_user_exist}; @@ -27,12 +31,18 @@ use workspace_template::WorkspaceTemplateBuilder; /// Return true if the user is a new user /// #[instrument(skip_all, err)] -pub async fn verify_token( +pub async fn verify_token( pg_pool: &PgPool, id_gen: &Arc>, gotrue_client: &Client, access_token: &str, -) -> Result { + workspace_access_control: &W, + collab_access_control: &C, +) -> Result +where + W: WorkspaceAccessControl, + C: CollabAccessControl, +{ let user = gotrue_client.user_info(access_token).await?; let user_uuid = uuid::Uuid::parse_str(&user.id)?; let name = name_from_user_metadata(&user.user_metadata); @@ -60,6 +70,22 @@ pub async fn verify_token( let workspace_id = create_user(txn.deref_mut(), new_uid, &user_uuid, &user.email, &name).await?; + workspace_access_control + .update_member( + &new_uid, + &Uuid::parse_str(&workspace_id).unwrap(), + AFRole::Owner, + ) + .await?; + + collab_access_control + .cache_collab_access_level( + realtime::collaborate::CollabUserId::UserId(&new_uid), + &workspace_id, + AFAccessLevel::FullAccess, + ) + .await?; + // Create the default workspace for the user. A default workspace might contain multiple // templates, e.g. a document template, a database template, etc. let templates = WorkspaceTemplateBuilder::new(new_uid, &workspace_id) @@ -68,15 +94,26 @@ pub async fn verify_token( debug!("create {} templates for user:{}", templates.len(), new_uid); for template in templates { + let object_id = template.object_id; + let encoded_collab_v1 = template + .object_data + .encode_to_bytes() + .map_err(|err| AppError::Internal(anyhow::Error::from(err)))?; + + collab_access_control + .cache_collab_access_level( + realtime::collaborate::CollabUserId::UserId(&new_uid), + &object_id, + AFAccessLevel::FullAccess, + ) + .await?; + insert_into_af_collab( &mut txn, &new_uid, &InsertCollabParams { - object_id: template.object_id, - encoded_collab_v1: template - .object_data - .encode_to_bytes() - .map_err(|err| AppError::Internal(anyhow::Error::from(err)))?, + object_id, + encoded_collab_v1, workspace_id: workspace_id.clone(), collab_type: template.object_type, }, diff --git a/src/biz/workspace/access_control.rs b/src/biz/workspace/access_control.rs index b843887b..9c9aed5a 100644 --- a/src/biz/workspace/access_control.rs +++ b/src/biz/workspace/access_control.rs @@ -10,6 +10,7 @@ use sqlx::PgPool; use std::collections::hash_map::Entry; use std::collections::HashMap; +use anyhow::anyhow; use app_error::AppError; use database_entity::dto::AFRole; use std::sync::Arc; @@ -25,6 +26,15 @@ pub trait WorkspaceAccessControl: Send + Sync + 'static { workspace_id: &Uuid, ) -> Result; async fn get_role_from_uid(&self, uid: &i64, workspace_id: &Uuid) -> Result; + + async fn update_member( + &self, + uid: &i64, + workspace_id: &Uuid, + role: AFRole, + ) -> Result<(), AppError>; + + async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) -> Result<(), AppError>; } /// Represents the role of the user in the workspace by the workspace id. @@ -194,6 +204,19 @@ impl WorkspaceAccessControl for WorkspaceAccessControlImpl { let role = self.get_user_workspace_role(uid, workspace_id).await?; Ok(role) } + + async fn update_member( + &self, + uid: &i64, + workspace_id: &Uuid, + role: AFRole, + ) -> Result<(), AppError> { + Err(AppError::Internal(anyhow!("Not support"))) + } + + async fn remove_member(&self, uid: &i64, workspace_id: &Uuid) -> Result<(), AppError> { + Err(AppError::Internal(anyhow!("Not support"))) + } } #[derive(Clone)]