chore: support streaming unzip using env (#888)

This commit is contained in:
Nathan.fooo 2024-10-16 14:02:03 +08:00 committed by GitHub
parent 63739fc49c
commit d89cbe1c8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 57 additions and 16 deletions

View File

@ -27,8 +27,11 @@ use database::workspace::{
};
use database_entity::dto::CollabParams;
use async_zip::base::read::stream::{Ready, ZipFileReader};
use futures::stream::FuturesUnordered;
use futures::{stream, StreamExt};
use futures::{stream, AsyncBufRead, StreamExt};
use infra::env_util::get_env_var;
use redis::aio::ConnectionManager;
use redis::streams::{
StreamClaimOptions, StreamClaimReply, StreamId, StreamPendingReply, StreamReadOptions,
@ -46,6 +49,7 @@ use std::fs::Permissions;
use std::ops::DerefMut;
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@ -267,12 +271,21 @@ async fn process_task(
) -> Result<(), ImportError> {
trace!("[Import]: Processing task: {}", import_task);
let retry_interval: u64 = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_RETRY_INTERVAL", "10")
.parse()
.unwrap_or(10);
match import_task {
ImportTask::Notion(task) => {
// 1. download zip file
let unzip_result =
download_and_unzip_file_retry(storage_dir, &task, s3_client, 3, Duration::from_secs(5))
.await;
let unzip_result = download_and_unzip_file_retry(
storage_dir,
&task,
s3_client,
3,
Duration::from_secs(retry_interval),
)
.await;
match unzip_result {
Ok(unzip_dir_path) => {
// 2. process unzip file
@ -319,7 +332,11 @@ async fn process_task(
},
}
}
/// Retries the download and unzipping of a file from an S3 source.
///
/// This function attempts to download a zip file from an S3 bucket and unzip it to a local directory.
/// If the operation fails, it will retry up to `max_retries` times, waiting for `interval` between each attempt.
///
pub async fn download_and_unzip_file_retry(
storage_dir: &Path,
import_task: &NotionImportTask,
@ -349,7 +366,11 @@ pub async fn download_and_unzip_file_retry(
}
}
}
/// Downloads a zip file from S3 and unzips it to the local directory.
///
/// This function fetches a zip file from an S3 source using the provided S3 client,
/// downloads it (if needed), and unzips the contents to the specified local directory.
///
async fn download_and_unzip_file(
storage_dir: &Path,
import_task: &NotionImportTask,
@ -365,16 +386,7 @@ async fn download_and_unzip_file(
.map_err(|err| ImportError::Internal(err.into()))?;
let buffer_size = buffer_size_from_content_length(content_length);
// Read from stream
// let reader = BufReader::with_capacity(buffer_size, stream);
// let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
// Download first and then read from local file
let zip_file_path = download_file(storage_dir, stream).await?;
let file = fs::File::open(&zip_file_path).await.unwrap();
let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat();
let zip_reader = async_zip::base::read::stream::ZipFileReader::new(reader);
let zip_reader = get_zip_reader(storage_dir, stream, buffer_size).await?;
let unique_file_name = Uuid::new_v4().to_string();
let output_file_path = storage_dir.join(unique_file_name);
fs::create_dir_all(&output_file_path)
@ -394,6 +406,35 @@ async fn download_and_unzip_file(
Ok(unzip_file.unzip_dir_path)
}
/// Asynchronously returns a `ZipFileReader` that can read from a stream or a downloaded file, based on the environment setting.
///
/// This function checks whether streaming is enabled via the `APPFLOWY_WORKER_IMPORT_NOTION_STREAMING` environment variable.
/// If streaming is enabled, it reads the zip file directly from the provided stream.
/// Otherwise, it first downloads the zip file to a local file and then reads from it.
///
async fn get_zip_reader(
storage_dir: &Path,
stream: Box<dyn AsyncBufRead + Unpin + Send>,
buffer_size: usize,
) -> Result<ZipFileReader<Ready<Pin<Box<dyn AsyncBufRead + Unpin + Send>>>>, ImportError> {
let streaming = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_STREAMING", "true")
.parse()
.unwrap_or(true);
let zip_reader = if streaming {
let reader = futures::io::BufReader::with_capacity(buffer_size, stream);
let boxed_reader: Pin<Box<dyn AsyncBufRead + Unpin + Send>> = Box::pin(reader);
async_zip::base::read::stream::ZipFileReader::new(boxed_reader)
} else {
let zip_file_path = download_file(storage_dir, stream).await?;
let file = fs::File::open(&zip_file_path).await.unwrap();
let reader = tokio::io::BufReader::with_capacity(buffer_size, file).compat();
let boxed_reader: Pin<Box<dyn AsyncBufRead + Unpin + Send>> = Box::pin(reader);
async_zip::base::read::stream::ZipFileReader::new(boxed_reader)
};
Ok(zip_reader)
}
/// Determines the buffer size based on the content length of the file.
/// If the buffer is too small, the zip reader will frequently pause to fetch more data,
/// causing delays. This can make the unzip process appear slower and can even cause premature