diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index 8dae8175..d8718b56 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -253,37 +253,47 @@ async fn consume_task( entry_id: String, ) -> Result<(), ImportError> { if let ImportTask::Notion(task) = &mut import_task { - if let Some(created_at_timestamp) = task.created_at { - if is_task_expired(created_at_timestamp, task.last_process_at) { - if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await { - handle_expired_task( - &mut context, - &import_record, - task, - stream_name, - group_name, - &entry_id, - ) - .await?; - } + // If no created_at timestamp, proceed directly to processing + if task.created_at.is_none() { + return process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await; + } - return Ok(()); - } else if !check_blob_existence(&context.s3_client, &task.s3_key).await? { - task.last_process_at = Some(Utc::now().timestamp()); - trace!("[Import] {} file not found, re-add task", task.workspace_id); - re_add_task( - &mut context.redis_client, + // Check if the task is expired + if is_task_expired(task.created_at.unwrap(), task.last_process_at) { + if let Ok(import_record) = select_import_task(&context.pg_pool, &task.task_id).await { + handle_expired_task( + &mut context, + &import_record, + task, stream_name, group_name, - import_task, &entry_id, ) .await?; - return Ok(()); } + return Ok(()); + } + + // Check if the blob exists + if check_blob_existence(&context.s3_client, &task.s3_key).await? { + if task.last_process_at.is_none() { + task.last_process_at = Some(Utc::now().timestamp()); + } + } else { + trace!("[Import] {} file not found, queue task", task.workspace_id); + push_task( + &mut context.redis_client, + stream_name, + group_name, + import_task, + &entry_id, + ) + .await?; + return Ok(()); } } + // Process and acknowledge the task process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await } @@ -353,14 +363,10 @@ async fn process_and_ack_task( result } -fn is_task_expired(timestamp: i64, last_process_at: Option) -> bool { - if last_process_at.is_none() { - return false; - } - - match DateTime::::from_timestamp(timestamp, 0) { +fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> bool { + match DateTime::::from_timestamp(created_timestamp, 0) { None => { - info!("[Import] failed to parse timestamp: {}", timestamp); + info!("[Import] failed to parse timestamp: {}", created_timestamp); true }, Some(created_at) => { @@ -374,15 +380,28 @@ fn is_task_expired(timestamp: i64, last_process_at: Option) -> bool { } let elapsed = now - created_at; - let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "20") + let hours = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_PROCESS_EXPIRE_HOURS", "6") .parse::() - .unwrap_or(20); + .unwrap_or(6); + + if elapsed.num_hours() >= hours { + return true; + } + + if last_process_at.is_none() { + return false; + } + + let elapsed = now - created_at; + let minutes = get_env_var("APPFLOWY_WORKER_IMPORT_TASK_EXPIRE_MINUTES", "30") + .parse::() + .unwrap_or(30); elapsed.num_minutes() >= minutes }, } } -async fn re_add_task( +async fn push_task( redis_client: &mut ConnectionManager, stream_name: &str, group_name: &str, @@ -494,6 +513,14 @@ async fn process_task( remove_workspace(&task.workspace_id, &context.pg_pool).await; } + match fs::remove_dir_all(&unzip_dir_path).await { + Ok(_) => info!( + "[Import]: {} deleted unzip file: {:?}", + task.workspace_id, unzip_dir_path + ), + Err(err) => error!("Failed to delete unzip file: {:?}", err), + } + clean_up(&context.s3_client, &task).await; notify_user(&task, result, context.notifier, &context.metrics).await?; }, @@ -1067,17 +1094,6 @@ async fn process_unzip_file( batch_upload_files_to_s3(&import_task.workspace_id, s3_client, upload_resources) .await .map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?; - - // 10. delete zip file regardless of success or failure - match fs::remove_dir_all(unzip_dir_path).await { - Ok(_) => trace!( - "[Import]: {} deleted unzip file: {:?}", - import_task.workspace_id, - unzip_dir_path - ), - Err(err) => error!("Failed to delete unzip file: {:?}", err), - } - Ok(()) } diff --git a/tests/search/document_search.rs b/tests/search/document_search.rs index 0b62ca9b..95262a26 100644 --- a/tests/search/document_search.rs +++ b/tests/search/document_search.rs @@ -53,5 +53,7 @@ async fn test_document_indexing_and_search() { assert_eq!(search_resp.len(), 1); let item = &search_resp[0]; assert_eq!(item.object_id, object_id); - assert_eq!(item.preview.as_deref(), Some("\nWelcome to AppFlowy")); + + let preview = item.preview.clone().unwrap(); + assert!(preview.contains("Welcome to AppFlowy")); }