From d89cbe1c8c3afc5cc44180418d97de01097e418e Mon Sep 17 00:00:00 2001
From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com>
Date: Wed, 16 Oct 2024 14:02:03 +0800
Subject: [PATCH] chore: support streaming unzip using env (#888)

---
 .../src/import_worker/worker.rs | 73 +++++++++++++++----
 1 file changed, 57 insertions(+), 16 deletions(-)

diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs
index ed476f2f..e1bf3821 100644
--- a/services/appflowy-worker/src/import_worker/worker.rs
+++ b/services/appflowy-worker/src/import_worker/worker.rs
@@ -27,8 +27,11 @@ use database::workspace::{
 };
 use database_entity::dto::CollabParams;
 
+use async_zip::base::read::stream::{Ready, ZipFileReader};
+
 use futures::stream::FuturesUnordered;
-use futures::{stream, StreamExt};
+use futures::{stream, AsyncBufRead, StreamExt};
+use infra::env_util::get_env_var;
 use redis::aio::ConnectionManager;
 use redis::streams::{
   StreamClaimOptions, StreamClaimReply, StreamId, StreamPendingReply, StreamReadOptions,
@@ -46,6 +49,7 @@ use std::fs::Permissions;
 use std::ops::DerefMut;
 use std::os::unix::fs::PermissionsExt;
 use std::path::{Path, PathBuf};
+use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
@@ -267,12 +271,21 @@ async fn process_task(
 ) -> Result<(), ImportError> {
   trace!("[Import]: Processing task: {}", import_task);
 
+  let retry_interval: u64 = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_RETRY_INTERVAL", "10")
+    .parse()
+    .unwrap_or(10);
+
   match import_task {
     ImportTask::Notion(task) => {
       // 1. download zip file
-      let unzip_result =
-        download_and_unzip_file_retry(storage_dir, &task, s3_client, 3, Duration::from_secs(5))
-          .await;
+      let unzip_result = download_and_unzip_file_retry(
+        storage_dir,
+        &task,
+        s3_client,
+        3,
+        Duration::from_secs(retry_interval),
+      )
+      .await;
       match unzip_result {
         Ok(unzip_dir_path) => {
           // 2. process unzip file
@@ -319,7 +332,11 @@ async fn process_task(
     },
   }
 }
-
+/// Retries the download and unzipping of a file from an S3 source.
+///
+/// This function attempts to download a zip file from an S3 bucket and unzip it to a local directory.
+/// If the operation fails, it will retry up to `max_retries` times, waiting for `interval` between each attempt.
+///
 pub async fn download_and_unzip_file_retry(
   storage_dir: &Path,
   import_task: &NotionImportTask,
@@ -349,7 +366,11 @@ pub async fn download_and_unzip_file_retry(
     }
   }
 }
-
+/// Downloads a zip file from S3 and unzips it to the local directory.
+///
+/// This function fetches a zip file from an S3 source using the provided S3 client,
+/// downloads it (if needed), and unzips the contents to the specified local directory.
+///
 async fn download_and_unzip_file(
   storage_dir: &Path,
   import_task: &NotionImportTask,
@@ -365,16 +386,7 @@ async fn download_and_unzip_file(
     .map_err(|err| ImportError::Internal(err.into()))?;
   let buffer_size = buffer_size_from_content_length(content_length);
 
-  // Read from stream
-  // let reader = BufReader::with_capacity(buffer_size, stream);
-  // let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
-
-  // Download first and then read from local file
-  let zip_file_path = download_file(storage_dir, stream).await?;
-
-  let file = fs::File::open(&zip_file_path).await.unwrap();
-  let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat();
-  let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
+  let zip_reader = get_zip_reader(storage_dir, stream, buffer_size).await?;
   let unique_file_name = Uuid::new_v4().to_string();
   let output_file_path = storage_dir.join(unique_file_name);
   fs::create_dir_all(&output_file_path)
@@ -394,6 +406,35 @@ async fn download_and_unzip_file(
   Ok(unzip_file.unzip_dir_path)
 }
 
+/// Asynchronously returns a `ZipFileReader` that can read from a stream or a downloaded file, based on the environment setting.
+///
+/// This function checks whether streaming is enabled via the `APPFLOWY_WORKER_IMPORT_NOTION_STREAMING` environment variable.
+/// If streaming is enabled, it reads the zip file directly from the provided stream.
+/// Otherwise, it first downloads the zip file to a local file and then reads from it.
+///
+async fn get_zip_reader(
+  storage_dir: &Path,
+  stream: Box<dyn AsyncBufRead + Unpin + Send>,
+  buffer_size: usize,
+) -> Result<ZipFileReader<Ready<Pin<Box<dyn AsyncBufRead + Unpin + Send>>>>, ImportError> {
+  let streaming = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_STREAMING", "true")
+    .parse()
+    .unwrap_or(true);
+
+  let zip_reader = if streaming {
+    let reader = futures::io::BufReader::with_capacity(buffer_size, stream);
+    let boxed_reader: Pin<Box<dyn AsyncBufRead + Unpin + Send>> = Box::pin(reader);
+    async_zip::base::read::stream::ZipFileReader::new(boxed_reader)
+  } else {
+    let zip_file_path = download_file(storage_dir, stream).await?;
+    let file = fs::File::open(&zip_file_path).await.unwrap();
+    let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat();
+    let boxed_reader: Pin<Box<dyn AsyncBufRead + Unpin + Send>> = Box::pin(reader);
+    async_zip::base::read::stream::ZipFileReader::new(boxed_reader)
+  };
+  Ok(zip_reader)
+}
+
 /// Determines the buffer size based on the content length of the file.
 /// If the buffer is too small, the zip reader will frequently pause to fetch more data,
 /// causing delays. This can make the unzip process appear slower and can even cause premature