From 365c64c5c25fedfaedadb3c881773365cb617878 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 2 Sep 2024 21:25:21 +0800 Subject: [PATCH] chore: use colla storage instead of select from db directly (#782) --- libs/database/src/collab/collab_storage.rs | 15 ++++--- .../src/index/collab_embeddings_ops.rs | 16 +++++++- .../appflowy-collaborate/src/collab/cache.rs | 22 +++------- .../src/collab/disk_cache.rs | 1 - .../appflowy-collaborate/src/collab/queue.rs | 1 - .../src/collab/storage.rs | 40 +++++++++++-------- .../appflowy-collaborate/src/group/manager.rs | 4 +- .../src/indexer/provider.rs | 16 ++++---- .../appflowy-collaborate/src/rt_server.rs | 12 ++++-- src/api/workspace.rs | 10 ++--- tests/collab/storage_test.rs | 4 +- 11 files changed, 81 insertions(+), 60 deletions(-) diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 01a6c29c..5bf357f2 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -53,6 +53,11 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static { ) -> Result; } +pub enum GetCollabOrigin { + User { uid: i64 }, + Server, +} + /// Represents a storage mechanism for collaborations. /// /// This trait provides asynchronous methods for CRUD operations related to collaborations. @@ -113,9 +118,9 @@ pub trait CollabStorage: Send + Sync + 'static { /// * `Result` - Returns the data of the collaboration if found, `Err` otherwise. async fn get_encode_collab( &self, - uid: &i64, + origin: GetCollabOrigin, params: QueryCollabParams, - is_collab_init: bool, + from_editing_collab: bool, ) -> AppResult; async fn batch_get_collab( @@ -208,13 +213,13 @@ where async fn get_encode_collab( &self, - uid: &i64, + origin: GetCollabOrigin, params: QueryCollabParams, - is_collab_init: bool, + from_editing_collab: bool, ) -> AppResult { self .as_ref() - .get_encode_collab(uid, params, is_collab_init) + .get_encode_collab(origin, params, from_editing_collab) .await } diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 016a2642..ee60901b 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -5,7 +5,9 @@ use pgvector::Vector; use sqlx::{Error, Executor, Postgres, Transaction}; use uuid::Uuid; -use database_entity::dto::{AFCollabEmbeddingParams, IndexingStatus}; +use database_entity::dto::{ + AFCollabEmbeddingParams, IndexingStatus, QueryCollab, QueryCollabParams, +}; pub async fn get_index_status<'a, E>( tx: E, @@ -143,3 +145,15 @@ pub struct CollabId { pub workspace_id: Uuid, pub object_id: String, } + +impl From for QueryCollabParams { + fn from(value: CollabId) -> Self { + QueryCollabParams { + workspace_id: value.workspace_id.to_string(), + inner: QueryCollab { + object_id: value.object_id, + collab_type: value.collab_type, + }, + } + } +} diff --git a/services/appflowy-collaborate/src/collab/cache.rs b/services/appflowy-collaborate/src/collab/cache.rs index 5759e7ef..66183e4e 100644 --- a/services/appflowy-collaborate/src/collab/cache.rs +++ b/services/appflowy-collaborate/src/collab/cache.rs @@ -67,11 +67,7 @@ impl CollabCache { } } - pub async fn get_encode_collab( - &self, - uid: &i64, - query: QueryCollab, - ) -> Result { + pub async fn get_encode_collab(&self, query: QueryCollab) -> Result { self.total_attempts.fetch_add(1, Ordering::Relaxed); // Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary. if let Some(encoded_collab) = self.mem_cache.get_encode_collab(&query.object_id).await { @@ -86,10 +82,7 @@ impl CollabCache { // Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache. let object_id = query.object_id.clone(); - let encode_collab = self - .disk_cache - .get_collab_encoded_from_disk(uid, query) - .await?; + let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?; // spawn a task to insert the encoded collab into the memory cache let cloned_encode_collab = encode_collab.clone(); @@ -180,13 +173,9 @@ impl CollabCache { pub async fn get_encode_collab_from_disk( &self, - uid: &i64, query: QueryCollab, ) -> Result { - let encode_collab = self - .disk_cache - .get_collab_encoded_from_disk(uid, query) - .await?; + let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?; Ok(encode_collab) } @@ -219,11 +208,11 @@ impl CollabCache { } pub fn query_state(&self) -> QueryState { - let successful_attempts = self.success_attempts.load(Ordering::Relaxed); + let success_attempts = self.success_attempts.load(Ordering::Relaxed); let total_attempts = self.total_attempts.load(Ordering::Relaxed); QueryState { total_attempts, - success_attempts: successful_attempts, + success_attempts, } } @@ -249,6 +238,7 @@ impl CollabCache { } } +#[derive(Debug)] pub struct QueryState { pub total_attempts: u64, pub success_attempts: u64, diff --git a/services/appflowy-collaborate/src/collab/disk_cache.rs b/services/appflowy-collaborate/src/collab/disk_cache.rs index e5619cbb..8fa66774 100644 --- a/services/appflowy-collaborate/src/collab/disk_cache.rs +++ b/services/appflowy-collaborate/src/collab/disk_cache.rs @@ -73,7 +73,6 @@ impl CollabDiskCache { #[instrument(level = "trace", skip_all)] pub async fn get_collab_encoded_from_disk( &self, - _uid: &i64, query: QueryCollab, ) -> Result { event!( diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index 99958aaf..bcabca87 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -100,7 +100,6 @@ impl StorageQueue { priority: WritePriority, ) -> Result<(), AppError> { trace!("queuing {} object to pending write queue", params.object_id,); - // TODO(nathan): compress the data before storing it in Redis self .collab_cache .insert_encode_collab_data_in_mem(params) diff --git a/services/appflowy-collaborate/src/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs index 097eda11..618ac23a 100644 --- a/services/appflowy-collaborate/src/collab/storage.rs +++ b/services/appflowy-collaborate/src/collab/storage.rs @@ -17,7 +17,9 @@ use crate::collab::cache::CollabCache; use crate::command::{CLCommandSender, CollaborationCommand}; use crate::shared_state::RealtimeSharedState; use app_error::AppError; -use database::collab::{AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl}; +use database::collab::{ + AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl, GetCollabOrigin, +}; use database_entity::dto::{ AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab, QueryCollabParams, QueryCollabResult, SnapshotData, @@ -277,30 +279,34 @@ where Ok(()) } - #[instrument(level = "trace", skip_all, fields(oid = %params.object_id, is_collab_init = %is_collab_init))] + #[instrument(level = "trace", skip_all, fields(oid = %params.object_id, from_editing_collab = %from_editing_collab))] async fn get_encode_collab( &self, - uid: &i64, + origin: GetCollabOrigin, params: QueryCollabParams, - is_collab_init: bool, + from_editing_collab: bool, ) -> AppResult { params.validate()?; + match origin { + GetCollabOrigin::User { uid } => { + // Check if the user has enough permissions to access the collab + let can_read = self + .access_control + .enforce_read_collab(¶ms.workspace_id, &uid, ¶ms.object_id) + .await?; - // Check if the user has enough permissions to access the collab - let can_read = self - .access_control - .enforce_read_collab(¶ms.workspace_id, uid, ¶ms.object_id) - .await?; - - if !can_read { - return Err(AppError::NotEnoughPermissions { - user: uid.to_string(), - action: format!("read collab:{}", params.object_id), - }); + if !can_read { + return Err(AppError::NotEnoughPermissions { + user: uid.to_string(), + action: format!("read collab:{}", params.object_id), + }); + } + }, + GetCollabOrigin::Server => {}, } // Early return if editing collab is initialized, as it indicates no need to query further. - if !is_collab_init { + if from_editing_collab { // Attempt to retrieve encoded collab from the editing collab if let Some(value) = self.get_encode_collab_from_editing(¶ms.object_id).await { trace!( @@ -311,7 +317,7 @@ where } } - let encode_collab = self.cache.get_encode_collab(uid, params.inner).await?; + let encode_collab = self.cache.get_encode_collab(params.inner).await?; Ok(encode_collab) } diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 338ae944..5b6022d8 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -16,7 +16,7 @@ use collab_rt_entity::CollabMessage; use collab_stream::client::{CollabRedisStream, CONTROL_STREAM_KEY}; use collab_stream::model::CollabControlEvent; use collab_stream::stream_group::StreamGroup; -use database::collab::CollabStorage; +use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::QueryCollabParams; use crate::client::client_msg_router::ClientMessageRouter; @@ -271,7 +271,7 @@ where S: CollabStorage, { let encode_collab = storage - .get_encode_collab(&uid, params.clone(), true) + .get_encode_collab(GetCollabOrigin::User { uid }, params.clone(), false) .await?; let result = Collab::new_with_source( CollabOrigin::Server, diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index dd064acc..78616f3b 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -19,7 +19,7 @@ use crate::config::get_env_var; use crate::indexer::DocumentIndexer; use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; -use database::collab::select_blob_from_af_collab; +use database::collab::{CollabStorage, GetCollabOrigin}; use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings}; use database::workspace::select_workspace_settings; use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabParams}; @@ -93,20 +93,22 @@ impl IndexerProvider { fn get_unindexed_collabs( &self, + storage: Arc, ) -> Pin> + Send>> { let db = self.db.clone(); + Box::pin(try_stream! { let collabs = get_collabs_without_embeddings(&db).await?; - if !collabs.is_empty() { tracing::trace!("found {} unindexed collabs", collabs.len()); } for cid in collabs { match &cid.collab_type { CollabType::Document => { - let collab = - select_blob_from_af_collab(&db, &CollabType::Document, &cid.object_id).await?; - let collab = EncodedCollab::decode_from_bytes(&collab)?; + let collab = storage + .get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false) + .await?; + yield UnindexedCollab { workspace_id: cid.workspace_id, object_id: cid.object_id, @@ -125,8 +127,8 @@ impl IndexerProvider { }) } - pub async fn handle_unindexed_collabs(indexer: Arc) { - let mut stream = indexer.get_unindexed_collabs(); + pub async fn handle_unindexed_collabs(indexer: Arc, storage: Arc) { + let mut stream = indexer.get_unindexed_collabs(storage); while let Some(result) = stream.next().await { match result { Ok(collab) => { diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 4422f25a..ca1f09f1 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -93,7 +93,7 @@ where spawn_metrics(metrics.clone(), storage.clone()); - spawn_handle_unindexed_collabs(indexer_provider); + spawn_handle_unindexed_collabs(indexer_provider, storage.clone()); Ok(Self { storage, @@ -271,8 +271,14 @@ where } } -fn spawn_handle_unindexed_collabs(indexer_provider: Arc) { - tokio::spawn(IndexerProvider::handle_unindexed_collabs(indexer_provider)); +fn spawn_handle_unindexed_collabs( + indexer_provider: Arc, + storage: Arc, +) { + tokio::spawn(IndexerProvider::handle_unindexed_collabs( + indexer_provider, + storage, + )); } fn spawn_period_check_inactive_group( diff --git a/src/api/workspace.rs b/src/api/workspace.rs index c3250021..5b94b829 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -24,7 +24,7 @@ use authentication::jwt::{OptionalUserUuid, UserUuid}; use collab_rt_entity::realtime_proto::HttpRealtimeMessage; use collab_rt_entity::RealtimeMessage; use collab_rt_protocol::validate_encode_collab; -use database::collab::CollabStorage; +use database::collab::{CollabStorage, GetCollabOrigin}; use database::user::select_uid_from_email; use database_entity::dto::*; use shared_entity::dto::workspace_dto::*; @@ -764,7 +764,7 @@ async fn get_collab_handler( let object_id = params.object_id.clone(); let encode_collab = state .collab_access_control_storage - .get_encode_collab(&uid, params, false) + .get_encode_collab(GetCollabOrigin::User { uid }, params, true) .await .map_err(AppResponseError::from)?; @@ -800,7 +800,7 @@ async fn v1_get_collab_handler( let encode_collab = state .collab_access_control_storage - .get_encode_collab(&uid, param, false) + .get_encode_collab(GetCollabOrigin::User { uid }, param, true) .await .map_err(AppResponseError::from)?; @@ -845,9 +845,9 @@ async fn create_collab_snapshot_handler( let encoded_collab_v1 = state .collab_access_control_storage .get_encode_collab( - &uid, + GetCollabOrigin::User { uid }, QueryCollabParams::new(&object_id, collab_type.clone(), &workspace_id), - false, + true, ) .await? .encode_to_bytes() diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index f2f59be1..6c44eef3 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -491,7 +491,7 @@ async fn simulate_small_data_set_write(pool: PgPool) { collab_type: params.collab_type.clone(), }; let encode_collab_from_disk = collab_cache - .get_encode_collab_from_disk(&user.uid, query) + .get_encode_collab_from_disk(query) .await .unwrap(); @@ -558,7 +558,7 @@ async fn simulate_large_data_set_write(pool: PgPool) { collab_type: params.collab_type.clone(), }; let encode_collab_from_disk = collab_cache - .get_encode_collab_from_disk(&user.uid, query) + .get_encode_collab_from_disk(query) .await .unwrap(); assert_eq!(