From a6eb8607c9ca7a8f992bcf70e427580fa1287dc3 Mon Sep 17 00:00:00 2001 From: Zack <33050391+speed2exe@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:53:41 +0800 Subject: [PATCH] feat: async read for file upload (#80) * feat: async read for file upload * feat: async read for file download --------- Co-authored-by: nathan --- ...d93cc3933954854a4862bd235dd5cdccbe10.json} | 6 +- Cargo.lock | 7 +- Cargo.toml | 4 +- build/run_local_server.sh | 5 -- libs/client-api/Cargo.toml | 1 + libs/client-api/src/http.rs | 20 ++++-- libs/storage/src/file_storage.rs | 15 ++-- libs/storage/src/lib.rs | 1 + libs/storage/src/user.rs | 61 ++++++++++++++++ libs/storage/src/workspace.rs | 60 ---------------- src/api/file_storage.rs | 26 +++++-- src/biz/file_storage.rs | 71 +++++++++++++------ src/biz/mod.rs | 1 + src/biz/user.rs | 5 +- src/biz/utils.rs | 44 ++++++++++++ 15 files changed, 214 insertions(+), 113 deletions(-) rename .sqlx/{query-f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2.json => query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json} (65%) create mode 100644 libs/storage/src/user.rs create mode 100644 src/biz/utils.rs diff --git a/.sqlx/query-f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2.json b/.sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json similarity index 65% rename from .sqlx/query-f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2.json rename to .sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json index b2e41071..06423f8a 100644 --- a/.sqlx/query-f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2.json +++ b/.sqlx/query-180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO af_file_metadata (owner_uid, path, file_type, file_size)\n SELECT uid, $2, $3, $4\n FROM af_user\n WHERE uuid = $1\n ON CONFLICT (owner_uid, path) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ", + "query": "\n INSERT INTO af_file_metadata\n (owner_uid, path, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (owner_uid, path) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ", "describe": { "columns": [ { @@ -31,7 +31,7 @@ ], "parameters": { "Left": [ - "Uuid", + "Int8", "Varchar", "Varchar", "Int8" @@ -45,5 +45,5 @@ false ] }, - "hash": "f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2" + "hash": "180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10" } diff --git a/Cargo.lock b/Cargo.lock index 1d7b9963..f4b6d220 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -460,6 +460,7 @@ dependencies = [ "derive_more", "dotenv", "fancy-regex", + "futures", "futures-util", "gotrue", "gotrue-entity", @@ -489,6 +490,7 @@ dependencies = [ "token", "tokio", "tokio-stream", + "tokio-util", "tracing", "tracing-actix-web", "tracing-bunyan-formatter", @@ -836,6 +838,7 @@ dependencies = [ "collab", "collab-define", "collab-sync-protocol", + "futures-core", "futures-util", "gotrue", "gotrue-entity", @@ -4027,9 +4030,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.8" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", diff --git a/Cargo.toml b/Cargo.toml index 8e09bdf8..5ba6da0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,9 @@ tokio = { version = "1.26.0", features = [ "time", ] } tokio-stream = "0.1.14" -futures-util = "0.3.26" +tokio-util = { version = "0.7.9", features = ["io"] } +futures = "0.3.17" +futures-util ={ version = "0.3.26" , features = ["std","io"] } config = { version = "0.13.3", default-features = false, features = ["yaml"] } once_cell = "1.13.0" chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false } diff --git a/build/run_local_server.sh b/build/run_local_server.sh index 09c94c0a..847fccaa 100755 --- a/build/run_local_server.sh +++ b/build/run_local_server.sh @@ -45,11 +45,6 @@ pkill -f appflowy_cloud || true cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace RUST_LOG=trace cargo run & -# sometimes the gotrue server may not be ready yet -sleep 1 -# created registered user -./build/init_registered_user.sh - # revert to require signup email verification export GOTRUE_MAILER_AUTOCONFIRM=false docker-compose --file ./docker-compose-dev.yml up -d diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 2d6684d7..15742a20 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -27,6 +27,7 @@ serde = { version = "1.0", features = ["derive"] } tokio-tungstenite = { version = "0.20.1" } tokio = { version = "1.26", features = ["full"] } futures-util = "0.3.26" +futures-core = "0.3.26" tokio-retry = "0.3" bytes = "1.0" uuid = "1.4.1" diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 0e8d2267..b2e0bf9c 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -1,5 +1,6 @@ use anyhow::anyhow; use bytes::Bytes; +use futures_util::StreamExt; use gotrue::grant::Grant; use gotrue::grant::PasswordGrant; use gotrue::grant::RefreshTokenGrant; @@ -463,7 +464,10 @@ impl Client { AppResponse::<()>::from_response(resp).await?.into_error() } - pub async fn get_file_storage_object(&self, path: &str) -> Result { + pub async fn get_file_storage_object_stream( + &self, + path: &str, + ) -> Result>, AppError> { let url = format!("{}/api/file_storage/{}", self.base_url, path); let resp = self .http_client_with_auth(Method::GET, &url) @@ -471,10 +475,7 @@ impl Client { .send() .await?; match resp.status() { - reqwest::StatusCode::OK => { - let bytes = resp.bytes().await?; - Ok(bytes) - }, + reqwest::StatusCode::OK => Ok(resp.bytes_stream()), reqwest::StatusCode::NOT_FOUND => Err(ErrorCode::FileNotFound.into()), c => Err(AppError::new( ErrorCode::Unhandled, @@ -483,6 +484,15 @@ impl Client { } } + pub async fn get_file_storage_object(&self, path: &str) -> Result { + let mut acc: Vec = Vec::new(); + let mut stream = self.get_file_storage_object_stream(path).await?; + while let Some(raw_bytes) = stream.next().await { + acc.extend_from_slice(&raw_bytes?); + } + Ok(Bytes::from(acc)) + } + pub async fn delete_file_storage_object(&self, path: &str) -> Result<(), AppError> { let url = format!("{}/api/file_storage/{}", self.base_url, path); let resp = self diff --git a/libs/storage/src/file_storage.rs b/libs/storage/src/file_storage.rs index cb3ce6ee..81ee7337 100644 --- a/libs/storage/src/file_storage.rs +++ b/libs/storage/src/file_storage.rs @@ -2,8 +2,8 @@ use sqlx::{PgPool, Transaction}; use storage_entity::AFFileMetadata; pub async fn insert_file_metadata( - trans: &mut Transaction<'_, sqlx::Postgres>, - user: &uuid::Uuid, + pg_pool: &PgPool, + owner_uid: i64, path: &str, file_type: &str, file_size: i64, @@ -11,21 +11,20 @@ pub async fn insert_file_metadata( sqlx::query_as!( AFFileMetadata, r#" - INSERT INTO af_file_metadata (owner_uid, path, file_type, file_size) - SELECT uid, $2, $3, $4 - FROM af_user - WHERE uuid = $1 + INSERT INTO af_file_metadata + (owner_uid, path, file_type, file_size) + VALUES ($1, $2, $3, $4) ON CONFLICT (owner_uid, path) DO UPDATE SET file_type = $3, file_size = $4 RETURNING * "#, - user, + owner_uid, path, file_type, file_size ) - .fetch_one(trans.as_mut()) + .fetch_one(pg_pool) .await } diff --git a/libs/storage/src/lib.rs b/libs/storage/src/lib.rs index c4eb536d..59b1b500 100644 --- a/libs/storage/src/lib.rs +++ b/libs/storage/src/lib.rs @@ -1,4 +1,5 @@ pub mod collab; pub mod error; pub mod file_storage; +pub mod user; pub mod workspace; diff --git a/libs/storage/src/user.rs b/libs/storage/src/user.rs new file mode 100644 index 00000000..65a5585f --- /dev/null +++ b/libs/storage/src/user.rs @@ -0,0 +1,61 @@ +use sqlx::PgPool; + +pub async fn update_user_name( + pool: &PgPool, + uuid: &uuid::Uuid, + name: &str, +) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + UPDATE af_user + SET name = $1 + WHERE uuid = $2 + "#, + name, + uuid + ) + .execute(pool) + .await?; + Ok(()) +} + +pub async fn create_user_if_not_exists( + pool: &PgPool, + user_uuid: &uuid::Uuid, + email: &str, + name: &str, +) -> Result { + let affected_rows = sqlx::query!( + r#" + INSERT INTO af_user (uuid, email, name) + SELECT $1, $2, $3 + WHERE NOT EXISTS ( + SELECT 1 FROM public.af_user WHERE email = $2 + ) + AND NOT EXISTS ( + SELECT 1 FROM public.af_user WHERE uuid = $1 + ) + "#, + user_uuid, + email, + name + ) + .execute(pool) + .await? + .rows_affected(); + + Ok(affected_rows > 0) +} + +pub async fn get_user_id(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result { + let uid = sqlx::query!( + r#" + SELECT uid FROM af_user WHERE uuid = $1 + "#, + gotrue_uuid + ) + .fetch_one(pool) + .await? + .uid; + Ok(uid) +} diff --git a/libs/storage/src/workspace.rs b/libs/storage/src/workspace.rs index b7dd1768..07fde8af 100644 --- a/libs/storage/src/workspace.rs +++ b/libs/storage/src/workspace.rs @@ -5,66 +5,6 @@ use sqlx::{ use storage_entity::{AFRole, AFUserProfileView, AFWorkspace, AFWorkspaceMember}; -pub async fn update_user_name( - pool: &PgPool, - uuid: &uuid::Uuid, - name: &str, -) -> Result<(), sqlx::Error> { - sqlx::query!( - r#" - UPDATE af_user - SET name = $1 - WHERE uuid = $2 - "#, - name, - uuid - ) - .execute(pool) - .await?; - Ok(()) -} - -pub async fn create_user_if_not_exists( - pool: &PgPool, - user_uuid: &uuid::Uuid, - email: &str, - name: &str, -) -> Result { - let affected_rows = sqlx::query!( - r#" - INSERT INTO af_user (uuid, email, name) - SELECT $1, $2, $3 - WHERE NOT EXISTS ( - SELECT 1 FROM public.af_user WHERE email = $2 - ) - AND NOT EXISTS ( - SELECT 1 FROM public.af_user WHERE uuid = $1 - ) - "#, - user_uuid, - email, - name - ) - .execute(pool) - .await? - .rows_affected(); - - Ok(affected_rows > 0) -} - -pub async fn get_user_id(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result { - let uid = sqlx::query!( - r#" - SELECT uid FROM af_user WHERE uuid = $1 - "#, - gotrue_uuid - ) - .fetch_one(pool) - .await? - .uid; - Ok(uid) -} - pub async fn select_all_workspaces_owned( pool: &PgPool, owner_uuid: &Uuid, diff --git a/src/api/file_storage.rs b/src/api/file_storage.rs index d9819c3d..4ad950f6 100644 --- a/src/api/file_storage.rs +++ b/src/api/file_storage.rs @@ -1,12 +1,18 @@ +use std::pin::Pin; + +use actix_http::body::BoxBody; use actix_web::http::header::ContentType; -use actix_web::Result; +use actix_web::web::Payload; use actix_web::{ web::{self, Data}, Scope, }; -use bytes::Bytes; +use actix_web::{HttpResponse, Result}; use shared_entity::data::{AppResponse, JsonAppResponse}; use shared_entity::error_code::ErrorCode; +use tokio::io::AsyncRead; +use tokio_stream::StreamExt; +use tokio_util::io::StreamReader; use crate::biz::file_storage; use crate::{component::auth::jwt::UserUuid, state::AppState}; @@ -24,18 +30,19 @@ async fn put_handler( user_uuid: UserUuid, state: Data, path: web::Path, - file_data: Bytes, + payload: Payload, content_type: web::Header, ) -> Result> { let file_path = path.into_inner(); let mime = content_type.into_inner().0; + let mut async_read = payload_to_async_read(payload); file_storage::put_object( &state.pg_pool, &state.s3_bucket, &user_uuid, &file_path, - &file_data, mime, + &mut async_read, ) .await?; Ok(AppResponse::Ok().into()) @@ -55,13 +62,20 @@ async fn get_handler( user_uuid: UserUuid, state: Data, path: web::Path, -) -> Result { +) -> Result> { let file_path = path.into_inner(); match file_storage::get_object(&state.pg_pool, &state.s3_bucket, &user_uuid, &file_path).await { - Ok(data) => Ok(data), + Ok(async_read) => Ok(HttpResponse::Ok().streaming(async_read)), Err(e) => match e.code { ErrorCode::FileNotFound => Err(actix_web::error::ErrorNotFound(e)), _ => Err(actix_web::error::ErrorInternalServerError(e)), }, } } + +fn payload_to_async_read(payload: actix_web::web::Payload) -> Pin> { + let mapped = + payload.map(|chunk| chunk.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))); + let reader = StreamReader::new(mapped); + Box::pin(reader) +} diff --git a/src/biz/file_storage.rs b/src/biz/file_storage.rs index f3d35c41..39859b48 100644 --- a/src/biz/file_storage.rs +++ b/src/biz/file_storage.rs @@ -1,28 +1,42 @@ +use std::pin::Pin; + +use futures_util::Stream; + use bytes::Bytes; -use s3::request::ResponseData; +use s3::request::{ResponseData, ResponseDataStream}; use shared_entity::{error::AppError, error_code::ErrorCode}; use sqlx::types::uuid; -use storage::file_storage; +use storage::{file_storage, user::get_user_id}; +use tokio_stream::StreamExt; -// todo: user writer -pub async fn put_object( +use super::utils::CountingReader; + +pub async fn put_object( pg_pool: &sqlx::PgPool, s3_bucket: &s3::Bucket, user_uuid: &uuid::Uuid, - path: &str, - data: &[u8], + file_path: &str, mime: mime::Mime, -) -> Result<(), AppError> { + async_read: &mut R, +) -> Result<(), AppError> +where + R: tokio::io::AsyncRead + std::marker::Unpin, +{ // TODO: access control - let size = data.len() as i64; - let mut trans = pg_pool.begin().await?; let file_type = mime.to_string(); - let metadata = - file_storage::insert_file_metadata(&mut trans, user_uuid, path, &file_type, size).await?; - let resp = s3_bucket.put_object(metadata.s3_path(), data).await?; - check_s3_status(&resp)?; - trans.commit().await?; + let owner_uid = get_user_id(pg_pool, user_uuid).await?; + let full_path = format!("{}/{}", owner_uid, file_path); + let mut counting_reader = CountingReader::new(async_read); + let status_code = s3_bucket + .put_object_stream(&mut counting_reader, full_path) + .await?; + check_s3_status_code(status_code)?; + + let size = counting_reader.count(); + let _metadata = + file_storage::insert_file_metadata(pg_pool, owner_uid, file_path, &file_type, size as i64) + .await?; Ok(()) } @@ -38,7 +52,7 @@ pub async fn delete_object( match file_storage::delete_file_metadata(&mut trans, user_uuid, path).await { Ok(metadata) => { let resp = s3_bucket.delete_object(metadata.s3_path()).await?; - check_s3_status(&resp)?; + check_s3_response_data(&resp)?; trans.commit().await?; Ok(()) }, @@ -49,20 +63,18 @@ pub async fn delete_object( } } -// user reader pub async fn get_object( pg_pool: &sqlx::PgPool, s3_bucket: &s3::Bucket, user_uuid: &uuid::Uuid, path: &str, -) -> Result { +) -> Result>>>, AppError> { // TODO: access control match file_storage::get_file_metadata(pg_pool, user_uuid, path).await { Ok(metadata) => { - let resp = s3_bucket.get_object(metadata.s3_path()).await?; - check_s3_status(&resp)?; - Ok(resp.bytes().to_owned()) + let resp = s3_bucket.get_object_stream(metadata.s3_path()).await?; + Ok(s3_response_stream_to_tokio_stream(resp)) }, Err(e) => match e { sqlx::Error::RowNotFound => Err(ErrorCode::FileNotFound.into()), @@ -71,7 +83,7 @@ pub async fn get_object( } } -fn check_s3_status(resp: &ResponseData) -> Result<(), AppError> { +fn check_s3_response_data(resp: &ResponseData) -> Result<(), AppError> { let status_code = resp.status_code(); match status_code { 200..=299 => Ok(()), @@ -85,3 +97,20 @@ fn check_s3_status(resp: &ResponseData) -> Result<(), AppError> { }, } } + +fn check_s3_status_code(status_code: u16) -> Result<(), AppError> { + match status_code { + 200..=299 => Ok(()), + error_code => { + tracing::error!("S3 error: {}", error_code); + Err(ErrorCode::S3Error.into()) + }, + } +} + +fn s3_response_stream_to_tokio_stream( + resp: ResponseDataStream, +) -> Pin>>> { + let mapped = resp.bytes.map(Ok::); + Box::pin(mapped) +} diff --git a/src/biz/mod.rs b/src/biz/mod.rs index b8a7f82b..6e689656 100644 --- a/src/biz/mod.rs +++ b/src/biz/mod.rs @@ -1,3 +1,4 @@ pub mod file_storage; pub mod user; +pub mod utils; pub mod workspace; diff --git a/src/biz/user.rs b/src/biz/user.rs index c7ab0375..192e8be5 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -1,8 +1,9 @@ use anyhow::Result; use gotrue::api::Client; use shared_entity::error::AppError; -use storage::workspace::{ - create_user_if_not_exists, select_user_profile_view_by_uuid, update_user_name, +use storage::{ + user::{create_user_if_not_exists, update_user_name}, + workspace::select_user_profile_view_by_uuid, }; use storage_entity::AFUserProfileView; diff --git a/src/biz/utils.rs b/src/biz/utils.rs new file mode 100644 index 00000000..36f003e5 --- /dev/null +++ b/src/biz/utils.rs @@ -0,0 +1,44 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{self, AsyncRead, ReadBuf}; + +pub struct CountingReader { + reader: R, + count: usize, +} + +impl AsyncRead for CountingReader +where + R: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let before = buf.filled().len(); + let poll = Pin::new(&mut self.reader).poll_read(cx, buf); + let after = buf.filled().len(); + self.count += after - before; + poll + } +} + +impl CountingReader { + pub fn new(reader: R) -> Self { + Self { reader, count: 0 } + } + + pub fn count(&self) -> usize { + self.count + } +} + +impl AsRef for CountingReader +where + R: AsyncRead + Unpin, +{ + fn as_ref(&self) -> &R { + &self.reader + } +}