chore: reduce pool lock (#475)

* chore: reduce pool lock

* chore: clippy
This commit is contained in:
Nathan.fooo 2024-04-16 20:26:26 +08:00 committed by GitHub
parent 4d36f7e9e6
commit 8ab7815740
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 70 additions and 39 deletions

View File

@ -36,6 +36,9 @@ jobs:
with:
workspaces: |
AppFlowy-Cloud
key: ${{ runner.os }}-cargo-clippy-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-clippy-
- name: Copy and rename dev.env to .env
run: cp dev.env .env
@ -49,4 +52,4 @@ jobs:
cargo fmt --check
- name: Clippy
run: cargo clippy --all-targets --all-features -- -D warnings
run: cargo clippy --all-targets --all-features --tests -- -D warnings

View File

@ -75,7 +75,7 @@ pub trait CollabStorage: Send + Sync + 'static {
/// # Returns
///
/// * `Result<()>` - Returns `Ok(())` if the collaboration was created successfully, `Err` otherwise.
async fn insert_or_update_collab_with_transaction(
async fn insert_new_collab_with_transaction(
&self,
workspace_id: &str,
uid: &i64,
@ -152,7 +152,7 @@ where
.await
}
async fn insert_or_update_collab_with_transaction(
async fn insert_new_collab_with_transaction(
&self,
workspace_id: &str,
uid: &i64,
@ -161,7 +161,7 @@ where
) -> AppResult<()> {
self
.as_ref()
.insert_or_update_collab_with_transaction(workspace_id, uid, params, transaction)
.insert_new_collab_with_transaction(workspace_id, uid, params, transaction)
.await
}

View File

@ -5,6 +5,7 @@ use collab_stream::model::{CollabControlEvent, CollabUpdateEvent};
use parking_lot::RwLock;
use std::sync::Arc;
#[allow(dead_code)]
pub struct StreamEventMock {
pub open_event: CollabControlEvent,
pub close_event: CollabControlEvent,
@ -13,6 +14,7 @@ pub struct StreamEventMock {
pub expected_json: serde_json::Value,
}
#[allow(dead_code)]
pub async fn mock_test_data(
workspace_id: &str,
object_id: &str,

View File

@ -71,6 +71,7 @@ pub async fn setup_db(pool: &PgPool) -> anyhow::Result<()> {
}
#[derive(Clone)]
#[allow(dead_code)]
pub struct TestRpcClient {
pub config: Config,
history_rpc: HistoryClient<tonic::transport::Channel>,
@ -104,6 +105,7 @@ impl TestRpcClient {
}
}
#[allow(dead_code)]
pub async fn run_test_server(control_stream_key: String) -> TestRpcClient {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

View File

@ -516,7 +516,7 @@ async fn batch_create_collab_handler(
{
state
.collab_access_control_storage
.insert_or_update_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
.insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
.await?;
state
@ -591,7 +591,7 @@ async fn create_collab_list_handler(
let _object_id = params.object_id.clone();
state
.collab_access_control_storage
.insert_or_update_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
.insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
.await?;
}

View File

@ -129,7 +129,7 @@ where
trace!("Skip access control for the request");
return Ok(());
}
let collab_exists = self.collab_cache.is_exist_in_disk(oid).await?;
let collab_exists = self.collab_cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control
return Ok(());
@ -199,7 +199,7 @@ where
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist_in_disk(oid).await?;
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. We consider the user
// has the permission to read the collab
@ -217,7 +217,7 @@ where
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist_in_disk(oid).await?;
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. we consider the user
// has the permission to write the collab

View File

@ -149,7 +149,13 @@ impl CollabCache {
Ok(())
}
pub async fn is_exist_in_disk(&self, oid: &str) -> Result<bool, AppError> {
pub async fn is_exist(&self, oid: &str) -> Result<bool, AppError> {
if let Ok(value) = self.mem_cache.is_exist(oid).await {
if value {
return Ok(value);
}
}
let is_exist = self.disk_cache.is_exist(oid).await?;
Ok(is_exist)
}

View File

@ -21,6 +21,18 @@ impl CollabMemCache {
}
}
/// Checks if an object with the given ID exists in the cache.
pub async fn is_exist(&self, object_id: &str) -> Result<bool, AppError> {
let exists: bool = self
.connection_manager
.lock()
.await
.exists(object_id)
.await
.map_err(|err| AppError::Internal(err.into()))?;
Ok(exists)
}
pub async fn remove_encode_collab(&self, object_id: &str) -> Result<(), AppError> {
self
.connection_manager

View File

@ -9,7 +9,7 @@ use async_trait::async_trait;
use collab::core::collab_plugin::EncodedCollab;
use collab_rt::command::{RTCommand, RTCommandSender};
use database::collab::{is_collab_exists, AppResult, CollabStorage, CollabStorageAccessControl};
use database::collab::{AppResult, CollabStorage, CollabStorageAccessControl};
use database_entity::dto::{
AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab,
QueryCollabParams, QueryCollabResult, SnapshotData,
@ -19,7 +19,6 @@ use itertools::{Either, Itertools};
use collab_rt::data_validation::CollabValidator;
use sqlx::Transaction;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;
@ -159,6 +158,28 @@ where
params.object_id, err
)));
}
let is_exist = self.cache.is_exist(&params.object_id).await?;
if is_exist {
self
.check_write_workspace_permission(workspace_id, uid)
.await?;
self
.check_write_collab_permission(workspace_id, uid, &params.object_id)
.await?;
} else {
self
.check_write_workspace_permission(workspace_id, uid)
.await?;
trace!(
"Update policy for user:{} to create collab:{}",
uid,
params.object_id
);
self
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
}
let mut transaction = self
.cache
@ -168,7 +189,8 @@ where
.context("acquire transaction to upsert collab")
.map_err(AppError::from)?;
self
.insert_or_update_collab_with_transaction(workspace_id, uid, params, &mut transaction)
.cache
.insert_encode_collab_data(workspace_id, uid, params, &mut transaction)
.await?;
transaction
.commit()
@ -179,7 +201,7 @@ where
}
#[instrument(level = "trace", skip(self, params), oid = %params.oid, ty = %params.collab_type, err)]
async fn insert_or_update_collab_with_transaction(
async fn insert_new_collab_with_transaction(
&self,
workspace_id: &str,
uid: &i64,
@ -187,29 +209,13 @@ where
transaction: &mut Transaction<'_, sqlx::Postgres>,
) -> AppResult<()> {
params.validate()?;
let is_exist = is_collab_exists(&params.object_id, transaction.deref_mut()).await?;
if is_exist {
self
.check_write_workspace_permission(workspace_id, uid)
.await?;
self
.check_write_collab_permission(workspace_id, uid, &params.object_id)
.await?;
trace!(
"Update policy for user:{} to create collab:{}",
uid,
params.object_id
);
} else {
self
.check_write_workspace_permission(workspace_id, uid)
.await?;
self
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
}
self
.check_write_workspace_permission(workspace_id, uid)
.await?;
self
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
self
.cache
.insert_encode_collab_data(workspace_id, uid, params, transaction)

View File

@ -59,7 +59,7 @@ where
.map_err(|err| AppError::Internal(anyhow::Error::from(err)))?;
collab_storage
.insert_or_update_collab_with_transaction(
.insert_new_collab_with_transaction(
&workspace_id,
&uid,
CollabParams {
@ -97,7 +97,7 @@ async fn create_workspace_database_collab(
.map_err(|err| AppError::Internal(anyhow::Error::from(err)))?;
storage
.insert_or_update_collab_with_transaction(
.insert_new_collab_with_transaction(
workspace_id,
uid,
CollabParams {