diff --git a/Cargo.lock b/Cargo.lock index ea7dbf6d..9482d3b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,6 +533,7 @@ dependencies = [ "infra", "itertools 0.11.0", "lazy_static", + "lru", "mime", "once_cell", "opener", @@ -2801,6 +2802,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +dependencies = [ + "hashbrown 0.14.3", +] + [[package]] name = "mac" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index b9b75484..01e19fc7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ casbin = { version = "2.1.0" } dotenvy = "0.15.7" url = "2.5.0" brotli = "3.4.0" +lru.workspace = true # collab collab = { version = "0.1.0", features = ["async-plugin"] } @@ -148,6 +149,7 @@ tracing = { version = "0.1"} collab-entity = { version = "0.1.0" } gotrue = { path = "libs/gotrue" } redis = "0.24.0" +lru = "0.12.2" [profile.release] lto = true diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index b1880ddf..f4acc1d8 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -11,6 +11,7 @@ use crate::pg_row::AFSnapshotRow; use app_error::AppError; use chrono::{Duration, Utc}; use futures_util::stream::BoxStream; + use sqlx::postgres::PgRow; use sqlx::{Error, Executor, PgPool, Postgres, Row, Transaction}; use std::collections::HashMap; @@ -178,11 +179,14 @@ pub async fn insert_into_af_collab( } #[inline] -pub async fn select_blob_from_af_collab( - pg_pool: &PgPool, +pub async fn select_blob_from_af_collab<'a, E>( + conn: E, collab_type: &CollabType, object_id: &str, -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> +where + E: Executor<'a, Database = Postgres>, +{ let partition_key = collab_type.value(); sqlx::query_scalar!( r#" @@ -193,7 +197,7 @@ pub async fn select_blob_from_af_collab( object_id, partition_key, ) - .fetch_one(pg_pool) + .fetch_one(conn) .await } @@ -330,18 +334,13 @@ pub async fn should_create_snapshot<'a, E: Executor<'a, Database = Postgres>>( /// of snapshots stored for the specified `oid` does not exceed the provided `snapshot_limit`. If the limit /// is exceeded, the oldest snapshots are deleted to maintain the limit. /// -pub(crate) async fn create_snapshot_and_maintain_limit( - pg_pool: &PgPool, +pub(crate) async fn create_snapshot_and_maintain_limit<'a>( + mut transaction: Transaction<'a, Postgres>, oid: &str, encoded_collab_v1: &[u8], workspace_id: &Uuid, snapshot_limit: i64, ) -> Result { - let mut tx = pg_pool - .begin() - .await - .context("acquire transaction to insert collab snapshot")?; - let snapshot_meta = sqlx::query_as!( AFSnapshotMeta, r#" @@ -355,7 +354,7 @@ pub(crate) async fn create_snapshot_and_maintain_limit( 0, workspace_id, ) - .fetch_one(tx.deref_mut()) + .fetch_one(transaction.deref_mut()) .await?; // When a new snapshot is created that surpasses the preset limit, older snapshots will be deleted to maintain the limit @@ -367,10 +366,11 @@ pub(crate) async fn create_snapshot_and_maintain_limit( ) .bind(oid) .bind(snapshot_limit) - .execute(tx.deref_mut()) + .execute(transaction.deref_mut()) .await?; - tx.commit() + transaction + .commit() .await .context("fail to commit the transaction to insert collab snapshot")?; diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 7c94ae9c..4dcf4710 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -4,7 +4,6 @@ use app_error::AppError; use async_trait::async_trait; use collab::core::collab::MutexCollab; use collab::core::collab_plugin::EncodedCollab; - use database_entity::dto::{ AFAccessLevel, AFRole, AFSnapshotMeta, AFSnapshotMetas, CollabParams, CreateCollabParams, InsertSnapshotParams, QueryCollab, QueryCollabParams, QueryCollabResult, SnapshotData, @@ -14,7 +13,9 @@ use sqlx::types::Uuid; use sqlx::{Executor, PgPool, Postgres, Transaction}; use std::collections::HashMap; use std::sync::{Arc, Weak}; -use tracing::{debug, event, warn}; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{debug, event, warn, Level}; use validator::Validate; pub const COLLAB_SNAPSHOT_LIMIT: i64 = 15; @@ -26,7 +27,7 @@ pub type DatabaseResult = core::result::Result; #[async_trait] pub trait CollabStorageAccessControl: Send + Sync + 'static { /// Checks if the user with the given ID can access the [Collab] with the given ID. - async fn get_collab_access_level<'a, E: Executor<'a, Database = Postgres>>( + async fn get_or_refresh_collab_access_level<'a, E: Executor<'a, Database = Postgres>>( &self, uid: &i64, oid: &str, @@ -246,28 +247,50 @@ impl CollabStoragePgImpl { &self, _uid: &i64, params: QueryCollabParams, - ) -> DatabaseResult { + ) -> Result { event!( - tracing::Level::INFO, + Level::INFO, "Get encoded collab:{} from disk", params.object_id ); - match collab_db_ops::select_blob_from_af_collab( - &self.pg_pool, - ¶ms.collab_type, - ¶ms.object_id, - ) - .await - { - Ok(data) => EncodedCollab::decode_from_bytes(&data) - .map_err(|err| AppError::Internal(anyhow!("fail to decode data to EncodedDoc: {:?}", err))), - Err(e) => match e { - sqlx::Error::RowNotFound => { - let msg = format!("Can't find the row for query: {:?}", params); - Err(AppError::RecordNotFound(msg)) + + const MAX_ATTEMPTS: usize = 3; + let mut attempts = 0; + + loop { + let result = collab_db_ops::select_blob_from_af_collab( + &self.pg_pool, + ¶ms.collab_type, + ¶ms.object_id, + ) + .await; + + match result { + Ok(data) => { + return tokio::task::spawn_blocking(move || { + EncodedCollab::decode_from_bytes(&data).map_err(|err| { + AppError::Internal(anyhow!("fail to decode data to EncodedCollab: {:?}", err)) + }) + }) + .await?; }, - _ => Err(e.into()), - }, + Err(e) => { + // Handle non-retryable errors immediately + if matches!(e, sqlx::Error::RowNotFound) { + let msg = format!("Can't find the row for query: {:?}", params); + return Err(AppError::RecordNotFound(msg)); + } + + // Increment attempts and retry if below MAX_ATTEMPTS and the error is retryable + if attempts < MAX_ATTEMPTS - 1 && matches!(e, sqlx::Error::PoolTimedOut) { + attempts += 1; + sleep(Duration::from_millis(500 * attempts as u64)).await; + continue; + } else { + return Err(e.into()); + } + }, + } } } @@ -302,15 +325,23 @@ impl CollabStoragePgImpl { params.validate()?; debug!("create snapshot for object:{}", params.object_id); - let meta = collab_db_ops::create_snapshot_and_maintain_limit( - &self.pg_pool, - ¶ms.object_id, - ¶ms.encoded_collab_v1, - ¶ms.workspace_id.parse::()?, - COLLAB_SNAPSHOT_LIMIT, - ) - .await?; - Ok(meta) + match self.pg_pool.try_begin().await { + Ok(Some(transaction)) => { + let meta = collab_db_ops::create_snapshot_and_maintain_limit( + transaction, + ¶ms.object_id, + ¶ms.encoded_collab_v1, + ¶ms.workspace_id.parse::()?, + COLLAB_SNAPSHOT_LIMIT, + ) + .await?; + + Ok(meta) + }, + _ => Err(AppError::Internal(anyhow!( + "fail to acquire transaction to create snapshot", + ))), + } } pub async fn get_collab_snapshot(&self, snapshot_id: &i64) -> DatabaseResult { diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 01e3a359..d9109684 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -312,7 +312,6 @@ async fn create_collab_handler( }; params.validate().map_err(AppError::from)?; - // TODO(nathan): should override the existing collab if it exists state.collab_storage.upsert_collab(&uid, params).await?; Ok(Json(AppResponse::Ok())) } @@ -391,7 +390,6 @@ async fn batch_create_collab_handler( .await .context("acquire transaction to upsert collab") .map_err(AppError::from)?; - for params in collab_params_list { state .collab_storage diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index 07fcf4ff..5fa9f688 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -68,7 +68,7 @@ where CollabAC: CollabAccessControl, WorkspaceAC: WorkspaceAccessControl, { - async fn get_collab_access_level<'a, E: Executor<'a, Database = Postgres>>( + async fn get_or_refresh_collab_access_level<'a, E: Executor<'a, Database = Postgres>>( &self, uid: &i64, oid: &str, diff --git a/src/biz/collab/mem_cache.rs b/src/biz/collab/mem_cache.rs index 349cf3bd..aa50fc21 100644 --- a/src/biz/collab/mem_cache.rs +++ b/src/biz/collab/mem_cache.rs @@ -1,92 +1,52 @@ use crate::state::RedisClient; - use collab::core::collab_plugin::EncodedCollab; - -use redis::AsyncCommands; - -use rand::{thread_rng, Rng}; +use lru::LruCache; +use std::num::NonZeroUsize; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, trace}; - -const ENCODED_COLLAB_KEY_PREFIX: &str = "encoded_collab"; +use tracing::error; #[derive(Clone)] pub struct CollabMemCache { - /// Workaround for Redis cache - /// This cache instance has a unique identifier. Use only the current ID with the cache to ensure its relevance. - /// Using this ID to avoid outdated cache data after server restarts. - id: u32, - redis_client: Arc>, + lru_cache: Arc>>>, } impl CollabMemCache { - pub fn new(redis_client: RedisClient) -> Self { - let mut rng = thread_rng(); - let id: u32 = rng.gen(); - + pub fn new(_redis_client: RedisClient) -> Self { + let lru = LruCache::new(NonZeroUsize::new(5000).unwrap()); Self { - id, - redis_client: Arc::new(Mutex::new(redis_client)), + lru_cache: Arc::new(Mutex::new(lru)), } } pub async fn get_encoded_collab(&self, object_id: &str) -> Option { - let key = encoded_collab_key(self.id, object_id); - let result = self - .redis_client - .lock() - .await - .get::<_, Option>>(&key) - .await; - match result { - Ok(Some(bytes)) => match EncodedCollab::decode_from_bytes(&bytes) { - Ok(encoded_collab) => Some(encoded_collab), - Err(err) => { - error!("Failed to decode collab from redis cache bytes: {:?}", err); - None - }, - }, - Ok(None) => { - trace!( - "No encoded collab found in cache for object_id: {}", - object_id - ); - None - }, + let cache = self.lru_cache.lock().await.get(object_id)?.clone(); + tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&cache) { + Ok(encoded_collab) => Some(encoded_collab), Err(err) => { - error!("Failed to get encoded collab from redis: {:?}", err); + error!("Failed to decode collab from redis cache bytes: {:?}", err); None }, - } + }) + .await + .ok()? } - pub async fn cache_encoded_collab(&self, object_id: &str, encoded_collab: &EncodedCollab) { - match encoded_collab.encode_to_bytes() { + pub fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) { + let encoded_collab = encoded_collab.clone(); + let cache = self.lru_cache.clone(); + tokio::task::spawn_blocking(move || match encoded_collab.encode_to_bytes() { Ok(bytes) => { - self.cache_encoded_collab_bytes(object_id, bytes).await; + tokio::spawn(async move { cache.lock().await.put(object_id, bytes) }); }, Err(e) => { error!("Failed to encode collab to bytes: {:?}", e); }, - } + }); } - pub async fn cache_encoded_collab_bytes(&self, object_id: &str, bytes: Vec) { - let key = encoded_collab_key(self.id, object_id); - if let Err(err) = self - .redis_client - .lock() - .await - .set::<_, Vec, ()>(&key, bytes) - .await - { - error!("Failed to cache encoded collab: {:?}", err); - } + pub fn cache_encoded_collab_bytes(&self, object_id: String, bytes: Vec) { + let cache = self.lru_cache.clone(); + tokio::spawn(async move { cache.lock().await.put(object_id, bytes) }); } } - -#[inline] -fn encoded_collab_key(id: u32, object_id: &str) -> String { - format!("{}:{}:{}", id, ENCODED_COLLAB_KEY_PREFIX, object_id) -} diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index 8d55c497..f9258b93 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -31,20 +31,24 @@ pub async fn create_collab_member( ) -> Result<(), AppError> { params.validate()?; - let mut txn = pg_pool + let mut transaction = pg_pool .begin() .await .context("acquire transaction to insert collab member")?; - if !database::collab::is_collab_exists(¶ms.object_id, txn.deref_mut()).await? { + if !database::collab::is_collab_exists(¶ms.object_id, transaction.deref_mut()).await? { return Err(AppError::RecordNotFound(format!( "Fail to insert collab member. The Collab with object_id {} does not exist", params.object_id ))); } - if database::collab::is_collab_member_exists(params.uid, ¶ms.object_id, txn.deref_mut()) - .await? + if database::collab::is_collab_member_exists( + params.uid, + ¶ms.object_id, + transaction.deref_mut(), + ) + .await? { return Err(AppError::RecordAlreadyExists(format!( "Collab member with uid {} and object_id {} already exists", @@ -57,11 +61,11 @@ pub async fn create_collab_member( params.uid, ¶ms.object_id, ¶ms.access_level, - &mut txn, + &mut transaction, ) .await?; - txn + transaction .commit() .await .context("fail to commit the transaction to insert collab member")?; @@ -74,12 +78,12 @@ pub async fn upsert_collab_member( params: &UpdateCollabMemberParams, ) -> Result<(), AppError> { params.validate()?; - let mut txn = pg_pool + let mut transaction = pg_pool .begin() .await .context("acquire transaction to upsert collab member")?; - if !database::collab::is_collab_exists(¶ms.object_id, txn.deref_mut()).await? { + if !database::collab::is_collab_exists(¶ms.object_id, transaction.deref_mut()).await? { return Err(AppError::RecordNotFound(format!( "Fail to upsert collab member. The Collab with object_id {} does not exist", params.object_id @@ -90,11 +94,11 @@ pub async fn upsert_collab_member( params.uid, ¶ms.object_id, ¶ms.access_level, - &mut txn, + &mut transaction, ) .await?; - txn + transaction .commit() .await .context("fail to commit the transaction to upsert collab member")?; diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index fbf58656..d7e5c667 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -140,7 +140,7 @@ where // If the collab already exists, check if the user has enough permissions to update collab let level = self .access_control - .get_collab_access_level(uid, ¶ms.object_id, transaction.deref_mut()) + .get_or_refresh_collab_access_level(uid, ¶ms.object_id, transaction.deref_mut()) .await .context(format!( "Can't find the access level when user:{} try to insert collab", @@ -172,15 +172,17 @@ where uid, params.object_id ))); } - - self - .mem_cache - .cache_encoded_collab_bytes(¶ms.object_id, params.encoded_collab_v1.clone()) - .await; + let object_id = params.object_id.clone(); + let encoded_collab = params.encoded_collab_v1.clone(); self .disk_cache .upsert_collab_with_transaction(workspace_id, uid, params, transaction) - .await + .await?; + + self + .mem_cache + .cache_encoded_collab_bytes(object_id, encoded_collab); + Ok(()) } async fn get_collab_encoded( @@ -191,7 +193,7 @@ where params.validate()?; self .access_control - .get_collab_access_level(uid, ¶ms.object_id, &self.disk_cache.pg_pool) + .get_or_refresh_collab_access_level(uid, ¶ms.object_id, &self.disk_cache.pg_pool) .await?; let object_id = params.object_id.clone(); @@ -232,8 +234,7 @@ where let encoded_collab = self.disk_cache.get_collab_encoded(uid, params).await?; self .mem_cache - .cache_encoded_collab(&object_id, &encoded_collab) - .await; + .cache_encoded_collab(object_id, &encoded_collab); Ok(encoded_collab) }, } @@ -285,7 +286,7 @@ where async fn delete_collab(&self, uid: &i64, object_id: &str) -> DatabaseResult<()> { if !self .access_control - .get_collab_access_level(uid, object_id, &self.disk_cache.pg_pool) + .get_or_refresh_collab_access_level(uid, object_id, &self.disk_cache.pg_pool) .await .context(format!( "Can't find the access level when user:{} try to delete {}",