chore: notify user file size (#1161)
This commit is contained in:
parent
5a2f54bfdd
commit
c5cdb06b77
|
|
@ -288,7 +288,23 @@ async fn consume_task(
|
|||
Some(file_size) => {
|
||||
if file_size > context.maximum_import_file_size as i64 {
|
||||
let file_size_in_mb = file_size as f64 / 1_048_576.0;
|
||||
let max_size_in_mb = context.maximum_import_file_size as f64 / 1_048_576.0;
|
||||
let max_size_in_mb = (context.maximum_import_file_size as f64 / 1_048_576.0).ceil();
|
||||
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
|
||||
handle_failed_task(
|
||||
&mut context,
|
||||
&import_record,
|
||||
task,
|
||||
stream_name,
|
||||
group_name,
|
||||
&entry_id,
|
||||
ImportError::UploadFileTooLarge {
|
||||
file_size_in_mb,
|
||||
max_size_in_mb,
|
||||
},
|
||||
ImportTaskState::Failed,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
return Err(ImportError::UploadFileTooLarge {
|
||||
file_size_in_mb,
|
||||
|
|
@ -299,16 +315,18 @@ async fn consume_task(
|
|||
}
|
||||
|
||||
// Check if the task is expired
|
||||
if let Err(err) = is_task_expired(task.created_at.unwrap(), task.last_process_at) {
|
||||
if let Err(reason) = is_task_expired(task.created_at.unwrap(), task.last_process_at) {
|
||||
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
|
||||
handle_expired_task(
|
||||
error!("[Import] {} task is expired: {}", task.workspace_id, reason);
|
||||
handle_failed_task(
|
||||
&mut context,
|
||||
&import_record,
|
||||
task,
|
||||
stream_name,
|
||||
group_name,
|
||||
&entry_id,
|
||||
&err,
|
||||
ImportError::UploadFileExpire,
|
||||
ImportTaskState::Expire,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
|
@ -342,30 +360,28 @@ async fn consume_task(
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_expired_task(
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_failed_task(
|
||||
context: &mut TaskContext,
|
||||
import_record: &AFImportTask,
|
||||
task: &NotionImportTask,
|
||||
stream_name: &str,
|
||||
group_name: &str,
|
||||
entry_id: &str,
|
||||
reason: &str,
|
||||
error: ImportError,
|
||||
task_state: ImportTaskState,
|
||||
) -> Result<(), ImportError> {
|
||||
info!(
|
||||
"[Import]: {} import is expired with reason:{}",
|
||||
task.workspace_id, reason
|
||||
"[Import]: {} import was failed with reason:{}",
|
||||
task.workspace_id, error
|
||||
);
|
||||
|
||||
update_import_task_status(
|
||||
&import_record.task_id,
|
||||
ImportTaskState::Expire,
|
||||
&context.pg_pool,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update import task status: {:?}", e);
|
||||
ImportError::Internal(e.into())
|
||||
})?;
|
||||
update_import_task_status(&import_record.task_id, task_state, &context.pg_pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to update import task status: {:?}", e);
|
||||
ImportError::Internal(e.into())
|
||||
})?;
|
||||
remove_workspace(&import_record.workspace_id, &context.pg_pool).await;
|
||||
info!("[Import]: deleted workspace {}", task.workspace_id);
|
||||
|
||||
|
|
@ -382,13 +398,7 @@ async fn handle_expired_task(
|
|||
task.workspace_id, err
|
||||
);
|
||||
}
|
||||
notify_user(
|
||||
task,
|
||||
Err(ImportError::UploadFileExpire),
|
||||
context.notifier.clone(),
|
||||
&context.metrics,
|
||||
)
|
||||
.await?;
|
||||
notify_user(task, Err(error), context.notifier.clone(), &context.metrics).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue