From 089b3046ab1cea807c0170ce58f94971a5b86212 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 8 Oct 2023 23:53:16 +0800 Subject: [PATCH] chore: implement batch get (#106) * chore: implement batch get * chore: add request id and update the local_server.sh * chore: update collab commit id --- ...ca0f0a2877738144afbd82844d75c4ea0ea8e.json | 29 +++++ Cargo.lock | 5 +- Cargo.toml | 5 +- build/run_local_server.sh | 15 ++- libs/client-api/src/http.rs | 22 +++- libs/database-entity/src/lib.rs | 29 ++++- libs/database/src/collab.rs | 16 ++- libs/database/src/collab_db_ops.rs | 73 +++++++++++- src/api/{collaborate.rs => collab_data.rs} | 36 +++++- src/api/mod.rs | 4 +- src/component/storage_proxy.rs | 44 ++++++- tests/collab/storage_test.rs | 109 +++++++++++++++++- 12 files changed, 364 insertions(+), 23 deletions(-) create mode 100644 .sqlx/query-a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e.json rename src/api/{collaborate.rs => collab_data.rs} (74%) diff --git a/.sqlx/query-a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e.json b/.sqlx/query-a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e.json new file mode 100644 index 00000000..8c24a41f --- /dev/null +++ b/.sqlx/query-a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT oid, blob\n FROM af_collab\n WHERE oid = ANY($1) AND partition_key = $2 AND deleted_at IS NULL;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "oid", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "blob", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "TextArray", + "Int4" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "a7f47366a4016e10dfe9195f865ca0f0a2877738144afbd82844d75c4ea0ea8e" +} diff --git a/Cargo.lock b/Cargo.lock index 9b983960..e7ba63cc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -466,6 +466,7 @@ 
dependencies = [ "gotrue", "gotrue-entity", "infra", + "itertools", "jsonwebtoken", "lazy_static", "mime", @@ -866,7 +867,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4a4df0b#4a4df0b287f197db99a02748548a5c0836c91588" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=752bff0#752bff0b6db0419642478472ecdeb7c44a321510" dependencies = [ "anyhow", "async-trait", @@ -885,7 +886,7 @@ dependencies = [ [[package]] name = "collab-define" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4a4df0b#4a4df0b287f197db99a02748548a5c0836c91588" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=752bff0#752bff0b6db0419642478472ecdeb7c44a321510" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 9c65b4b8..c8950e7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ gotrue = { path = "libs/gotrue" } gotrue-entity = { path = "libs/gotrue-entity" } infra = { path = "libs/infra" } shared_entity = { path = "libs/shared-entity", features = ["cloud"] } +itertools = "0.11" [dev-dependencies] @@ -126,8 +127,8 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a4df0b" } -collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a4df0b" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "752bff0" } +collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "752bff0" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/build/run_local_server.sh b/build/run_local_server.sh index c6b8afce..439c7f4e 100755 --- a/build/run_local_server.sh +++ b/build/run_local_server.sh @@ -53,8 +53,21 @@ then cargo sqlx prepare --workspace fi +# Maximum 
number of restart attempts +MAX_RESTARTS=5 +RESTARTS=0 +# Start the server and restart it on failure +while [ "$RESTARTS" -lt "$MAX_RESTARTS" ]; do + RUST_LOG=trace cargo run && break + RESTARTS=$((RESTARTS+1)) + echo "Server crashed! Attempting to restart ($RESTARTS/$MAX_RESTARTS)" + sleep 5 +done -RUST_LOG=trace cargo run & +if [ "$RESTARTS" -eq "$MAX_RESTARTS" ]; then + echo "Server failed to start after $MAX_RESTARTS attempts, exiting." + exit 1 +fi # revert to require signup email verification export GOTRUE_MAILER_AUTOCONFIRM=false diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 71ea18b6..2544efd4 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -25,7 +25,10 @@ use tracing::instrument; use gotrue_entity::{AccessTokenResponse, User}; use crate::notify::{ClientToken, TokenStateReceiver}; -use database_entity::{AFUserProfileView, AFWorkspaceMember, InsertCollabParams}; +use database_entity::{ + AFUserProfileView, AFWorkspaceMember, BatchQueryCollabParams, BatchQueryCollabResult, + InsertCollabParams, +}; use database_entity::{AFWorkspaces, QueryCollabParams}; use database_entity::{DeleteCollabParams, RawData}; use shared_entity::app_error::AppError; @@ -517,6 +520,23 @@ impl Client { .into_data() } + #[instrument(level = "debug", skip_all, err)] + pub async fn batch_get_collab( + &self, + params: BatchQueryCollabParams, + ) -> Result { + let url = format!("{}/api/collab/list", self.base_url); + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? + .json(&params) + .send() + .await?; + AppResponse::::from_response(resp) + .await? 
+ .into_data() + } + #[instrument(level = "debug", skip_all, err)] pub async fn delete_collab(&self, params: DeleteCollabParams) -> Result<(), AppError> { let url = format!("{}/api/collab/", self.base_url); diff --git a/libs/database-entity/src/lib.rs b/libs/database-entity/src/lib.rs index 0b47f01c..e6d1ef40 100644 --- a/libs/database-entity/src/lib.rs +++ b/libs/database-entity/src/lib.rs @@ -2,7 +2,8 @@ use chrono::{DateTime, Utc}; use collab_define::CollabType; use serde::{Deserialize, Serialize}; use sqlx::FromRow; -use std::ops::Deref; +use std::collections::HashMap; +use std::ops::{Deref, DerefMut}; use validator::{Validate, ValidationError}; pub type RawData = Vec; @@ -92,6 +93,23 @@ pub struct QueryCollabParams { pub collab_type: CollabType, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchQueryCollabParams(pub Vec); + +impl Deref for BatchQueryCollabParams { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for BatchQueryCollabParams { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AFCollabSnapshot { pub snapshot_id: i64, @@ -211,3 +229,12 @@ impl AFFileMetadata { format!("{}/{}", self.owner_uid, self.path) } } + +#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] +pub enum QueryCollabResult { + Success { blob: RawData }, + Failed { error: String }, +} + +#[derive(Serialize, Deserialize)] +pub struct BatchQueryCollabResult(pub HashMap); diff --git a/libs/database/src/collab.rs b/libs/database/src/collab.rs index 0b2f2815..4c387ba0 100644 --- a/libs/database/src/collab.rs +++ b/libs/database/src/collab.rs @@ -6,10 +6,11 @@ use collab_define::CollabType; use database_entity::database_error::DatabaseError; use database_entity::{ AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams, - QueryObjectSnapshotParams, QuerySnapshotParams, RawData, + QueryCollabResult, 
QueryObjectSnapshotParams, QuerySnapshotParams, RawData, }; use sqlx::types::Uuid; use sqlx::PgPool; +use std::collections::HashMap; use std::sync::Weak; use validator::Validate; @@ -58,6 +59,11 @@ pub trait CollabStorage: Clone + Send + Sync + 'static { /// * `Result` - Returns the data of the collaboration if found, `Err` otherwise. async fn get_collab(&self, params: QueryCollabParams) -> Result; + async fn batch_get_collab( + &self, + queries: Vec, + ) -> HashMap; + /// Deletes a collaboration from the storage. /// /// # Arguments @@ -137,7 +143,6 @@ impl CollabStorage for CollabPostgresDBStorageImpl { async fn get_collab(&self, params: QueryCollabParams) -> Result { params.validate()?; - match collab_db_ops::get_collab_blob(&self.pg_pool, &params.collab_type, &params.object_id) .await { @@ -152,6 +157,13 @@ impl CollabStorage for CollabPostgresDBStorageImpl { } } + async fn batch_get_collab( + &self, + queries: Vec, + ) -> HashMap { + collab_db_ops::batch_get_collab_blob(&self.pg_pool, queries).await + } + async fn delete_collab(&self, object_id: &str) -> Result<()> { collab_db_ops::delete_collab(&self.pg_pool, object_id).await?; Ok(()) diff --git a/libs/database/src/collab_db_ops.rs b/libs/database/src/collab_db_ops.rs index 8565e626..36465afd 100644 --- a/libs/database/src/collab_db_ops.rs +++ b/libs/database/src/collab_db_ops.rs @@ -1,12 +1,14 @@ -use collab_define::CollabType; -use std::{ops::DerefMut, str::FromStr}; - use anyhow::Context; +use collab_define::CollabType; use database_entity::{ database_error::DatabaseError, AFCollabSnapshot, AFCollabSnapshots, InsertCollabParams, + QueryCollabParams, QueryCollabResult, RawData, }; + use sqlx::{PgPool, Transaction}; -use tracing::trace; +use std::collections::HashMap; +use std::{ops::DerefMut, str::FromStr}; +use tracing::{error, trace}; use uuid::Uuid; pub async fn collab_exists(pg_pool: &PgPool, oid: &str) -> Result { @@ -154,6 +156,69 @@ pub async fn get_collab_blob( .await } +pub async fn batch_get_collab_blob( 
+ pg_pool: &PgPool, + queries: Vec, +) -> HashMap { + let mut results = HashMap::new(); + let mut object_ids_by_collab_type: HashMap> = HashMap::new(); + for params in queries { + object_ids_by_collab_type + .entry(params.collab_type) + .or_default() + .push(params.object_id); + } + + for (collab_type, mut object_ids) in object_ids_by_collab_type.into_iter() { + let partition_key = collab_type.value(); + let par_results: Result, sqlx::Error> = sqlx::query_as!( + QueryCollabData, + r#" + SELECT oid, blob + FROM af_collab + WHERE oid = ANY($1) AND partition_key = $2 AND deleted_at IS NULL; + "#, + &object_ids, + partition_key, + ) + .fetch_all(pg_pool) + .await; + + match par_results { + Ok(par_results) => { + object_ids.retain(|oid| !par_results.iter().any(|par_result| par_result.oid == *oid)); + + results.extend(par_results.into_iter().map(|par_result| { + ( + par_result.oid, + QueryCollabResult::Success { + blob: par_result.blob, + }, + ) + })); + + results.extend(object_ids.into_iter().map(|oid| { + ( + oid, + QueryCollabResult::Failed { + error: "Record not found".to_string(), + }, + ) + })); + }, + Err(err) => error!("Batch get collab errors: {}", err), + } + } + + results +} + +#[derive(Debug, sqlx::FromRow)] +struct QueryCollabData { + oid: String, + blob: RawData, +} + pub async fn delete_collab(pg_pool: &PgPool, object_id: &str) -> Result<(), sqlx::Error> { sqlx::query!( r#" diff --git a/src/api/collaborate.rs b/src/api/collab_data.rs similarity index 74% rename from src/api/collaborate.rs rename to src/api/collab_data.rs index f4bbeed2..a6b4a0cc 100644 --- a/src/api/collaborate.rs +++ b/src/api/collab_data.rs @@ -6,15 +6,17 @@ use actix_web::web::{Data, Json}; use actix_web::Result; use actix_web::{web, Scope}; use database::collab::CollabStorage; + use database_entity::database_error::DatabaseError; use database_entity::{ - AFCollabSnapshots, DeleteCollabParams, InsertCollabParams, QueryCollabParams, - QueryObjectSnapshotParams, QuerySnapshotParams, 
RawData, + AFCollabSnapshots, BatchQueryCollabParams, BatchQueryCollabResult, DeleteCollabParams, + InsertCollabParams, QueryCollabParams, QueryObjectSnapshotParams, QuerySnapshotParams, RawData, }; use shared_entity::app_error::AppError; use shared_entity::data::AppResponse; use shared_entity::error_code::ErrorCode; use tracing::{debug, instrument}; +use tracing_actix_web::RequestId; pub fn collab_scope() -> Scope { web::scope("/api/collab") @@ -25,13 +27,15 @@ pub fn collab_scope() -> Scope { .route(web::put().to(update_collab_handler)) .route(web::delete().to(delete_collab_handler)), ) + .service(web::resource("list").route(web::get().to(batch_get_collab_handler))) .service(web::resource("snapshot").route(web::get().to(retrieve_snapshot_data_handler))) .service(web::resource("snapshots").route(web::get().to(retrieve_snapshots_handler))) } -#[instrument(skip_all, err)] +#[instrument(skip(state, payload), err)] async fn create_collab_handler( user_uuid: UserUuid, + required_id: RequestId, payload: Json, state: Data, ) -> Result>> { @@ -39,9 +43,10 @@ async fn create_collab_handler( Ok(Json(AppResponse::Ok())) } -#[instrument(skip(storage), err)] +#[instrument(skip(storage, payload), err)] async fn get_collab_handler( user_uuid: UserUuid, + required_id: RequestId, payload: Json, storage: Data>, ) -> Result>> { @@ -60,9 +65,27 @@ async fn get_collab_handler( Ok(Json(AppResponse::Ok().with_data(data))) } -#[instrument(skip_all, err)] +#[instrument(skip(storage, payload), err)] +async fn batch_get_collab_handler( + user_uuid: UserUuid, + required_id: RequestId, + payload: Json, + storage: Data>, +) -> Result>> { + // TODO: access control for user_uuid + let result = BatchQueryCollabResult( + storage + .collab_storage + .batch_get_collab(payload.into_inner().0) + .await, + ); + Ok(Json(AppResponse::Ok().with_data(result))) +} + +#[instrument(skip(state, payload), err)] async fn update_collab_handler( user_uuid: UserUuid, + required_id: RequestId, payload: Json, state: 
Data, ) -> Result>> { @@ -70,9 +93,10 @@ async fn update_collab_handler( Ok(AppResponse::Ok().into()) } -#[instrument(level = "info", skip_all, err)] +#[instrument(level = "info", skip(state, payload), err)] async fn delete_collab_handler( user_uuid: UserUuid, + required_id: RequestId, payload: Json, state: Data, ) -> Result>> { diff --git a/src/api/mod.rs b/src/api/mod.rs index f083ecf6..dd49ca34 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,10 +1,10 @@ -mod collaborate; +mod collab_data; mod file_storage; mod user; mod workspace; mod ws; -pub use collaborate::collab_scope; +pub use collab_data::collab_scope; pub use file_storage::file_storage_scope; pub use user::user_scope; pub use workspace::workspace_scope; diff --git a/src/component/storage_proxy.rs b/src/component/storage_proxy.rs index 7c28687f..3c888f44 100644 --- a/src/component/storage_proxy.rs +++ b/src/component/storage_proxy.rs @@ -3,14 +3,17 @@ use collab::core::collab::MutexCollab; use database::collab::{CollabPostgresDBStorageImpl, CollabStorage, StorageConfig}; use database_entity::{ AFCollabSnapshots, InsertCollabParams, InsertSnapshotParams, QueryCollabParams, - QueryObjectSnapshotParams, QuerySnapshotParams, RawData, + QueryCollabResult, QueryObjectSnapshotParams, QuerySnapshotParams, RawData, }; +use itertools::{Either, Itertools}; use std::{ collections::HashMap, sync::{Arc, Weak}, }; use tokio::sync::RwLock; use tracing::info; +use validator::Validate; + #[derive(Clone)] pub struct CollabStorageProxy { inner: CollabPostgresDBStorageImpl, @@ -71,6 +74,45 @@ impl CollabStorage for CollabStorageProxy { } } + async fn batch_get_collab( + &self, + queries: Vec, + ) -> HashMap { + let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) = + queries + .into_iter() + .partition_map(|params| match params.validate() { + Ok(_) => Either::Left(params), + Err(err) => Either::Right(( + params.object_id, + QueryCollabResult::Failed { + error: err.to_string(), + }, + )), + }); + + let read_guard = 
self.collab_by_object_id.read().await; + let (results_from_memory, queries): (HashMap<_, _>, Vec<_>) = + valid_queries.into_iter().partition_map(|params| { + match read_guard + .get(&params.object_id) + .and_then(|collab| collab.upgrade()) + { + Some(collab) => Either::Left(( + params.object_id, + QueryCollabResult::Success { + blob: collab.encode_as_update_v1().0, + }, + )), + None => Either::Right(params), + } + }); + + results.extend(results_from_memory); + results.extend(self.inner.batch_get_collab(queries).await); + results + } + async fn delete_collab(&self, object_id: &str) -> database::collab::Result<()> { self.inner.delete_collab(object_id).await } diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index e85c44be..3c652aa8 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -1,9 +1,13 @@ use crate::{ collab::workspace_id_from_client, user::utils::generate_unique_registered_user_client, }; +use std::collections::HashMap; use collab_define::CollabType; -use database_entity::{DeleteCollabParams, InsertCollabParams, QueryCollabParams}; +use database_entity::{ + BatchQueryCollabParams, DeleteCollabParams, InsertCollabParams, QueryCollabParams, + QueryCollabResult, +}; use shared_entity::error_code::ErrorCode; use sqlx::types::Uuid; @@ -33,6 +37,109 @@ async fn success_insert_collab_test() { assert_eq!(bytes, raw_data); } +#[tokio::test] +async fn success_batch_get_collab_test() { + let (c, _user) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c).await; + let queries = BatchQueryCollabParams(vec![ + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Document, + }, + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Folder, + }, + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Database, + }, + ]); + + let mut expected_results = HashMap::new(); + for i in 
0..3 { + let object_id = queries.0[i].object_id.clone(); + let collab_type = queries.0[i].collab_type.clone(); + let raw_data = format!("hello world {}", i).as_bytes().to_vec(); + + expected_results.insert( + object_id.clone(), + QueryCollabResult::Success { + blob: raw_data.clone(), + }, + ); + + c.create_collab(InsertCollabParams::new( + &object_id, + collab_type, + raw_data.clone(), + workspace_id.clone(), + )) + .await + .unwrap(); + } + + let results = c.batch_get_collab(queries).await.unwrap().0; + for (object_id, result) in expected_results.iter() { + assert_eq!(result, results.get(object_id).unwrap()); + } +} + +#[tokio::test] +async fn success_part_batch_get_collab_test() { + let (c, _user) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c).await; + let queries = BatchQueryCollabParams(vec![ + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Document, + }, + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Folder, + }, + QueryCollabParams { + object_id: Uuid::new_v4().to_string(), + collab_type: CollabType::Database, + }, + ]); + + let mut expected_results = HashMap::new(); + for i in 0..3 { + let object_id = queries.0[i].object_id.clone(); + let collab_type = queries.0[i].collab_type.clone(); + let raw_data = format!("hello world {}", i).as_bytes().to_vec(); + if i == 1 { + expected_results.insert( + object_id.clone(), + QueryCollabResult::Failed { + error: "Record not found".to_string(), + }, + ); + } else { + expected_results.insert( + object_id.clone(), + QueryCollabResult::Success { + blob: raw_data.clone(), + }, + ); + c.create_collab(InsertCollabParams::new( + &object_id, + collab_type, + raw_data.clone(), + workspace_id.clone(), + )) + .await + .unwrap(); + } + } + + let results = c.batch_get_collab(queries).await.unwrap().0; + for (object_id, result) in expected_results.iter() { + assert_eq!(result, 
results.get(object_id).unwrap()); + } +} + #[tokio::test] async fn success_delete_collab_test() { let (c, _user) = generate_unique_registered_user_client().await;