diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index e2b7a68c..80c9e700 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -375,7 +375,8 @@ async fn handle_expired_task( task.workspace_id, err ); } - if let Err(err) = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await { + if let Err(err) = delete_task(&mut context.redis_client, stream_name, group_name, entry_id).await + { error!( "[Import] failed to acknowledge task:{} error:{:?}", task.workspace_id, err @@ -409,7 +410,7 @@ async fn process_and_ack_task( entry_id: &str, ) -> Result<(), ImportError> { let result = process_task(context.clone(), import_task).await; - xack_task(&mut context.redis_client, stream_name, group_name, entry_id) + delete_task(&mut context.redis_client, stream_name, group_name, entry_id) .await .ok(); result @@ -471,7 +472,7 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> Resu async fn push_task( redis_client: &mut ConnectionManager, stream_name: &str, - group_name: &str, + _group_name: &str, task: ImportTask, entry_id: &str, ) -> Result<(), ImportError> { @@ -483,11 +484,10 @@ async fn push_task( let mut pipeline = redis::pipe(); pipeline .atomic() // Ensures the commands are executed atomically - .cmd("XACK") // Acknowledge the task + .cmd("XDEL") // delete the task .arg(stream_name) - .arg(group_name) .arg(entry_id) - .ignore() // Ignore the result of XACK + .ignore() // Ignore the result of XDEL .cmd("XADD") // Re-add the task to the stream .arg(stream_name) .arg("*") @@ -507,17 +507,17 @@ async fn push_task( } } -async fn xack_task( +async fn delete_task( redis_client: &mut ConnectionManager, stream_name: &str, - group_name: &str, + _group_name: &str, entry_id: &str, ) -> Result<(), ImportError> { let _: () = redis_client - .xack(stream_name, group_name, &[entry_id]) + .xdel(stream_name, &[entry_id]) .await .map_err(|e| { - error!("Failed to acknowledge task: {:?}", e); + error!("Failed to delete import task: {:?}", e); ImportError::Internal(e.into()) })?; Ok(())