From 06a6ea298516b1d422cfc797ebbbea8ab374fe91 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:39:36 +0800 Subject: [PATCH] chore: retry upload file when get s3 service error (#993) * chore: retry upload file when get s3 service error * chore: ignore upload payload error --- libs/app-error/src/lib.rs | 5 + libs/database/src/file/file_storage.rs | 2 +- libs/database/src/file/s3_client_impl.rs | 19 ++- services/appflowy-worker/src/error.rs | 3 + .../src/import_worker/worker.rs | 114 ++++++++++++------ services/appflowy-worker/src/s3_client.rs | 16 ++- src/api/data_import.rs | 41 +++++-- src/biz/template/ops.rs | 2 +- 8 files changed, 146 insertions(+), 56 deletions(-) diff --git a/libs/app-error/src/lib.rs b/libs/app-error/src/lib.rs index 0a2eed68..572f82d8 100644 --- a/libs/app-error/src/lib.rs +++ b/libs/app-error/src/lib.rs @@ -175,6 +175,9 @@ pub enum AppError { #[error("There is an invalid character in the publish namespace: {character}")] CustomNamespaceInvalidCharacter { character: char }, + + #[error("{0}")] + ServiceTemporaryUnavailable(String), } impl AppError { @@ -251,6 +254,7 @@ impl AppError { AppError::CustomNamespaceInvalidCharacter { .. 
} => { ErrorCode::CustomNamespaceInvalidCharacter }, + AppError::ServiceTemporaryUnavailable(_) => ErrorCode::ServiceTemporaryUnavailable, } } } @@ -390,6 +394,7 @@ pub enum ErrorCode { PublishNameInvalidCharacter = 1051, PublishNameTooLong = 1052, CustomNamespaceInvalidCharacter = 1053, + ServiceTemporaryUnavailable = 1054, } impl ErrorCode { diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index f3d2acb5..8d3c2b0c 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -25,7 +25,7 @@ pub trait BucketClient { async fn put_blob(&self, object_key: &str, content: &[u8]) -> Result<(), AppError>; - async fn put_blob_as_content_type( + async fn put_blob_with_content_type( &self, object_key: &str, stream: ByteStream, diff --git a/libs/database/src/file/s3_client_impl.rs b/libs/database/src/file/s3_client_impl.rs index b847a161..8b20e948 100644 --- a/libs/database/src/file/s3_client_impl.rs +++ b/libs/database/src/file/s3_client_impl.rs @@ -4,12 +4,13 @@ use app_error::AppError; use async_trait::async_trait; use aws_sdk_s3::operation::delete_object::DeleteObjectOutput; -use aws_sdk_s3::error::SdkError; use std::ops::Deref; use std::time::{Duration, SystemTime}; +use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::delete_objects::DeleteObjectsOutput; use aws_sdk_s3::operation::get_object::GetObjectError; + use aws_sdk_s3::presigning::PresigningConfig; use aws_sdk_s3::primitives::ByteStream; use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart, Delete, ObjectIdentifier}; @@ -133,12 +134,17 @@ impl BucketClient for AwsS3BucketClientImpl { .body(body) .send() .await - .map_err(|err| anyhow!("Failed to upload object to S3: {}", err))?; + .map_err(|err| match err { + SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => { + AppError::ServiceTemporaryUnavailable(format!("Failed to upload object to S3: {}", err)) + }, + _ => 
AppError::Internal(anyhow!("Failed to upload object to S3: {}", err)), + })?; Ok(()) } - async fn put_blob_as_content_type( + async fn put_blob_with_content_type( &self, object_key: &str, stream: ByteStream, @@ -158,7 +164,12 @@ impl BucketClient for AwsS3BucketClientImpl { .content_type(content_type) .send() .await - .map_err(|err| anyhow!("Failed to upload object to S3: {}", err))?; + .map_err(|err| match err { + SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => { + AppError::ServiceTemporaryUnavailable(format!("Failed to upload object to S3: {}", err)) + }, + _ => AppError::Internal(anyhow!("Failed to upload object to S3: {}", err)), + })?; Ok(()) } diff --git a/services/appflowy-worker/src/error.rs b/services/appflowy-worker/src/error.rs index 6b363690..daaa98b4 100644 --- a/services/appflowy-worker/src/error.rs +++ b/services/appflowy-worker/src/error.rs @@ -13,6 +13,9 @@ pub enum WorkerError { #[error(transparent)] ImportError(#[from] ImportError), + #[error("S3 service unavailable: {0}")] + S3ServiceUnavailable(String), + #[error(transparent)] Internal(#[from] anyhow::Error), } diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index d8718b56..e3289807 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -3,7 +3,7 @@ use crate::s3_client::{download_file, AutoRemoveDownloadedFile, S3StreamResponse use anyhow::anyhow; use aws_sdk_s3::primitives::ByteStream; -use crate::error::ImportError; +use crate::error::{ImportError, WorkerError}; use crate::mailer::ImportNotionMailerParam; use crate::s3_client::S3Client; @@ -46,7 +46,7 @@ use database::pg_row::AFImportTask; use serde::{Deserialize, Serialize}; use serde_json::from_str; use sqlx::types::chrono; -use sqlx::types::chrono::{DateTime, Utc}; +use sqlx::types::chrono::{DateTime, TimeZone, Utc}; use sqlx::PgPool; use 
std::collections::{HashMap, HashSet}; use std::env::temp_dir; @@ -259,7 +259,7 @@ async fn consume_task( } // Check if the task is expired - if is_task_expired(task.created_at.unwrap(), task.last_process_at) { + if let Err(err) = is_task_expired(task.created_at.unwrap(), task.last_process_at) { if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await { handle_expired_task( &mut context, @@ -268,6 +268,7 @@ async fn consume_task( stream_name, group_name, &entry_id, + &err, ) .await?; } @@ -304,10 +305,11 @@ async fn handle_expired_task( stream_name: &str, group_name: &str, entry_id: &str, + reason: &str, ) -> Result<(), ImportError> { info!( - "[Import]: {} import is expired, delete workspace", - task.workspace_id + "[Import]: {} import is expired with reason:{}, delete workspace", + task.workspace_id, reason ); update_import_task_status( @@ -363,20 +365,20 @@ async fn process_and_ack_task( result } -fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> bool { - match DateTime::::from_timestamp(created_timestamp, 0) { - None => { - info!("[Import] failed to parse timestamp: {}", created_timestamp); - true - }, +fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> Result<(), String> { + match Utc.timestamp_opt(created_timestamp, 0).single() { + None => Err(format!( + "[Import] failed to parse timestamp: {}", + created_timestamp + )), Some(created_at) => { let now = Utc::now(); if created_at > now { - error!( + return Err(format!( "[Import] created_at is in the future: {} > {}", - created_at, now - ); - return false; + created_at.format("%m/%d/%y %H:%M"), + now.format("%m/%d/%y %H:%M") + )); } let elapsed = now - created_at; @@ -385,18 +387,33 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> bool .unwrap_or(6); if elapsed.num_hours() >= hours { - return true; + return Err(format!( + "[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours", + 
created_at.format("%m/%d/%y %H:%M"), + last_process_at, + elapsed.num_hours() + )); } if last_process_at.is_none() { - return false; + return Ok(()); } let elapsed = now - created_at; let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "30") .parse::() .unwrap_or(30); - elapsed.num_minutes() >= minutes + + if elapsed.num_minutes() >= minutes { + Err(format!( + "[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} minutes", + created_at.format("%m/%d/%y %H:%M"), + last_process_at, + elapsed.num_minutes() + )) + } else { + Ok(()) + } }, } } @@ -513,16 +530,18 @@ async fn process_task( remove_workspace(&task.workspace_id, &context.pg_pool).await; } - match fs::remove_dir_all(&unzip_dir_path).await { - Ok(_) => info!( - "[Import]: {} deleted unzip file: {:?}", - task.workspace_id, unzip_dir_path - ), - Err(err) => error!("Failed to delete unzip file: {:?}", err), - } - clean_up(&context.s3_client, &task).await; notify_user(&task, result, context.notifier, &context.metrics).await?; + + tokio::spawn(async move { + match fs::remove_dir_all(&unzip_dir_path).await { + Ok(_) => info!( + "[Import]: {} deleted unzip file: {:?}", + task.workspace_id, unzip_dir_path + ), + Err(err) => error!("Failed to delete unzip file: {:?}", err), + } + }); }, Err(err) => { // If there is any errors when download or unzip the file, we will remove the file from S3 and notify the user. 
@@ -706,10 +725,9 @@ async fn download_and_unzip_file( .await .map_err(|err| ImportError::Internal(err.into()))??; - trace!( - "[Import] {} finish unzip file: {:?}", - import_task.workspace_id, - unzip_file.unzip_dir + info!( + "[Import] {} finish unzip file to dir:{}, file:{:?}", + import_task.workspace_id, unzip_file.dir_name, unzip_file.unzip_dir ); Ok(unzip_file.unzip_dir) } @@ -1197,11 +1215,12 @@ async fn batch_upload_files_to_s3( .buffer_unordered(5); let results: Vec<_> = upload_stream.collect().await; let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect(); - if errors.is_empty() { - Ok(()) - } else { - Err(anyhow!("Some uploads failed: {:?}", errors)) + + if !errors.is_empty() { + error!("Some uploads failed: {:?}", errors); } + + Ok(()) } async fn upload_file_to_s3( @@ -1217,12 +1236,29 @@ async fn upload_file_to_s3( return Err(anyhow!("File does not exist: {:?}", path)); } + let mut attempt = 0; + let max_retries = 2; + let object_key = format!("{}/{}/{}", workspace_id, object_id, file_id); - let byte_stream = ByteStream::from_path(path).await?; - client - .put_blob(&object_key, byte_stream, Some(file_type)) - .await?; - Ok(()) + while attempt <= max_retries { + let byte_stream = ByteStream::from_path(path).await?; + match client + .put_blob(&object_key, byte_stream, Some(file_type)) + .await + { + Ok(_) => return Ok(()), + Err(WorkerError::S3ServiceUnavailable(_)) if attempt < max_retries => { + attempt += 1; + tokio::time::sleep(Duration::from_secs(3)).await; + }, + Err(err) => return Err(err.into()), + } + } + + Err(anyhow!( + "Failed to upload file to S3 after {} attempts", + max_retries + 1 + )) } async fn get_encode_collab_from_bytes( diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs index d5d109c1..f4514683 100644 --- a/services/appflowy-worker/src/s3_client.rs +++ b/services/appflowy-worker/src/s3_client.rs @@ -128,10 +128,18 @@ impl S3Client for S3ClientImpl { .await { Ok(_) => 
Ok(()), - Err(err) => Err(WorkerError::from(anyhow!( - "Failed to put object to S3: {}", - err - ))), + Err(err) => match err { + SdkError::TimeoutError(_) | SdkError::DispatchFailure(_) | SdkError::ServiceError(_) => { + Err(WorkerError::S3ServiceUnavailable(format!( + "Failed to upload object to S3: {}", + err + ))) + }, + _ => Err(WorkerError::Internal(anyhow!( + "Failed to upload object to S3: {}", + err + ))), + }, } } diff --git a/src/api/data_import.rs b/src/api/data_import.rs index 8b3d28a1..50215e62 100644 --- a/src/api/data_import.rs +++ b/src/api/data_import.rs @@ -220,13 +220,8 @@ async fn import_data_handler( "User:{} import data:{} to new workspace:{}, name:{}", uid, file.size, workspace_id, file.name, ); - let stream = ByteStream::from_path(&file.file_path).await.map_err(|e| { - AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e)) - })?; - state - .bucket_client - .put_blob_as_content_type(&workspace_id, stream, "application/zip") - .await?; + + upload_file_with_retry(&state, &workspace_id, &file.file_path).await?; // This task will be deserialized into ImportTask let task_id = Uuid::new_v4(); @@ -260,6 +255,38 @@ async fn import_data_handler( Ok(AppResponse::Ok().into()) } +async fn upload_file_with_retry( + state: &AppState, + workspace_id: &str, + file_path: &PathBuf, +) -> Result<(), AppError> { + let mut attempt = 0; + let max_retries = 3; + + while attempt <= max_retries { + let stream = ByteStream::from_path(file_path).await.map_err(|e| { + AppError::Internal(anyhow!("Failed to create ByteStream from file path: {}", e)) + })?; + let result = state + .bucket_client + .put_blob_with_content_type(workspace_id, stream, "application/zip") + .await; + + match result { + Ok(_) => return Ok(()), + Err(AppError::ServiceTemporaryUnavailable(_)) if attempt < max_retries => { + attempt += 1; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + }, + Err(err) => return Err(err), + } + } + + 
Err(AppError::ServiceTemporaryUnavailable( + "Failed to upload file to S3".to_string(), + )) +} + async fn check_maximum_task(state: &Data, uid: i64) -> Result<(), AppError> { let count = num_pending_task(uid, &state.pg_pool).await?; let maximum_pending_task = get_env_var("MAXIMUM_IMPORT_PENDING_TASK", "3") diff --git a/src/biz/template/ops.rs b/src/biz/template/ops.rs index 1a830af3..76539ffe 100644 --- a/src/biz/template/ops.rs +++ b/src/biz/template/ops.rs @@ -448,7 +448,7 @@ pub async fn upload_avatar( let object_key = avatar_object_key(&file_id); client - .put_blob_as_content_type( + .put_blob_with_content_type( &object_key, ByteStream::from(avatar.data.to_vec()), &content_type,