chore: recreate group if it isn't exist (#1001)

This commit is contained in:
Nathan.fooo 2024-11-17 12:14:50 +08:00 committed by GitHub
parent 7c6a706bbd
commit dcbc84dacc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 48 additions and 21 deletions

View File

@ -16,6 +16,9 @@ pub enum WorkerError {
#[error("S3 service unavailable: {0}")]
S3ServiceUnavailable(String),
#[error("Redis stream group not exist: {0}")]
StreamGroupNotExist(String),
#[error(transparent)]
Internal(#[from] anyhow::Error),
}

View File

@ -80,10 +80,7 @@ pub async fn run_import_worker(
tick_interval_secs: u64,
) -> Result<(), ImportError> {
info!("Starting importer worker");
if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, &mut redis_client)
.await
.map_err(ImportError::Internal)
{
if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, &mut redis_client).await {
error!("Failed to ensure consumer group: {:?}", err);
}
@ -179,6 +176,7 @@ async fn process_upcoming_tasks(
loop {
interval.tick().await;
let tasks: StreamReadReply = match redis_client
.xread_options(&[stream_name], &[">"], &options)
.await
@ -186,6 +184,17 @@ async fn process_upcoming_tasks(
Ok(tasks) => tasks,
Err(err) => {
error!("Failed to read tasks from Redis stream: {:?}", err);
// Use command:
// docker exec -it appflowy-cloud-redis-1 redis-cli FLUSHDB to generate the error
// NOGROUP: No such key 'import_task_stream' or consumer group 'import_task_group' in XREADGROUP with GROUP option
if let Some(code) = err.code() {
if code == "NOGROUP" {
if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, redis_client).await {
error!("Failed to ensure consumer group: {:?}", err);
}
}
}
continue;
},
};
@ -198,6 +207,7 @@ async fn process_upcoming_tasks(
Ok(import_task) => {
let stream_name = stream_name.to_string();
let group_name = group_name.to_string();
let context = TaskContext {
storage_dir: storage_dir.to_path_buf(),
redis_client: redis_client.clone(),
@ -206,7 +216,8 @@ async fn process_upcoming_tasks(
notifier: notifier.clone(),
metrics: metrics.clone(),
};
task_handlers.push(spawn_local(async move {
let handle = spawn_local(async move {
consume_task(
context,
import_task,
@ -216,7 +227,8 @@ async fn process_upcoming_tasks(
)
.await?;
Ok::<(), ImportError>(())
}));
});
task_handlers.push(handle);
},
Err(err) => {
error!("Failed to deserialize task: {:?}", err);
@ -233,6 +245,7 @@ async fn process_upcoming_tasks(
}
}
}
info!("[Import] stop reading tasks from stream");
}
#[derive(Clone)]
struct TaskContext {
@ -280,8 +293,12 @@ async fn consume_task(
if task.last_process_at.is_none() {
task.last_process_at = Some(Utc::now().timestamp());
}
process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await
} else {
trace!("[Import] {} file not found, queue task", task.workspace_id);
info!(
"[Import] {} zip file not found, queue task",
task.workspace_id
);
push_task(
&mut context.redis_client,
stream_name,
@ -290,12 +307,12 @@ async fn consume_task(
&entry_id,
)
.await?;
return Ok(());
Ok(())
}
}
// Process and acknowledge the task
} else {
// If the task is not a notion task, proceed directly to processing
process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await
}
}
async fn handle_expired_task(
@ -308,7 +325,7 @@ async fn handle_expired_task(
reason: &str,
) -> Result<(), ImportError> {
info!(
"[Import]: {} import is expired with reason:{}, delete workspace",
"[Import]: {} import is expired with reason:{}",
task.workspace_id, reason
);
@ -323,6 +340,7 @@ async fn handle_expired_task(
ImportError::Internal(e.into())
})?;
remove_workspace(&import_record.workspace_id, &context.pg_pool).await;
info!("[Import]: deleted workspace {}", task.workspace_id);
if let Err(err) = context.s3_client.delete_blob(task.s3_key.as_str()).await {
error!(
@ -330,7 +348,12 @@ async fn handle_expired_task(
task.workspace_id, err
);
}
let _ = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await;
if let Err(err) = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await {
error!(
"[Import] failed to acknowledge task:{} error:{:?}",
task.workspace_id, err
);
}
notify_user(
task,
Err(ImportError::UploadFileExpire),
@ -388,7 +411,7 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option<i64>) -> Resu
if elapsed.num_hours() >= hours {
return Err(format!(
"[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours",
"task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours",
created_at.format("%m/%d/%y %H:%M"),
last_process_at,
elapsed.num_hours()
@ -485,10 +508,7 @@ async fn process_task(
.parse()
.unwrap_or(false);
info!(
"[Import]: Processing task: {}, retry interval: {}, streaming: {}",
import_task, retry_interval, streaming
);
info!("[Import]: Processing task: {}", import_task);
match import_task {
ImportTask::Notion(task) => {
@ -1285,7 +1305,7 @@ async fn ensure_consumer_group(
stream_key: &str,
group_name: &str,
redis_client: &mut ConnectionManager,
) -> Result<(), anyhow::Error> {
) -> Result<(), WorkerError> {
let result: RedisResult<()> = redis_client
.xgroup_create_mkstream(stream_key, group_name, "0")
.await;
@ -1293,11 +1313,15 @@ async fn ensure_consumer_group(
if let Err(redis_error) = result {
if let Some(code) = redis_error.code() {
if code == "BUSYGROUP" {
return Ok(()); // Group already exists, considered as success.
return Ok(());
}
if code == "NOGROUP" {
return Err(WorkerError::StreamGroupNotExist(group_name.to_string()));
}
}
error!("Error when creating consumer group: {:?}", redis_error);
return Err(redis_error.into());
return Err(WorkerError::Internal(redis_error.into()));
}
Ok(())