fix: upload file zip file expire (#987)
This commit is contained in:
parent
91b0c50722
commit
50d519c4ea
|
|
@ -253,37 +253,47 @@ async fn consume_task(
|
||||||
entry_id: String,
|
entry_id: String,
|
||||||
) -> Result<(), ImportError> {
|
) -> Result<(), ImportError> {
|
||||||
if let ImportTask::Notion(task) = &mut import_task {
|
if let ImportTask::Notion(task) = &mut import_task {
|
||||||
if let Some(created_at_timestamp) = task.created_at {
|
// If no created_at timestamp, proceed directly to processing
|
||||||
if is_task_expired(created_at_timestamp, task.last_process_at) {
|
if task.created_at.is_none() {
|
||||||
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
|
return process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await;
|
||||||
handle_expired_task(
|
}
|
||||||
&mut context,
|
|
||||||
&import_record,
|
|
||||||
task,
|
|
||||||
stream_name,
|
|
||||||
group_name,
|
|
||||||
&entry_id,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
return Ok(());
|
// Check if the task is expired
|
||||||
} else if !check_blob_existence(&context.s3_client, &task.s3_key).await? {
|
if is_task_expired(task.created_at.unwrap(), task.last_process_at) {
|
||||||
task.last_process_at = Some(Utc::now().timestamp());
|
if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await {
|
||||||
trace!("[Import] {} file not found, re-add task", task.workspace_id);
|
handle_expired_task(
|
||||||
re_add_task(
|
&mut context,
|
||||||
&mut context.redis_client,
|
&import_record,
|
||||||
|
task,
|
||||||
stream_name,
|
stream_name,
|
||||||
group_name,
|
group_name,
|
||||||
import_task,
|
|
||||||
&entry_id,
|
&entry_id,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
return Ok(());
|
|
||||||
}
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the blob exists
|
||||||
|
if check_blob_existence(&context.s3_client, &task.s3_key).await? {
|
||||||
|
if task.last_process_at.is_none() {
|
||||||
|
task.last_process_at = Some(Utc::now().timestamp());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trace!("[Import] {} file not found, queue task", task.workspace_id);
|
||||||
|
push_task(
|
||||||
|
&mut context.redis_client,
|
||||||
|
stream_name,
|
||||||
|
group_name,
|
||||||
|
import_task,
|
||||||
|
&entry_id,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Process and acknowledge the task
|
||||||
process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await
|
process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -353,14 +363,10 @@ async fn process_and_ack_task(
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_task_expired(timestamp: i64, last_process_at: Option<i64>) -> bool {
|
fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> bool {
|
||||||
if last_process_at.is_none() {
|
match DateTime::<Utc>::from_timestamp(created_timestamp, 0) {
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
match DateTime::<Utc>::from_timestamp(timestamp, 0) {
|
|
||||||
None => {
|
None => {
|
||||||
info!("[Import] failed to parse timestamp: {}", timestamp);
|
info!("[Import] failed to parse timestamp: {}", created_timestamp);
|
||||||
true
|
true
|
||||||
},
|
},
|
||||||
Some(created_at) => {
|
Some(created_at) => {
|
||||||
|
|
@ -374,15 +380,28 @@ fn is_task_expired(timestamp: i64, last_process_at: Option<i64>) -> bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
let elapsed = now - created_at;
|
let elapsed = now - created_at;
|
||||||
let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "20")
|
let hours = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_PROCESS_EXPIRE_HOURS", "6")
|
||||||
.parse::<i64>()
|
.parse::<i64>()
|
||||||
.unwrap_or(20);
|
.unwrap_or(6);
|
||||||
|
|
||||||
|
if elapsed.num_hours() >= hours {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if last_process_at.is_none() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let elapsed = now - created_at;
|
||||||
|
let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "30")
|
||||||
|
.parse::<i64>()
|
||||||
|
.unwrap_or(30);
|
||||||
elapsed.num_minutes() >= minutes
|
elapsed.num_minutes() >= minutes
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn re_add_task(
|
async fn push_task(
|
||||||
redis_client: &mut ConnectionManager,
|
redis_client: &mut ConnectionManager,
|
||||||
stream_name: &str,
|
stream_name: &str,
|
||||||
group_name: &str,
|
group_name: &str,
|
||||||
|
|
@ -494,6 +513,14 @@ async fn process_task(
|
||||||
remove_workspace(&task.workspace_id, &context.pg_pool).await;
|
remove_workspace(&task.workspace_id, &context.pg_pool).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match fs::remove_dir_all(&unzip_dir_path).await {
|
||||||
|
Ok(_) => info!(
|
||||||
|
"[Import]: {} deleted unzip file: {:?}",
|
||||||
|
task.workspace_id, unzip_dir_path
|
||||||
|
),
|
||||||
|
Err(err) => error!("Failed to delete unzip file: {:?}", err),
|
||||||
|
}
|
||||||
|
|
||||||
clean_up(&context.s3_client, &task).await;
|
clean_up(&context.s3_client, &task).await;
|
||||||
notify_user(&task, result, context.notifier, &context.metrics).await?;
|
notify_user(&task, result, context.notifier, &context.metrics).await?;
|
||||||
},
|
},
|
||||||
|
|
@ -1067,17 +1094,6 @@ async fn process_unzip_file(
|
||||||
batch_upload_files_to_s3(&import_task.workspace_id, s3_client, upload_resources)
|
batch_upload_files_to_s3(&import_task.workspace_id, s3_client, upload_resources)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?;
|
.map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?;
|
||||||
|
|
||||||
// 10. delete zip file regardless of success or failure
|
|
||||||
match fs::remove_dir_all(unzip_dir_path).await {
|
|
||||||
Ok(_) => trace!(
|
|
||||||
"[Import]: {} deleted unzip file: {:?}",
|
|
||||||
import_task.workspace_id,
|
|
||||||
unzip_dir_path
|
|
||||||
),
|
|
||||||
Err(err) => error!("Failed to delete unzip file: {:?}", err),
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,5 +53,7 @@ async fn test_document_indexing_and_search() {
|
||||||
assert_eq!(search_resp.len(), 1);
|
assert_eq!(search_resp.len(), 1);
|
||||||
let item = &search_resp[0];
|
let item = &search_resp[0];
|
||||||
assert_eq!(item.object_id, object_id);
|
assert_eq!(item.object_id, object_id);
|
||||||
assert_eq!(item.preview.as_deref(), Some("\nWelcome to AppFlowy"));
|
|
||||||
|
let preview = item.preview.clone().unwrap();
|
||||||
|
assert!(preview.contains("Welcome to AppFlowy"));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue