chore: delete imported task stream value (#1160)
* chore: delete import task * chore: delete requeue task
This commit is contained in:
parent
4ddb08e0eb
commit
5a2f54bfdd
|
|
@ -375,7 +375,8 @@ async fn handle_expired_task(
|
|||
task.workspace_id, err
|
||||
);
|
||||
}
|
||||
if let Err(err) = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await {
|
||||
if let Err(err) = delete_task(&mut context.redis_client, stream_name, group_name, entry_id).await
|
||||
{
|
||||
error!(
|
||||
"[Import] failed to acknowledge task:{} error:{:?}",
|
||||
task.workspace_id, err
|
||||
|
|
@ -409,7 +410,7 @@ async fn process_and_ack_task(
|
|||
entry_id: &str,
|
||||
) -> Result<(), ImportError> {
|
||||
let result = process_task(context.clone(), import_task).await;
|
||||
xack_task(&mut context.redis_client, stream_name, group_name, entry_id)
|
||||
delete_task(&mut context.redis_client, stream_name, group_name, entry_id)
|
||||
.await
|
||||
.ok();
|
||||
result
|
||||
|
|
@ -471,7 +472,7 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> Resu
|
|||
async fn push_task(
|
||||
redis_client: &mut ConnectionManager,
|
||||
stream_name: &str,
|
||||
group_name: &str,
|
||||
_group_name: &str,
|
||||
task: ImportTask,
|
||||
entry_id: &str,
|
||||
) -> Result<(), ImportError> {
|
||||
|
|
@ -483,11 +484,10 @@ async fn push_task(
|
|||
let mut pipeline = redis::pipe();
|
||||
pipeline
|
||||
.atomic() // Ensures the commands are executed atomically
|
||||
.cmd("XACK") // Acknowledge the task
|
||||
.cmd("XDEL") // delete the task
|
||||
.arg(stream_name)
|
||||
.arg(group_name)
|
||||
.arg(entry_id)
|
||||
.ignore() // Ignore the result of XACK
|
||||
.ignore() // Ignore the result of XDEL
|
||||
.cmd("XADD") // Re-add the task to the stream
|
||||
.arg(stream_name)
|
||||
.arg("*")
|
||||
|
|
@ -507,17 +507,17 @@ async fn push_task(
|
|||
}
|
||||
}
|
||||
|
||||
async fn xack_task(
|
||||
async fn delete_task(
|
||||
redis_client: &mut ConnectionManager,
|
||||
stream_name: &str,
|
||||
group_name: &str,
|
||||
_group_name: &str,
|
||||
entry_id: &str,
|
||||
) -> Result<(), ImportError> {
|
||||
let _: () = redis_client
|
||||
.xack(stream_name, group_name, &[entry_id])
|
||||
.xdel(stream_name, &[entry_id])
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to acknowledge task: {:?}", e);
|
||||
error!("Failed to delete import task: {:?}", e);
|
||||
ImportError::Internal(e.into())
|
||||
})?;
|
||||
Ok(())
|
||||
|
|
|
|||
Loading…
Reference in New Issue