From 8ab7815740dc6da80060a6fe07b284735488aee8 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 16 Apr 2024 20:26:26 +0800 Subject: [PATCH] chore: reduce pool lock (#475) * chore: reduce pool lock * chore: clippy --- .github/workflows/rustlint.yml | 5 +- libs/database/src/collab/collab_storage.rs | 6 +- .../appflowy-history/tests/edit_test/mock.rs | 2 + services/appflowy-history/tests/util.rs | 2 + src/api/workspace.rs | 4 +- src/biz/collab/access_control.rs | 6 +- src/biz/collab/cache.rs | 8 ++- src/biz/collab/mem_cache.rs | 12 ++++ src/biz/collab/storage.rs | 60 ++++++++++--------- src/biz/user/user_init.rs | 4 +- 10 files changed, 70 insertions(+), 39 deletions(-) diff --git a/.github/workflows/rustlint.yml b/.github/workflows/rustlint.yml index 12ce685e..7270844b 100644 --- a/.github/workflows/rustlint.yml +++ b/.github/workflows/rustlint.yml @@ -36,6 +36,9 @@ jobs: with: workspaces: | AppFlowy-Cloud + key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-clippy- - name: Copy and rename dev.env to .env run: cp dev.env .env @@ -49,4 +52,4 @@ jobs: cargo fmt --check - name: Clippy - run: cargo clippy --all-targets --all-features -- -D warnings + run: cargo clippy --all-targets --all-features --tests -- -D warnings diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index f801c564..4ae2318d 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -75,7 +75,7 @@ pub trait CollabStorage: Send + Sync + 'static { /// # Returns /// /// * `Result<()>` - Returns `Ok(())` if the collaboration was created successfully, `Err` otherwise. - async fn insert_or_update_collab_with_transaction( + async fn insert_new_collab_with_transaction( &self, workspace_id: &str, uid: &i64, @@ -152,7 +152,7 @@ where .await } - async fn insert_or_update_collab_with_transaction( + async fn insert_new_collab_with_transaction( &self, workspace_id: &str, uid: &i64, @@ -161,7 +161,7 @@ where ) -> AppResult<()> { self .as_ref() - .insert_or_update_collab_with_transaction(workspace_id, uid, params, transaction) + .insert_new_collab_with_transaction(workspace_id, uid, params, transaction) .await } diff --git a/services/appflowy-history/tests/edit_test/mock.rs b/services/appflowy-history/tests/edit_test/mock.rs index f61020dc..9d706e96 100644 --- a/services/appflowy-history/tests/edit_test/mock.rs +++ b/services/appflowy-history/tests/edit_test/mock.rs @@ -5,6 +5,7 @@ use collab_stream::model::{CollabControlEvent, CollabUpdateEvent}; use parking_lot::RwLock; use std::sync::Arc; +#[allow(dead_code)] pub struct StreamEventMock { pub open_event: CollabControlEvent, pub close_event: CollabControlEvent, @@ -13,6 +14,7 @@ pub struct StreamEventMock { pub expected_json: serde_json::Value, } +#[allow(dead_code)] pub async fn mock_test_data( workspace_id: &str, object_id: &str, diff --git a/services/appflowy-history/tests/util.rs b/services/appflowy-history/tests/util.rs index 1e44b190..1384b621 100644 --- a/services/appflowy-history/tests/util.rs +++ b/services/appflowy-history/tests/util.rs @@ -71,6 +71,7 @@ pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> { } #[derive(Clone)] +#[allow(dead_code)] pub struct TestRpcClient { pub config: Config, history_rpc: HistoryClient, @@ -104,6 +105,7 @@ impl TestRpcClient { } } +#[allow(dead_code)] pub async fn run_test_server(control_stream_key: String) -> TestRpcClient { let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); diff --git a/src/api/workspace.rs b/src/api/workspace.rs index f9f40c21..8c638d27 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -516,7 +516,7 @@ async fn batch_create_collab_handler( { state .collab_access_control_storage - .insert_or_update_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) + .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) .await?; state @@ -591,7 +591,7 @@ async fn create_collab_list_handler( let _object_id = params.object_id.clone(); state .collab_access_control_storage - .insert_or_update_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) + .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) .await?; } diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index 24046df4..dbfeb6e2 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -129,7 +129,7 @@ where trace!("Skip access control for the request"); return Ok(()); } - let collab_exists = self.collab_cache.is_exist_in_disk(oid).await?; + let collab_exists = self.collab_cache.is_exist(oid).await?; if !collab_exists { // If the collab does not exist, we should not enforce the access control return Ok(()); @@ -199,7 +199,7 @@ where uid: &i64, oid: &str, ) -> Result { - let collab_exists = self.cache.is_exist_in_disk(oid).await?; + let collab_exists = self.cache.is_exist(oid).await?; if !collab_exists { // If the collab does not exist, we should not enforce the access control. We consider the user // has the permission to read the collab @@ -217,7 +217,7 @@ where uid: &i64, oid: &str, ) -> Result { - let collab_exists = self.cache.is_exist_in_disk(oid).await?; + let collab_exists = self.cache.is_exist(oid).await?; if !collab_exists { // If the collab does not exist, we should not enforce the access control. we consider the user // has the permission to write the collab diff --git a/src/biz/collab/cache.rs b/src/biz/collab/cache.rs index 96183639..04cd2bf7 100644 --- a/src/biz/collab/cache.rs +++ b/src/biz/collab/cache.rs @@ -149,7 +149,13 @@ impl CollabCache { Ok(()) } - pub async fn is_exist_in_disk(&self, oid: &str) -> Result { + pub async fn is_exist(&self, oid: &str) -> Result { + if let Ok(value) = self.mem_cache.is_exist(oid).await { + if value { + return Ok(value); + } + } + let is_exist = self.disk_cache.is_exist(oid).await?; Ok(is_exist) } diff --git a/src/biz/collab/mem_cache.rs b/src/biz/collab/mem_cache.rs index 24e01132..5dcb3d54 100644 --- a/src/biz/collab/mem_cache.rs +++ b/src/biz/collab/mem_cache.rs @@ -21,6 +21,18 @@ impl CollabMemCache { } } + /// Checks if an object with the given ID exists in the cache. + pub async fn is_exist(&self, object_id: &str) -> Result { + let exists: bool = self + .connection_manager + .lock() + .await + .exists(object_id) + .await + .map_err(|err| AppError::Internal(err.into()))?; + Ok(exists) + } + pub async fn remove_encode_collab(&self, object_id: &str) -> Result<(), AppError> { self .connection_manager diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 140891b8..6e5e86e5 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use collab::core::collab_plugin::EncodedCollab; use collab_rt::command::{RTCommand, RTCommandSender}; -use database::collab::{is_collab_exists, AppResult, CollabStorage, CollabStorageAccessControl}; +use database::collab::{AppResult, CollabStorage, CollabStorageAccessControl}; use database_entity::dto::{ AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab, QueryCollabParams, QueryCollabResult, SnapshotData, @@ -19,7 +19,6 @@ use itertools::{Either, Itertools}; use collab_rt::data_validation::CollabValidator; use sqlx::Transaction; use std::collections::HashMap; -use std::ops::DerefMut; use std::time::Duration; use tokio::sync::oneshot; use tokio::time::timeout; @@ -159,6 +158,28 @@ where params.object_id, err ))); } + let is_exist = self.cache.is_exist(¶ms.object_id).await?; + if is_exist { + self + .check_write_workspace_permission(workspace_id, uid) + .await?; + self + .check_write_collab_permission(workspace_id, uid, ¶ms.object_id) + .await?; + } else { + self + .check_write_workspace_permission(workspace_id, uid) + .await?; + trace!( + "Update policy for user:{} to create collab:{}", + uid, + params.object_id + ); + self + .access_control + .update_policy(uid, ¶ms.object_id, AFAccessLevel::FullAccess) + .await?; + } let mut transaction = self .cache @@ -168,7 +189,8 @@ where .context("acquire transaction to upsert collab") .map_err(AppError::from)?; self - .insert_or_update_collab_with_transaction(workspace_id, uid, params, &mut transaction) + .cache + .insert_encode_collab_data(workspace_id, uid, params, &mut transaction) .await?; transaction .commit() @@ -179,7 +201,7 @@ where } #[instrument(level = "trace", skip(self, params), oid = %params.oid, ty = %params.collab_type, err)] - async fn insert_or_update_collab_with_transaction( + async fn insert_new_collab_with_transaction( &self, workspace_id: &str, uid: &i64, @@ -187,29 +209,13 @@ where transaction: &mut Transaction<'_, sqlx::Postgres>, ) -> AppResult<()> { params.validate()?; - let is_exist = is_collab_exists(¶ms.object_id, transaction.deref_mut()).await?; - if is_exist { - self - .check_write_workspace_permission(workspace_id, uid) - .await?; - self - .check_write_collab_permission(workspace_id, uid, ¶ms.object_id) - .await?; - trace!( - "Update policy for user:{} to create collab:{}", - uid, - params.object_id - ); - } else { - self - .check_write_workspace_permission(workspace_id, uid) - .await?; - self - .access_control - .update_policy(uid, ¶ms.object_id, AFAccessLevel::FullAccess) - .await?; - } - + self + .check_write_workspace_permission(workspace_id, uid) + .await?; + self + .access_control + .update_policy(uid, ¶ms.object_id, AFAccessLevel::FullAccess) + .await?; self .cache .insert_encode_collab_data(workspace_id, uid, params, transaction) diff --git a/src/biz/user/user_init.rs b/src/biz/user/user_init.rs index 544d458c..71828cef 100644 --- a/src/biz/user/user_init.rs +++ b/src/biz/user/user_init.rs @@ -59,7 +59,7 @@ where .map_err(|err| AppError::Internal(anyhow::Error::from(err)))?; collab_storage - .insert_or_update_collab_with_transaction( + .insert_new_collab_with_transaction( &workspace_id, &uid, CollabParams { @@ -97,7 +97,7 @@ async fn create_workspace_database_collab( .map_err(|err| AppError::Internal(anyhow::Error::from(err)))?; storage - .insert_or_update_collab_with_transaction( + .insert_new_collab_with_transaction( workspace_id, uid, CollabParams {