feat: async read for file upload (#80)

* feat: async read for file upload

* feat: async read for file download

---------

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Zack 2023-09-28 14:53:41 +08:00 committed by GitHub
parent b8bd53ff5c
commit a6eb8607c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 214 additions and 113 deletions

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_file_metadata (owner_uid, path, file_type, file_size)\n SELECT uid, $2, $3, $4\n FROM af_user\n WHERE uuid = $1\n ON CONFLICT (owner_uid, path) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ",
"query": "\n INSERT INTO af_file_metadata\n (owner_uid, path, file_type, file_size)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (owner_uid, path) DO UPDATE SET\n file_type = $3,\n file_size = $4\n RETURNING *\n ",
"describe": {
"columns": [
{
@ -31,7 +31,7 @@
],
"parameters": {
"Left": [
"Uuid",
"Int8",
"Varchar",
"Varchar",
"Int8"
@ -45,5 +45,5 @@
false
]
},
"hash": "f63968cfcf5a7c9bd3517573213dcf40388fe754fc3204789cf4aff3da5d00b2"
"hash": "180e896f2f540e16284f7adc687dd93cc3933954854a4862bd235dd5cdccbe10"
}

7
Cargo.lock generated
View File

@ -460,6 +460,7 @@ dependencies = [
"derive_more",
"dotenv",
"fancy-regex",
"futures",
"futures-util",
"gotrue",
"gotrue-entity",
@ -489,6 +490,7 @@ dependencies = [
"token",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"tracing-actix-web",
"tracing-bunyan-formatter",
@ -836,6 +838,7 @@ dependencies = [
"collab",
"collab-define",
"collab-sync-protocol",
"futures-core",
"futures-util",
"gotrue",
"gotrue-entity",
@ -4027,9 +4030,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.8"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
dependencies = [
"bytes",
"futures-core",

View File

@ -30,7 +30,9 @@ tokio = { version = "1.26.0", features = [
"time",
] }
tokio-stream = "0.1.14"
futures-util = "0.3.26"
tokio-util = { version = "0.7.9", features = ["io"] }
futures = "0.3.17"
futures-util ={ version = "0.3.26" , features = ["std","io"] }
config = { version = "0.13.3", default-features = false, features = ["yaml"] }
once_cell = "1.13.0"
chrono = { version = "0.4.23", features = ["serde", "clock"], default-features = false }

View File

@ -45,11 +45,6 @@ pkill -f appflowy_cloud || true
cargo sqlx database create && cargo sqlx migrate run && cargo sqlx prepare --workspace
RUST_LOG=trace cargo run &
# sometimes the gotrue server may not be ready yet
sleep 1
# created registered user
./build/init_registered_user.sh
# revert to require signup email verification
export GOTRUE_MAILER_AUTOCONFIRM=false
docker-compose --file ./docker-compose-dev.yml up -d

View File

@ -27,6 +27,7 @@ serde = { version = "1.0", features = ["derive"] }
tokio-tungstenite = { version = "0.20.1" }
tokio = { version = "1.26", features = ["full"] }
futures-util = "0.3.26"
futures-core = "0.3.26"
tokio-retry = "0.3"
bytes = "1.0"
uuid = "1.4.1"

View File

@ -1,5 +1,6 @@
use anyhow::anyhow;
use bytes::Bytes;
use futures_util::StreamExt;
use gotrue::grant::Grant;
use gotrue::grant::PasswordGrant;
use gotrue::grant::RefreshTokenGrant;
@ -463,7 +464,10 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub async fn get_file_storage_object(&self, path: &str) -> Result<Bytes, AppError> {
pub async fn get_file_storage_object_stream(
&self,
path: &str,
) -> Result<impl futures_core::Stream<Item = reqwest::Result<Bytes>>, AppError> {
let url = format!("{}/api/file_storage/{}", self.base_url, path);
let resp = self
.http_client_with_auth(Method::GET, &url)
@ -471,10 +475,7 @@ impl Client {
.send()
.await?;
match resp.status() {
reqwest::StatusCode::OK => {
let bytes = resp.bytes().await?;
Ok(bytes)
},
reqwest::StatusCode::OK => Ok(resp.bytes_stream()),
reqwest::StatusCode::NOT_FOUND => Err(ErrorCode::FileNotFound.into()),
c => Err(AppError::new(
ErrorCode::Unhandled,
@ -483,6 +484,15 @@ impl Client {
}
}
pub async fn get_file_storage_object(&self, path: &str) -> Result<Bytes, AppError> {
let mut acc: Vec<u8> = Vec::new();
let mut stream = self.get_file_storage_object_stream(path).await?;
while let Some(raw_bytes) = stream.next().await {
acc.extend_from_slice(&raw_bytes?);
}
Ok(Bytes::from(acc))
}
pub async fn delete_file_storage_object(&self, path: &str) -> Result<(), AppError> {
let url = format!("{}/api/file_storage/{}", self.base_url, path);
let resp = self

View File

@ -2,8 +2,8 @@ use sqlx::{PgPool, Transaction};
use storage_entity::AFFileMetadata;
pub async fn insert_file_metadata(
trans: &mut Transaction<'_, sqlx::Postgres>,
user: &uuid::Uuid,
pg_pool: &PgPool,
owner_uid: i64,
path: &str,
file_type: &str,
file_size: i64,
@ -11,21 +11,20 @@ pub async fn insert_file_metadata(
sqlx::query_as!(
AFFileMetadata,
r#"
INSERT INTO af_file_metadata (owner_uid, path, file_type, file_size)
SELECT uid, $2, $3, $4
FROM af_user
WHERE uuid = $1
INSERT INTO af_file_metadata
(owner_uid, path, file_type, file_size)
VALUES ($1, $2, $3, $4)
ON CONFLICT (owner_uid, path) DO UPDATE SET
file_type = $3,
file_size = $4
RETURNING *
"#,
user,
owner_uid,
path,
file_type,
file_size
)
.fetch_one(trans.as_mut())
.fetch_one(pg_pool)
.await
}

View File

@ -1,4 +1,5 @@
pub mod collab;
pub mod error;
pub mod file_storage;
pub mod user;
pub mod workspace;

61
libs/storage/src/user.rs Normal file
View File

@ -0,0 +1,61 @@
use sqlx::PgPool;
pub async fn update_user_name(
pool: &PgPool,
uuid: &uuid::Uuid,
name: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
UPDATE af_user
SET name = $1
WHERE uuid = $2
"#,
name,
uuid
)
.execute(pool)
.await?;
Ok(())
}
pub async fn create_user_if_not_exists(
pool: &PgPool,
user_uuid: &uuid::Uuid,
email: &str,
name: &str,
) -> Result<bool, sqlx::Error> {
let affected_rows = sqlx::query!(
r#"
INSERT INTO af_user (uuid, email, name)
SELECT $1, $2, $3
WHERE NOT EXISTS (
SELECT 1 FROM public.af_user WHERE email = $2
)
AND NOT EXISTS (
SELECT 1 FROM public.af_user WHERE uuid = $1
)
"#,
user_uuid,
email,
name
)
.execute(pool)
.await?
.rows_affected();
Ok(affected_rows > 0)
}
pub async fn get_user_id(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result<i64, sqlx::Error> {
let uid = sqlx::query!(
r#"
SELECT uid FROM af_user WHERE uuid = $1
"#,
gotrue_uuid
)
.fetch_one(pool)
.await?
.uid;
Ok(uid)
}

View File

@ -5,66 +5,6 @@ use sqlx::{
use storage_entity::{AFRole, AFUserProfileView, AFWorkspace, AFWorkspaceMember};
pub async fn update_user_name(
pool: &PgPool,
uuid: &uuid::Uuid,
name: &str,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
UPDATE af_user
SET name = $1
WHERE uuid = $2
"#,
name,
uuid
)
.execute(pool)
.await?;
Ok(())
}
pub async fn create_user_if_not_exists(
pool: &PgPool,
user_uuid: &uuid::Uuid,
email: &str,
name: &str,
) -> Result<bool, sqlx::Error> {
let affected_rows = sqlx::query!(
r#"
INSERT INTO af_user (uuid, email, name)
SELECT $1, $2, $3
WHERE NOT EXISTS (
SELECT 1 FROM public.af_user WHERE email = $2
)
AND NOT EXISTS (
SELECT 1 FROM public.af_user WHERE uuid = $1
)
"#,
user_uuid,
email,
name
)
.execute(pool)
.await?
.rows_affected();
Ok(affected_rows > 0)
}
pub async fn get_user_id(pool: &PgPool, gotrue_uuid: &uuid::Uuid) -> Result<i64, sqlx::Error> {
let uid = sqlx::query!(
r#"
SELECT uid FROM af_user WHERE uuid = $1
"#,
gotrue_uuid
)
.fetch_one(pool)
.await?
.uid;
Ok(uid)
}
pub async fn select_all_workspaces_owned(
pool: &PgPool,
owner_uuid: &Uuid,

View File

@ -1,12 +1,18 @@
use std::pin::Pin;
use actix_http::body::BoxBody;
use actix_web::http::header::ContentType;
use actix_web::Result;
use actix_web::web::Payload;
use actix_web::{
web::{self, Data},
Scope,
};
use bytes::Bytes;
use actix_web::{HttpResponse, Result};
use shared_entity::data::{AppResponse, JsonAppResponse};
use shared_entity::error_code::ErrorCode;
use tokio::io::AsyncRead;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
use crate::biz::file_storage;
use crate::{component::auth::jwt::UserUuid, state::AppState};
@ -24,18 +30,19 @@ async fn put_handler(
user_uuid: UserUuid,
state: Data<AppState>,
path: web::Path<String>,
file_data: Bytes,
payload: Payload,
content_type: web::Header<ContentType>,
) -> Result<JsonAppResponse<()>> {
let file_path = path.into_inner();
let mime = content_type.into_inner().0;
let mut async_read = payload_to_async_read(payload);
file_storage::put_object(
&state.pg_pool,
&state.s3_bucket,
&user_uuid,
&file_path,
&file_data,
mime,
&mut async_read,
)
.await?;
Ok(AppResponse::Ok().into())
@ -55,13 +62,20 @@ async fn get_handler(
user_uuid: UserUuid,
state: Data<AppState>,
path: web::Path<String>,
) -> Result<Bytes> {
) -> Result<HttpResponse<BoxBody>> {
let file_path = path.into_inner();
match file_storage::get_object(&state.pg_pool, &state.s3_bucket, &user_uuid, &file_path).await {
Ok(data) => Ok(data),
Ok(async_read) => Ok(HttpResponse::Ok().streaming(async_read)),
Err(e) => match e.code {
ErrorCode::FileNotFound => Err(actix_web::error::ErrorNotFound(e)),
_ => Err(actix_web::error::ErrorInternalServerError(e)),
},
}
}
fn payload_to_async_read(payload: actix_web::web::Payload) -> Pin<Box<dyn AsyncRead>> {
let mapped =
payload.map(|chunk| chunk.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
let reader = StreamReader::new(mapped);
Box::pin(reader)
}

View File

@ -1,28 +1,42 @@
use std::pin::Pin;
use futures_util::Stream;
use bytes::Bytes;
use s3::request::ResponseData;
use s3::request::{ResponseData, ResponseDataStream};
use shared_entity::{error::AppError, error_code::ErrorCode};
use sqlx::types::uuid;
use storage::file_storage;
use storage::{file_storage, user::get_user_id};
use tokio_stream::StreamExt;
// todo: user writer
pub async fn put_object(
use super::utils::CountingReader;
pub async fn put_object<R>(
pg_pool: &sqlx::PgPool,
s3_bucket: &s3::Bucket,
user_uuid: &uuid::Uuid,
path: &str,
data: &[u8],
file_path: &str,
mime: mime::Mime,
) -> Result<(), AppError> {
async_read: &mut R,
) -> Result<(), AppError>
where
R: tokio::io::AsyncRead + std::marker::Unpin,
{
// TODO: access control
let size = data.len() as i64;
let mut trans = pg_pool.begin().await?;
let file_type = mime.to_string();
let metadata =
file_storage::insert_file_metadata(&mut trans, user_uuid, path, &file_type, size).await?;
let resp = s3_bucket.put_object(metadata.s3_path(), data).await?;
check_s3_status(&resp)?;
trans.commit().await?;
let owner_uid = get_user_id(pg_pool, user_uuid).await?;
let full_path = format!("{}/{}", owner_uid, file_path);
let mut counting_reader = CountingReader::new(async_read);
let status_code = s3_bucket
.put_object_stream(&mut counting_reader, full_path)
.await?;
check_s3_status_code(status_code)?;
let size = counting_reader.count();
let _metadata =
file_storage::insert_file_metadata(pg_pool, owner_uid, file_path, &file_type, size as i64)
.await?;
Ok(())
}
@ -38,7 +52,7 @@ pub async fn delete_object(
match file_storage::delete_file_metadata(&mut trans, user_uuid, path).await {
Ok(metadata) => {
let resp = s3_bucket.delete_object(metadata.s3_path()).await?;
check_s3_status(&resp)?;
check_s3_response_data(&resp)?;
trans.commit().await?;
Ok(())
},
@ -49,20 +63,18 @@ pub async fn delete_object(
}
}
// user reader
pub async fn get_object(
pg_pool: &sqlx::PgPool,
s3_bucket: &s3::Bucket,
user_uuid: &uuid::Uuid,
path: &str,
) -> Result<Bytes, AppError> {
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>>>>, AppError> {
// TODO: access control
match file_storage::get_file_metadata(pg_pool, user_uuid, path).await {
Ok(metadata) => {
let resp = s3_bucket.get_object(metadata.s3_path()).await?;
check_s3_status(&resp)?;
Ok(resp.bytes().to_owned())
let resp = s3_bucket.get_object_stream(metadata.s3_path()).await?;
Ok(s3_response_stream_to_tokio_stream(resp))
},
Err(e) => match e {
sqlx::Error::RowNotFound => Err(ErrorCode::FileNotFound.into()),
@ -71,7 +83,7 @@ pub async fn get_object(
}
}
fn check_s3_status(resp: &ResponseData) -> Result<(), AppError> {
fn check_s3_response_data(resp: &ResponseData) -> Result<(), AppError> {
let status_code = resp.status_code();
match status_code {
200..=299 => Ok(()),
@ -85,3 +97,20 @@ fn check_s3_status(resp: &ResponseData) -> Result<(), AppError> {
},
}
}
fn check_s3_status_code(status_code: u16) -> Result<(), AppError> {
match status_code {
200..=299 => Ok(()),
error_code => {
tracing::error!("S3 error: {}", error_code);
Err(ErrorCode::S3Error.into())
},
}
}
fn s3_response_stream_to_tokio_stream(
resp: ResponseDataStream,
) -> Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>>>> {
let mapped = resp.bytes.map(Ok::<bytes::Bytes, std::io::Error>);
Box::pin(mapped)
}

View File

@ -1,3 +1,4 @@
pub mod file_storage;
pub mod user;
pub mod utils;
pub mod workspace;

View File

@ -1,8 +1,9 @@
use anyhow::Result;
use gotrue::api::Client;
use shared_entity::error::AppError;
use storage::workspace::{
create_user_if_not_exists, select_user_profile_view_by_uuid, update_user_name,
use storage::{
user::{create_user_if_not_exists, update_user_name},
workspace::select_user_profile_view_by_uuid,
};
use storage_entity::AFUserProfileView;

44
src/biz/utils.rs Normal file
View File

@ -0,0 +1,44 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead, ReadBuf};
pub struct CountingReader<R> {
reader: R,
count: usize,
}
impl<R> AsyncRead for CountingReader<R>
where
R: AsyncRead + Unpin,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let before = buf.filled().len();
let poll = Pin::new(&mut self.reader).poll_read(cx, buf);
let after = buf.filled().len();
self.count += after - before;
poll
}
}
impl<R> CountingReader<R> {
pub fn new(reader: R) -> Self {
Self { reader, count: 0 }
}
pub fn count(&self) -> usize {
self.count
}
}
impl<R> AsRef<R> for CountingReader<R>
where
R: AsyncRead + Unpin,
{
fn as_ref(&self) -> &R {
&self.reader
}
}