chore: retry upload file when get s3 service error (#993)

* chore: retry upload file when get s3 service error

* chore: ingore upload payload error
This commit is contained in:
Nathan.fooo 2024-11-14 14:39:36 +08:00 committed by GitHub
parent f6fef9918b
commit 06a6ea2985
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 146 additions and 56 deletions

View File

@ -175,6 +175,9 @@ pub enum AppError {
#[error("There is an invalid character in the publish namespace: {character}")]
CustomNamespaceInvalidCharacter { character: char },
#[error("{0}")]
ServiceTemporaryUnavailable(String),
}
impl AppError {
@ -251,6 +254,7 @@ impl AppError {
AppError::CustomNamespaceInvalidCharacter { .. } => {
ErrorCode::CustomNamespaceInvalidCharacter
},
AppError::ServiceTemporaryUnavailable(_) => ErrorCode::ServiceTemporaryUnavailable,
}
}
}
@ -390,6 +394,7 @@ pub enum ErrorCode {
PublishNameInvalidCharacter = 1051,
PublishNameTooLong = 1052,
CustomNamespaceInvalidCharacter = 1053,
ServiceTemporaryUnavailable = 1054,
}
impl ErrorCode {

View File

@ -25,7 +25,7 @@ pub trait BucketClient {
async fn put_blob(&self, object_key: &str, content: &[u8]) -> Result<(), AppError>;
async fn put_blob_as_content_type(
async fn put_blob_with_content_type(
&self,
object_key: &str,
stream: ByteStream,

View File

@ -4,12 +4,13 @@ use app_error::AppError;
use async_trait::async_trait;
use aws_sdk_s3::operation::delete_object::DeleteObjectOutput;
use aws_sdk_s3::error::SdkError;
use std::ops::Deref;
use std::time::{Duration, SystemTime};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput;
use aws_sdk_s3::operation::get_object::GetObjectError;
use aws_sdk_s3::presigning::PresigningConfig;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier};
@ -133,12 +134,17 @@ impl BucketClient for AwsS3BucketClientImpl {
.body(body)
.send()
.await
.map_err(|err| anyhow!("Failed to upload object to S3: {}", err))?;
.map_err(|err| match err {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => {
AppError::ServiceTemporaryUnavailable(format!("Failed to upload object to S3: {}", err))
},
_ => AppError::Internal(anyhow!("Failed to upload object to S3: {}", err)),
})?;
Ok(())
}
async fn put_blob_as_content_type(
async fn put_blob_with_content_type(
&self,
object_key: &str,
stream: ByteStream,
@ -158,7 +164,12 @@ impl BucketClient for AwsS3BucketClientImpl {
.content_type(content_type)
.send()
.await
.map_err(|err| anyhow!("Failed to upload object to S3: {}", err))?;
.map_err(|err| match err {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => {
AppError::ServiceTemporaryUnavailable(format!("Failed to upload object to S3: {}", err))
},
_ => AppError::Internal(anyhow!("Failed to upload object to S3: {}", err)),
})?;
Ok(())
}

View File

@ -13,6 +13,9 @@ pub enum WorkerError {
#[error(transparent)]
ImportError(#[from] ImportError),
#[error("S3 service unavailable: {0}")]
S3ServiceUnavailable(String),
#[error(transparent)]
Internal(#[from] anyhow::Error),
}

View File

@ -3,7 +3,7 @@ use crate::s3_client::{download_file, AutoRemoveDownloadedFile, S3StreamResponse
use anyhow::anyhow;
use aws_sdk_s3::primitives::ByteStream;
use crate::error::ImportError;
use crate::error::{ImportError, WorkerError};
use crate::mailer::ImportNotionMailerParam;
use crate::s3_client::S3Client;
@ -46,7 +46,7 @@ use database::pg_row::AFImportTask;
use serde::{Deserialize, Serialize};
use serde_json::from_str;
use sqlx::types::chrono;
use sqlx::types::chrono::{DateTime, Utc};
use sqlx::types::chrono::{DateTime, TimeZone, Utc};
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
use std::env::temp_dir;
@ -259,7 +259,7 @@ async fn consume_task(
}
// Check if the task is expired
if is_task_expired(task.created_at.unwrap(), task.last_process_at) {
if let Err(err) = is_task_expired(task.created_at.unwrap(), task.last_process_at) {
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
handle_expired_task(
&mut context,
@ -268,6 +268,7 @@ async fn consume_task(
stream_name,
group_name,
&entry_id,
&err,
)
.await?;
}
@ -304,10 +305,11 @@ async fn handle_expired_task(
stream_name: &str,
group_name: &str,
entry_id: &str,
reason: &str,
) -> Result<(), ImportError> {
info!(
"[Import]: {} import is expired, delete workspace",
task.workspace_id
"[Import]: {} import is expired with reason:{}, delete workspace",
task.workspace_id, reason
);
update_import_task_status(
@ -363,20 +365,20 @@ async fn process_and_ack_task(
result
}
fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> bool {
match DateTime::<Utc>::from_timestamp(created_timestamp, 0) {
None => {
info!("[Import] failed to parse timestamp: {}", created_timestamp);
true
},
fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> Result<(), String> {
match Utc.timestamp_opt(created_timestamp, 0).single() {
None => Err(format!(
"[Import] failed to parse timestamp: {}",
created_timestamp
)),
Some(created_at) => {
let now = Utc::now();
if created_at > now {
error!(
return Err(format!(
"[Import] created_at is in the future: {} > {}",
created_at, now
);
return false;
created_at.format("%m/%d/%y %H:%M"),
now.format("%m/%d/%y %H:%M")
));
}
let elapsed = now - created_at;
@ -385,18 +387,33 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> bool
.unwrap_or(6);
if elapsed.num_hours() >= hours {
return true;
return Err(format!(
"[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours",
created_at.format("%m/%d/%y %H:%M"),
last_process_at,
elapsed.num_hours()
));
}
if last_process_at.is_none() {
return false;
return Ok(());
}
let elapsed = now - created_at;
let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "30")
.parse::<i64>()
.unwrap_or(30);
elapsed.num_minutes() >= minutes
if elapsed.num_minutes() >= minutes {
Err(format!(
"[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} minutes",
created_at.format("%m/%d/%y %H:%M"),
last_process_at,
elapsed.num_minutes()
))
} else {
Ok(())
}
},
}
}
@ -513,16 +530,18 @@ async fn process_task(
remove_workspace(&task.workspace_id, &context.pg_pool).await;
}
match fs::remove_dir_all(&unzip_dir_path).await {
Ok(_) => info!(
"[Import]: {} deleted unzip file: {:?}",
task.workspace_id, unzip_dir_path
),
Err(err) => error!("Failed to delete unzip file: {:?}", err),
}
clean_up(&context.s3_client, &task).await;
notify_user(&task, result, context.notifier, &context.metrics).await?;
tokio::spawn(async move {
match fs::remove_dir_all(&unzip_dir_path).await {
Ok(_) => info!(
"[Import]: {} deleted unzip file: {:?}",
task.workspace_id, unzip_dir_path
),
Err(err) => error!("Failed to delete unzip file: {:?}", err),
}
});
},
Err(err) => {
// If there is any errors when download or unzip the file, we will remove the file from S3 and notify the user.
@ -706,10 +725,9 @@ async fn download_and_unzip_file(
.await
.map_err(|err| ImportError::Internal(err.into()))??;
trace!(
"[Import] {} finish unzip file: {:?}",
import_task.workspace_id,
unzip_file.unzip_dir
info!(
"[Import] {} finish unzip file to dir:{}, file:{:?}",
import_task.workspace_id, unzip_file.dir_name, unzip_file.unzip_dir
);
Ok(unzip_file.unzip_dir)
}
@ -1197,11 +1215,12 @@ async fn batch_upload_files_to_s3(
.buffer_unordered(5);
let results: Vec<_> = upload_stream.collect().await;
let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
if errors.is_empty() {
Ok(())
} else {
Err(anyhow!("Some uploads failed: {:?}", errors))
if !errors.is_empty() {
error!("Some uploads failed: {:?}", errors);
}
Ok(())
}
async fn upload_file_to_s3(
@ -1217,12 +1236,29 @@ async fn upload_file_to_s3(
return Err(anyhow!("File does not exist: {:?}", path));
}
let mut attempt = 0;
let max_retries = 2;
let object_key = format!("{}/{}/{}", workspace_id, object_id, file_id);
let byte_stream = ByteStream::from_path(path).await?;
client
.put_blob(&object_key, byte_stream, Some(file_type))
.await?;
Ok(())
while attempt <= max_retries {
let byte_stream = ByteStream::from_path(path).await?;
match client
.put_blob(&object_key, byte_stream, Some(file_type))
.await
{
Ok(_) => return Ok(()),
Err(WorkerError::S3ServiceUnavailable(_)) if attempt < max_retries => {
attempt += 1;
tokio::time::sleep(Duration::from_secs(3)).await;
},
Err(err) => return Err(err.into()),
}
}
Err(anyhow!(
"Failed to upload file to S3 after {} attempts",
max_retries + 1
))
}
async fn get_encode_collab_from_bytes(

View File

@ -128,10 +128,18 @@ impl S3Client for S3ClientImpl {
.await
{
Ok(_) => Ok(()),
Err(err) => Err(WorkerError::from(anyhow!(
"Failed to put object to S3: {}",
err
))),
Err(err) => match err {
SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => {
Err(WorkerError::S3ServiceUnavailable(format!(
"Failed to upload object to S3: {}",
err
)))
},
_ => Err(WorkerError::Internal(anyhow!(
"Failed to upload object to S3: {}",
err
))),
},
}
}

View File

@ -220,13 +220,8 @@ async fn import_data_handler(
"User:{} import data:{} to new workspace:{}, name:{}",
uid, file.size, workspace_id, file.name,
);
let stream = ByteStream::from_path(&file.file_path).await.map_err(|e| {
AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e))
})?;
state
.bucket_client
.put_blob_as_content_type(&workspace_id, stream, "application/zip")
.await?;
upload_file_with_retry(&state, &workspace_id, &file.file_path).await?;
// This task will be deserialized into ImportTask
let task_id = Uuid::new_v4();
@ -260,6 +255,38 @@ async fn import_data_handler(
Ok(AppResponse::Ok().into())
}
async fn upload_file_with_retry(
state: &AppState,
workspace_id: &str,
file_path: &PathBuf,
) -> Result<(), AppError> {
let mut attempt = 0;
let max_retries = 3;
while attempt <= max_retries {
let stream = ByteStream::from_path(file_path).await.map_err(|e| {
AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e))
})?;
let result = state
.bucket_client
.put_blob_with_content_type(workspace_id, stream, "application/zip")
.await;
match result {
Ok(_) => return Ok(()),
Err(AppError::ServiceTemporaryUnavailable(_)) if attempt < max_retries => {
attempt += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
},
Err(err) => return Err(err),
}
}
Err(AppError::ServiceTemporaryUnavailable(
"Failed to upload file to S3".to_string(),
))
}
async fn check_maximum_task(state: &Data<AppState>, uid: i64) -> Result<(), AppError> {
let count = num_pending_task(uid, &state.pg_pool).await?;
let maximum_pending_task = get_env_var("MAXIMUM_IMPORT_PENDING_TASK", "3")

View File

@ -448,7 +448,7 @@ pub async fn upload_avatar(
let object_key = avatar_object_key(&file_id);
client
.put_blob_as_content_type(
.put_blob_with_content_type(
&object_key,
ByteStream::from(avatar.data.to_vec()),
&content_type,