diff --git a/.sqlx/query-f448ae1b28ef69f884040016072b12694e530b64a105e03a040c65b779c9d91e.json b/.sqlx/query-c360ec37792d567535ccd2a5011d92c7a201f516e92e204db855167f381c58b1.json similarity index 90% rename from .sqlx/query-f448ae1b28ef69f884040016072b12694e530b64a105e03a040c65b779c9d91e.json rename to .sqlx/query-c360ec37792d567535ccd2a5011d92c7a201f516e92e204db855167f381c58b1.json index 5b7e7844..4391bfa1 100644 --- a/.sqlx/query-f448ae1b28ef69f884040016072b12694e530b64a105e03a040c65b779c9d91e.json +++ b/.sqlx/query-c360ec37792d567535ccd2a5011d92c7a201f516e92e204db855167f381c58b1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n workspace_id,\n database_storage_id,\n owner_uid,\n owner_profile.name as owner_name,\n owner_profile.email as owner_email,\n af_workspace.created_at,\n workspace_type,\n af_workspace.deleted_at,\n workspace_name,\n icon\n FROM public.af_workspace\n JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid\n WHERE workspace_id = $1\n ", + "query": "\n SELECT\n workspace_id,\n database_storage_id,\n owner_uid,\n owner_profile.name as owner_name,\n owner_profile.email as owner_email,\n af_workspace.created_at,\n workspace_type,\n af_workspace.deleted_at,\n workspace_name,\n icon\n FROM public.af_workspace\n JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid\n WHERE af_workspace.workspace_id = $1\n AND COALESCE(af_workspace.is_initialized, true) = true;\n ", "describe": { "columns": [ { @@ -72,5 +72,5 @@ false ] }, - "hash": "f448ae1b28ef69f884040016072b12694e530b64a105e03a040c65b779c9d91e" + "hash": "c360ec37792d567535ccd2a5011d92c7a201f516e92e204db855167f381c58b1" } diff --git a/Cargo.lock b/Cargo.lock index 40451c35..694832f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,6 +608,7 @@ dependencies = [ "assert-json-diff", "async-stream", "async-trait", + "async_zip", "authentication", "aws-config", "aws-sdk-s3", @@ -633,6 +634,7 @@ dependencies = [ "dotenvy", "fancy-regex 0.11.0", "futures", + "futures-lite", "futures-util", "gotrue", "gotrue-entity", @@ -660,6 +662,7 @@ dependencies = [ "rcgen", "redis 0.25.4", "reqwest 0.11.27", + "sanitize-filename", "scraper", "secrecy", "semver", @@ -2228,7 +2231,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "arc-swap", @@ -2253,7 +2256,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "async-trait", @@ -2292,7 +2295,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "arc-swap", @@ -2313,7 +2316,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "bytes", @@ -2333,7 +2336,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "arc-swap", @@ -2355,7 +2358,7 @@ dependencies = [ [[package]] name = "collab-importer" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "async-recursion", @@ -2456,7 +2459,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3d282e6cb1172da9216d8f6e1546ffbf12a066b7#3d282e6cb1172da9216d8f6e1546ffbf12a066b7" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c#cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index 3c5240ff..37f9fe9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -149,6 +149,9 @@ byteorder = "1.5.0" sha2 = "0.10.8" rayon.workspace = true mailer.workspace = true +async_zip.workspace = true +sanitize-filename.workspace = true +futures-lite = "2.3.0" [dev-dependencies] @@ -170,6 +173,7 @@ collab-rt-entity.workspace = true hex = "0.4.3" unicode-normalization = "0.1.24" + [[bin]] name = "appflowy_cloud" path = "src/main.rs" @@ -270,6 +274,8 @@ tonic-proto = { path = "libs/tonic-proto" } appflowy-ai-client = { path = "libs/appflowy-ai-client", default-features = false } pgvector = { version = "0.4", features = ["sqlx"] } client-api-entity = { path = "libs/client-api-entity" } +async_zip = { version = "0.0.17", features = ["full"] } +sanitize-filename = "0.5.0" # collaboration yrs = { version = "0.21.2", features = ["sync"] } @@ -293,13 +299,13 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3d282e6cb1172da9216d8f6e1546ffbf12a066b7" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } +collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cabf08194dff2b764ca0c7c63a2c1bdd5d02e45c" } [features] history = [] diff --git a/libs/database/src/workspace.rs b/libs/database/src/workspace.rs index 56a31bc5..afe77a3b 100644 --- a/libs/database/src/workspace.rs +++ b/libs/database/src/workspace.rs @@ -660,7 +660,8 @@ pub async fn select_workspace<'a, E: Executor<'a, Database = Postgres>>( icon FROM public.af_workspace JOIN public.af_user owner_profile ON af_workspace.owner_uid = owner_profile.uid - WHERE workspace_id = $1 + WHERE af_workspace.workspace_id = $1 + AND COALESCE(af_workspace.is_initialized, true) = true; "#, workspace_id ) diff --git a/services/appflowy-worker/src/error.rs b/services/appflowy-worker/src/error.rs index 021f0250..cc0b0330 100644 --- a/services/appflowy-worker/src/error.rs +++ b/services/appflowy-worker/src/error.rs @@ -25,6 +25,9 @@ pub enum ImportError { #[error("Can not open the workspace:{0}")] CannotOpenWorkspace(String), + #[error("Failed to unzip file: {0}")] + UnZipFileError(String), + #[error(transparent)] Internal(#[from] anyhow::Error), } @@ -125,6 +128,15 @@ impl ImportError { ), format!("Task ID: {} - Internal error: {}", task_id, err), ), + ImportError::UnZipFileError(_) => { + ( + format!( + "Task ID: {} - There was an issue unzipping the file. Please check the file and try again.", + task_id + ), + format!("Task ID: {} - Unzip file error", task_id), + ) + } } } } diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index 25755b1a..ed476f2f 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -1,9 +1,12 @@ use crate::import_worker::report::{ImportNotifier, ImportProgress, ImportResult}; -use crate::s3_client::S3StreamResponse; +use crate::s3_client::{download_file, S3StreamResponse}; use anyhow::anyhow; -use async_zip::base::read::stream::ZipFileReader; use aws_sdk_s3::primitives::ByteStream; +use crate::error::ImportError; +use crate::mailer::ImportNotionMailerParam; +use crate::s3_client::S3Client; + use bytes::Bytes; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; @@ -15,13 +18,15 @@ use collab_importer::notion::page::CollabResource; use collab_importer::notion::NotionImporter; use collab_importer::util::FileId; use collab_importer::zip_tool::unzip_stream; +use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache}; use database::collab::{insert_into_af_collab_bulk_for_user, select_blob_from_af_collab}; +use database::resource_usage::{insert_blob_metadata_bulk, BulkInsertMeta}; use database::workspace::{ delete_from_workspace, select_workspace_database_storage_id, update_import_task_status, update_workspace_status, }; use database_entity::dto::CollabParams; -use futures::io::BufReader; + use futures::stream::FuturesUnordered; use futures::{stream, StreamExt}; use redis::aio::ConnectionManager; @@ -45,15 +50,9 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::fs; - -use crate::s3_client::S3Client; -use database::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache}; use tokio::task::spawn_local; use tokio::time::interval; - -use crate::error::ImportError; -use crate::mailer::ImportNotionMailerParam; -use database::resource_usage::{insert_blob_metadata_bulk, BulkInsertMeta}; +use tokio_util::compat::TokioAsyncReadCompatExt; use tracing::{error, info, trace, warn}; use uuid::Uuid; @@ -75,7 +74,16 @@ pub async fn run_import_worker( error!("Failed to ensure consumer group: {:?}", err); } + let mut storage_dir = temp_dir().join("import_worker_temp_dir"); + if !storage_dir.exists() { + if let Err(err) = fs::create_dir(&storage_dir).await { + error!("Failed to create importer temp dir: {:?}", err); + storage_dir = temp_dir(); + } + } + process_un_acked_tasks( + &storage_dir, &mut redis_client, &s3_client, &pg_pool, @@ -87,6 +95,7 @@ pub async fn run_import_worker( .await; process_upcoming_tasks( + &storage_dir, &mut redis_client, &s3_client, pg_pool, @@ -101,7 +110,9 @@ pub async fn run_import_worker( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn process_un_acked_tasks( + storage_dir: &Path, redis_client: &mut ConnectionManager, s3_client: &Arc, pg_pool: &PgPool, @@ -117,6 +128,7 @@ async fn process_un_acked_tasks( for un_ack_task in un_ack_tasks { // Ignore the error here since the consume task will handle the error let _ = consume_task( + storage_dir, stream_name, group_name, un_ack_task.task, @@ -135,6 +147,7 @@ async fn process_un_acked_tasks( #[allow(clippy::too_many_arguments)] async fn process_upcoming_tasks( + storage_dir: &Path, redis_client: &mut ConnectionManager, s3_client: &Arc, pg_pool: PgPool, @@ -176,8 +189,10 @@ async fn process_upcoming_tasks( let notifier = notifier.clone(); let stream_name = stream_name.to_string(); let group_name = group_name.to_string(); + let storage_dir = storage_dir.to_path_buf(); task_handlers.push(spawn_local(async move { consume_task( + &storage_dir, &stream_name, &group_name, import_task, @@ -210,6 +225,7 @@ async fn process_upcoming_tasks( #[allow(clippy::too_many_arguments)] async fn consume_task( + storage_dir: &Path, stream_name: &str, group_name: &str, import_task: ImportTask, @@ -219,7 +235,15 @@ async fn consume_task( pg_pool: &Pool, notifier: Arc, ) -> Result<(), ImportError> { - let result = process_task(import_task, s3_client, redis_client, pg_pool, notifier).await; + let result = process_task( + storage_dir, + import_task, + s3_client, + redis_client, + pg_pool, + notifier, + ) + .await; // Each task will be consumed only once, regardless of success or failure. let _: () = redis_client @@ -234,6 +258,7 @@ async fn consume_task( } async fn process_task( + storage_dir: &Path, import_task: ImportTask, s3_client: &Arc, redis_client: &mut ConnectionManager, @@ -245,7 +270,10 @@ async fn process_task( match import_task { ImportTask::Notion(task) => { // 1. download zip file - match download_and_unzip_file(&task, s3_client).await { + let unzip_result = + download_and_unzip_file_retry(storage_dir, &task, s3_client, 3, Duration::from_secs(5)) + .await; + match unzip_result { Ok(unzip_dir_path) => { // 2. process unzip file let result = @@ -292,7 +320,38 @@ async fn process_task( } } +pub async fn download_and_unzip_file_retry( + storage_dir: &Path, + import_task: &NotionImportTask, + s3_client: &Arc, + max_retries: usize, + interval: Duration, +) -> Result { + let mut attempt = 0; + loop { + attempt += 1; + match download_and_unzip_file(storage_dir, import_task, s3_client).await { + Ok(result) => return Ok(result), + Err(err) if attempt <= max_retries => { + warn!( + "Attempt {} failed: {}. Retrying in {:?}...", + attempt, err, interval + ); + tokio::time::sleep(interval).await; + }, + Err(err) => { + return Err(ImportError::Internal(anyhow!( + "Failed after {} attempts: {}", + attempt, + err + ))); + }, + } + } +} + async fn download_and_unzip_file( + storage_dir: &Path, import_task: &NotionImportTask, s3_client: &Arc, ) -> Result { @@ -305,11 +364,19 @@ async fn download_and_unzip_file( .await .map_err(|err| ImportError::Internal(err.into()))?; let buffer_size = buffer_size_from_content_length(content_length); - let reader = BufReader::with_capacity(buffer_size, stream); - let zip_reader = ZipFileReader::new(reader); + // Read from stream + // let reader = BufReader::with_capacity(buffer_size, stream); + // let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader); + + // Download first and then read from local file + let zip_file_path = download_file(storage_dir, stream).await?; + + let file = fs::File::open(&zip_file_path).await.unwrap(); + let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat(); + let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader); let unique_file_name = Uuid::new_v4().to_string(); - let output_file_path = temp_dir().join(unique_file_name); + let output_file_path = storage_dir.join(unique_file_name); fs::create_dir_all(&output_file_path) .await .map_err(|err| ImportError::Internal(err.into()))?; @@ -323,6 +390,7 @@ async fn download_and_unzip_file( let unzip_file = unzip_stream(zip_reader, output_file_path) .await .map_err(ImportError::Internal)?; + Ok(unzip_file.unzip_dir_path) } diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs index d5bd8db4..3ec8ab74 100644 --- a/services/appflowy-worker/src/s3_client.rs +++ b/services/appflowy-worker/src/s3_client.rs @@ -2,11 +2,19 @@ use crate::error::WorkerError; use anyhow::anyhow; use aws_sdk_s3::error::SdkError; +use anyhow::Result; use aws_sdk_s3::operation::get_object::GetObjectError; use aws_sdk_s3::primitives::ByteStream; use axum::async_trait; +use futures::AsyncReadExt; use std::ops::Deref; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; use tokio_util::compat::TokioAsyncReadCompatExt; +use tracing::error; +use uuid::Uuid; #[async_trait] pub trait S3Client: Send + Sync { @@ -122,3 +130,66 @@ pub struct S3StreamResponse { pub content_type: Option, pub content_length: Option, } + +pub struct AutoRemoveDownloadedFile(PathBuf); + +impl AsRef for AutoRemoveDownloadedFile { + fn as_ref(&self) -> &Path { + &self.0 + } +} + +impl AsRef for AutoRemoveDownloadedFile { + fn as_ref(&self) -> &PathBuf { + &self.0 + } +} + +impl Deref for AutoRemoveDownloadedFile { + type Target = PathBuf; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Drop for AutoRemoveDownloadedFile { + fn drop(&mut self) { + let path = self.0.clone(); + tokio::spawn(async move { + if let Err(err) = fs::remove_file(&path).await { + error!( + "Failed to delete the auto remove downloaded file: {:?}, error: {}", + path, err + ) + } + }); + } +} + +pub async fn download_file( + storage_dir: &Path, + stream: Box, +) -> Result { + let zip_file_path = storage_dir.join(format!("{}.zip", Uuid::new_v4())); + write_stream_to_file(&zip_file_path, stream).await?; + Ok(AutoRemoveDownloadedFile(zip_file_path)) +} + +pub async fn write_stream_to_file( + file_path: &PathBuf, + mut stream: Box, +) -> Result<(), anyhow::Error> { + let mut file = File::create(file_path).await?; + let mut buffer = vec![0u8; 1_048_576]; + loop { + let bytes_read = stream.read(&mut buffer).await?; + if bytes_read == 0 { + break; + } + file.write_all(&buffer[..bytes_read]).await?; + } + file.flush().await?; + + Ok(()) +} diff --git a/src/api/data_import.rs b/src/api/data_import.rs index 839e5d77..a0a75b70 100644 --- a/src/api/data_import.rs +++ b/src/api/data_import.rs @@ -15,6 +15,7 @@ use futures_util::StreamExt; use shared_entity::dto::import_dto::{ImportTaskDetail, ImportTaskStatus, UserImportTask}; use shared_entity::response::{AppResponse, JsonAppResponse}; use std::env::temp_dir; +use std::path::PathBuf; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tracing::{error, info, trace}; @@ -73,45 +74,20 @@ async fn import_data_handler( .and_then(|s| s.parse::().ok()) .unwrap_or(0); - let mut workspace_name = "".to_string(); + let file_path = temp_dir().join(format!("import_data_{}.zip", Uuid::new_v4())); + let file = write_multiple_part(&mut payload, file_path).await?; - // file_name must be unique - let file_name = format!("{}.zip", Uuid::new_v4()); - let file_path = temp_dir().join(&file_name); - - let mut file_size = 0; - let mut file = File::create(&file_path).await?; - while let Some(item) = payload.next().await { - let mut field = item?; - workspace_name = field - .content_disposition() - .and_then(|c| c.get_name().map(|f| f.to_string())) - .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M"))); - - while let Some(chunk) = field.next().await { - let data = chunk?; - file_size += data.len(); - file.write_all(&data).await?; - } - } - file.shutdown().await?; - drop(file); - - if workspace_name.is_empty() { - return Err(AppError::InvalidRequest("Invalid file".to_string()).into()); - } - - if content_length != file_size { + if content_length != file.size { trace!( "Import file fail. The Content-Length:{} doesn't match file size:{}", content_length, - file_size + file.size ); return Err( AppError::InvalidRequest(format!( "Content-Length:{} doesn't match file size:{}", - content_length, file_size + content_length, file.size )) .into(), ); @@ -123,16 +99,16 @@ async fn import_data_handler( &state.collab_access_control_storage, &user_uuid, uid, - &workspace_name, + &file.name, ) .await?; let workspace_id = workspace.workspace_id.to_string(); info!( "User:{} import data:{} to new workspace:{}, name:{}", - uid, file_size, workspace_id, workspace_name, + uid, file.size, workspace_id, file.name, ); - let stream = ByteStream::from_path(&file_path).await.map_err(|e| { + let stream = ByteStream::from_path(&file.file_path).await.map_err(|e| { AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e)) })?; state @@ -140,20 +116,13 @@ async fn import_data_handler( .put_blob_as_content_type(&workspace_id, stream, "zip") .await?; - // delete the file after uploading - tokio::spawn(async move { - if let Err(err) = tokio::fs::remove_file(file_path).await { - error!("Failed to delete file after uploading: {}", err); - } - }); - create_upload_task( uid, &user_name, &user_email, &workspace_id, - &workspace_name, - file_size, + &file.name, + file.size, &host, &state.redis_connection_manager, &state.pg_pool, @@ -163,6 +132,61 @@ async fn import_data_handler( Ok(AppResponse::Ok().into()) } +struct AutoDeletedFile { + name: String, + file_path: PathBuf, + size: usize, +} + +impl Drop for AutoDeletedFile { + fn drop(&mut self) { + let path = self.file_path.clone(); + tokio::spawn(async move { + trace!("[AutoDeletedFile]: delete file: {:?}", path); + if let Err(err) = tokio::fs::remove_file(&path).await { + error!( + "Failed to delete the auto deleted file: {:?}, error: {}", + path, err + ) + } + }); + } +} + +async fn write_multiple_part( + payload: &mut Multipart, + file_path: PathBuf, +) -> Result { + let mut file_name = "".to_string(); + let mut file_size = 0; + let mut file = File::create(&file_path).await?; + while let Some(Ok(mut field)) = payload.next().await { + file_name = field + .content_disposition() + .and_then(|c| c.get_name().map(|f| f.to_string())) + .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M"))); + + while let Some(Ok(data)) = field.next().await { + file_size += data.len(); + file.write_all(&data).await?; + } + } + file.shutdown().await?; + drop(file); + + if file_name.is_empty() { + return Err(AppError::InvalidRequest( + "Can not get the file name".to_string(), + )); + } + + Ok(AutoDeletedFile { + name: file_name, + file_path, + size: file_size, + }) +} + fn get_host_from_request(req: &HttpRequest) -> String { req .headers() diff --git a/src/biz/data_import/mod.rs b/src/biz/data_import/mod.rs new file mode 100644 index 00000000..d0b1645e --- /dev/null +++ b/src/biz/data_import/mod.rs @@ -0,0 +1,46 @@ +use actix_multipart::Multipart; +use anyhow::Result; +use async_zip::base::write::ZipFileWriter; +use async_zip::{Compression, ZipEntryBuilder}; +use futures_lite::{AsyncWriteExt, StreamExt}; +use std::path::PathBuf; +use tokio::fs::File; +use tokio_util::compat::TokioAsyncWriteCompatExt; +use uuid::Uuid; + +#[allow(dead_code)] +pub async fn create_archive( + mut body: Multipart, + file_path: &PathBuf, +) -> Result<(String, usize), anyhow::Error> { + let mut file_name = "".to_string(); + let mut file_size = 0; + + let archive = File::create(file_path).await?.compat_write(); + let mut writer = ZipFileWriter::new(archive); + + while let Some(Ok(mut field)) = body.next().await { + let name = match field.content_disposition().and_then(|c| c.get_filename()) { + Some(filename) => sanitize_filename::sanitize(filename), + None => Uuid::new_v4().to_string(), + }; + + if file_name.is_empty() { + file_name = field + .content_disposition() + .and_then(|c| c.get_name().map(|f| f.to_string())) + .unwrap_or_else(|| format!("import-{}", chrono::Local::now().format("%d/%m/%Y %H:%M"))); + } + + // Build the zip entry + let builder = ZipEntryBuilder::new(name.into(), Compression::Deflate); + let mut entry_writer = writer.write_entry_stream(builder).await?; + while let Some(Ok(chunk)) = field.next().await { + file_size += chunk.len(); + entry_writer.write_all(&chunk).await?; + } + entry_writer.close().await?; + } + writer.close().await?; + Ok((file_name, file_size)) +} diff --git a/src/biz/mod.rs b/src/biz/mod.rs index 9496d008..b6378c71 100644 --- a/src/biz/mod.rs +++ b/src/biz/mod.rs @@ -1,6 +1,7 @@ pub mod access_request; pub mod chat; pub mod collab; +pub mod data_import; pub mod pg_listener; pub mod search; pub mod template;