From a3af38162eb238b7e0dff56e6182c270e0d6e719 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 20 Jun 2024 21:12:28 +0800 Subject: [PATCH] chore: fix upload filem meta key by adding parent dir as prefix (#635) * chore: fix upload filem meta key by adding parent dir as prefix * chore: fix test --- Cargo.lock | 1 + libs/client-api/Cargo.toml | 1 + libs/client-api/src/http_blob.rs | 5 ++ libs/client-api/src/native/http_native.rs | 3 ++ libs/database/src/file/file_storage.rs | 10 ++-- libs/database/src/file/s3_client_impl.rs | 4 +- src/api/file_storage.rs | 25 +++++++--- tests/file_test/multiple_part_test.rs | 59 +++++++++++++++++++++++ 8 files changed, 93 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e069bb85..323acab1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1973,6 +1973,7 @@ dependencies = [ "infra", "mime", "parking_lot 0.12.1", + "percent-encoding", "prost", "reqwest 0.11.27", "scraper", diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 10e53c56..4f92b282 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -43,6 +43,7 @@ shared-entity = { workspace = true } collab-rt-entity = { workspace = true } client-api-entity.workspace = true serde_urlencoded = "0.7.1" +percent-encoding = "2.3.1" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio-retry = "0.3" diff --git a/libs/client-api/src/http_blob.rs b/libs/client-api/src/http_blob.rs index 18e696fa..6a446707 100644 --- a/libs/client-api/src/http_blob.rs +++ b/libs/client-api/src/http_blob.rs @@ -5,6 +5,7 @@ use app_error::AppError; use bytes::Bytes; use futures_util::TryStreamExt; use mime::Mime; +use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use reqwest::{header, Method, StatusCode}; use shared_entity::dto::workspace_dto::{BlobMetadata, RepeatedBlobMetaData}; use shared_entity::response::{AppResponse, AppResponseError}; @@ -67,6 +68,7 @@ impl Client { .into_data() } pub fn get_blob_url_v1(&self, workspace_id: &str, parent_dir: &str, file_id: &str) -> String { + let parent_dir = utf8_percent_encode(parent_dir, NON_ALPHANUMERIC).to_string(); format!( "{}/api/file_storage/{workspace_id}/v1/blob/{parent_dir}/{file_id}", self.base_url @@ -80,6 +82,7 @@ impl Client { parent_dir: &str, file_id: &str, ) -> Result<(Mime, Vec), AppResponseError> { + // Encode the parent directory to ensure it's URL-safe. let url = self.get_blob_url_v1(workspace_id, parent_dir, file_id); self.get_blob(&url).await } @@ -91,6 +94,7 @@ impl Client { parent_dir: &str, file_id: &str, ) -> Result<(), AppResponseError> { + let parent_dir = utf8_percent_encode(parent_dir, NON_ALPHANUMERIC).to_string(); let url = format!( "{}/api/file_storage/{workspace_id}/v1/blob/{parent_dir}/{file_id}", self.base_url @@ -111,6 +115,7 @@ impl Client { parent_dir: &str, file_id: &str, ) -> Result { + let parent_dir = utf8_percent_encode(parent_dir, NON_ALPHANUMERIC).to_string(); let url = format!( "{}/api/file_storage/{workspace_id}/v1/metadata/{parent_dir}/{file_id}", self.base_url diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 08561d1a..48b42e95 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -19,6 +19,7 @@ use shared_entity::dto::workspace_dto::CollabResponse; use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; +use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use std::sync::atomic::Ordering; use std::time::Duration; use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; @@ -69,6 +70,8 @@ impl Client { ))); } + // Encode the parent directory to ensure it's URL-safe. + let parent_dir = utf8_percent_encode(parent_dir, NON_ALPHANUMERIC).to_string(); let url = format!( "{}/api/file_storage/{workspace_id}/upload_part/{parent_dir}/{file_id}/{upload_id}/{part_number}", self.base_url diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index 15682bea..9462052e 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -50,7 +50,7 @@ pub trait BucketClient { pub trait BlobKey: Send + Sync { fn workspace_id(&self) -> &Uuid; fn object_key(&self) -> String; - fn meta_key(&self) -> &str; + fn meta_key(&self) -> String; fn e_tag(&self) -> &str; } @@ -81,7 +81,7 @@ where file_data: Vec, file_type: String, ) -> Result<(), AppError> { - if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), key.meta_key()).await? { + if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), &key.meta_key()).await? { warn!( "file already exists, workspace_id: {}, meta_key: {}", key.workspace_id(), @@ -94,7 +94,7 @@ where insert_blob_metadata( &self.pg_pool, - key.meta_key(), + &key.meta_key(), key.workspace_id(), &file_type, file_data.len(), @@ -107,7 +107,7 @@ where self.client.delete_blob(&key.object_key()).await?; let mut tx = self.pg_pool.begin().await?; - delete_blob_metadata(&mut tx, key.workspace_id(), key.meta_key()).await?; + delete_blob_metadata(&mut tx, key.workspace_id(), &key.meta_key()).await?; tx.commit().await?; Ok(()) } @@ -159,7 +159,7 @@ where let (content_length, content_type) = self.client.complete_upload(&key, req).await?; insert_blob_metadata( &self.pg_pool, - key.meta_key(), + &key.meta_key(), key.workspace_id(), &content_type, content_length, diff --git a/libs/database/src/file/s3_client_impl.rs b/libs/database/src/file/s3_client_impl.rs index 1dcddaa8..84c73c73 100644 --- a/libs/database/src/file/s3_client_impl.rs +++ b/libs/database/src/file/s3_client_impl.rs @@ -175,7 +175,7 @@ impl BucketClient for AwsS3BucketClientImpl { .content_type(req.content_type) .send() .await - .map_err(|err| anyhow!("Failed to create upload: {}", err))?; + .map_err(|err| anyhow!(format!("Failed to create upload: {:?}", err)))?; match multipart_upload_res.upload_id { None => Err(anyhow!("Failed to create upload: upload_id is None").into()), @@ -212,7 +212,7 @@ impl BucketClient for AwsS3BucketClientImpl { .body(body) .send() .await - .map_err(|err| anyhow!("Failed to upload part: {}", err))?; + .map_err(|err| anyhow!(format!("Failed to upload part: {:?}", err)))?; match upload_part_res.e_tag { None => Err(anyhow!("Failed to upload part: e_tag is None").into()), diff --git a/src/api/file_storage.rs b/src/api/file_storage.rs index d4c64fb5..39283d12 100644 --- a/src/api/file_storage.rs +++ b/src/api/file_storage.rs @@ -26,7 +26,7 @@ use std::pin::Pin; use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_stream::StreamExt; use tokio_util::io::StreamReader; -use tracing::{error, event, instrument}; +use tracing::{error, event, instrument, trace}; use crate::state::AppState; @@ -115,6 +115,15 @@ async fn upload_part_handler( mut payload: Payload, ) -> Result> { let path_params = path.into_inner(); + trace!( + "upload part: workspace_id: {}, parent_dir: {}, file_id: {}, upload_id: {}, part_num: {}", + path_params.workspace_id, + path_params.parent_dir, + path_params.file_id, + path_params.upload_id, + path_params.part_num + ); + let content_length = content_length.into_inner().into_inner(); let mut content = Vec::with_capacity(content_length); while let Some(chunk) = payload.try_next().await? { @@ -277,7 +286,7 @@ async fn get_blob_by_object_key( // Get the metadata let result = state .bucket_storage - .get_blob_metadata(key.workspace_id(), key.meta_key()) + .get_blob_metadata(key.workspace_id(), &key.meta_key()) .await; if let Err(err) = result.as_ref() { @@ -344,7 +353,7 @@ async fn get_blob_metadata_handler( // Get the metadata let metadata = state .bucket_storage - .get_blob_metadata(&path.workspace_id, path.meta_key()) + .get_blob_metadata(&path.workspace_id, &path.meta_key()) .await .map(|meta| BlobMetadata { workspace_id: meta.workspace_id, @@ -368,7 +377,7 @@ async fn get_blob_metadata_v1_handler( // Get the metadata let metadata = state .bucket_storage - .get_blob_metadata(&path.workspace_id, path.meta_key()) + .get_blob_metadata(&path.workspace_id, &path.meta_key()) .await .map(|meta| BlobMetadata { workspace_id: meta.workspace_id, @@ -443,8 +452,8 @@ impl BlobKey for BlobPathV0 { format!("{}/{}", self.workspace_id, self.file_id) } - fn meta_key(&self) -> &str { - &self.file_id + fn meta_key(&self) -> String { + self.file_id.clone() } fn e_tag(&self) -> &str { @@ -469,8 +478,8 @@ impl BlobKey for BlobPathV1 { format!("{}/{}/{}", self.workspace_id, self.parent_dir, self.file_id) } - fn meta_key(&self) -> &str { - &self.file_id + fn meta_key(&self) -> String { + format!("{}_{}", self.parent_dir, self.file_id) } fn e_tag(&self) -> &str { diff --git a/tests/file_test/multiple_part_test.rs b/tests/file_test/multiple_part_test.rs index 986e6f83..4b445d82 100644 --- a/tests/file_test/multiple_part_test.rs +++ b/tests/file_test/multiple_part_test.rs @@ -355,3 +355,62 @@ async fn invalid_test() { assert_eq!(err.code, ErrorCode::Internal); } } + +#[tokio::test] +async fn multiple_level_dir_upload_file_test() { + // Test with smaller file (single part) + let (c1, _user1) = generate_unique_registered_user_client().await; + let workspace_id = workspace_id_from_client(&c1).await; + let mime = mime::TEXT_PLAIN_UTF_8; + let text = generate_random_string(1024); + let file_id = Uuid::new_v4().to_string(); + let parent_dir = "file/v1/image".to_string(); + let upload = c1 + .create_upload( + &workspace_id, + CreateUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + content_type: mime.to_string(), + }, + ) + .await + .unwrap(); + let chunked_bytes = ChunkedBytes::from_bytes(Bytes::from(text.clone())).unwrap(); + let mut completed_parts = Vec::new(); + let iter = chunked_bytes.iter().enumerate(); + for (index, next) in iter { + let resp = c1 + .upload_part( + &workspace_id, + &parent_dir, + &file_id, + &upload.upload_id, + index as i32 + 1, + next.to_vec(), + ) + .await + .unwrap(); + + completed_parts.push(CompletedPartRequest { + e_tag: resp.e_tag, + part_number: resp.part_num, + }); + } + let req = CompleteUploadRequest { + file_id: file_id.clone(), + parent_dir: parent_dir.clone(), + upload_id: upload.upload_id, + parts: completed_parts, + }; + c1.complete_upload(&workspace_id, req).await.unwrap(); + + let blob = c1 + .get_blob_v1(&workspace_id, &parent_dir, &file_id) + .await + .unwrap() + .1; + + let blob_text = String::from_utf8(blob.to_vec()).unwrap(); + assert_eq!(blob_text, text); +}