diff --git a/services/appflowy-worker/src/error.rs b/services/appflowy-worker/src/error.rs index daaa98b4..77985ae6 100644 --- a/services/appflowy-worker/src/error.rs +++ b/services/appflowy-worker/src/error.rs @@ -16,6 +16,9 @@ pub enum WorkerError { #[error("S3 service unavailable: {0}")] S3ServiceUnavailable(String), + #[error("Redis stream group not exist: {0}")] + StreamGroupNotExist(String), + #[error(transparent)] Internal(#[from] anyhow::Error), } diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index e3289807..6d07ebe2 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -80,10 +80,7 @@ pub async fn run_import_worker( tick_interval_secs: u64, ) -> Result<(), ImportError> { info!("Starting importer worker"); - if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, &mut redis_client) - .await - .map_err(ImportError::Internal) - { + if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, &mut redis_client).await { error!("Failed to ensure consumer group: {:?}", err); } @@ -179,6 +176,7 @@ async fn process_upcoming_tasks( loop { interval.tick().await; + let tasks: StreamReadReply = match redis_client .xread_options(&[stream_name], &[">"], &options) .await @@ -186,6 +184,17 @@ async fn process_upcoming_tasks( Ok(tasks) => tasks, Err(err) => { error!("Failed to read tasks from Redis stream: {:?}", err); + + // Use command: + // docker exec -it appflowy-cloud-redis-1 redis-cli FLUSHDB to generate the error + // NOGROUP: No such key 'import_task_stream' or consumer group 'import_task_group' in XREADGROUP with GROUP option + if let Some(code) = err.code() { + if code == "NOGROUP" { + if let Err(err) = ensure_consumer_group(stream_name, GROUP_NAME, redis_client).await { + error!("Failed to ensure consumer group: {:?}", err); + } + } + } continue; }, }; @@ -198,6 +207,7 @@ async fn process_upcoming_tasks( Ok(import_task) => { let stream_name = stream_name.to_string(); let group_name = group_name.to_string(); + let context = TaskContext { storage_dir: storage_dir.to_path_buf(), redis_client: redis_client.clone(), @@ -206,7 +216,8 @@ async fn process_upcoming_tasks( notifier: notifier.clone(), metrics: metrics.clone(), }; - task_handlers.push(spawn_local(async move { + + let handle = spawn_local(async move { consume_task( context, import_task, @@ -216,7 +227,8 @@ async fn process_upcoming_tasks( ) .await?; Ok::<(), ImportError>(()) - })); + }); + task_handlers.push(handle); }, Err(err) => { error!("Failed to deserialize task: {:?}", err); @@ -233,6 +245,7 @@ async fn process_upcoming_tasks( } } } + info!("[Import] stop reading tasks from stream"); } #[derive(Clone)] struct TaskContext { @@ -280,8 +293,12 @@ async fn consume_task( if task.last_process_at.is_none() { task.last_process_at = Some(Utc::now().timestamp()); } + process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await } else { - trace!("[Import] {} file not found, queue task", task.workspace_id); + info!( + "[Import] {} zip file not found, queue task", + task.workspace_id + ); push_task( &mut context.redis_client, stream_name, @@ -290,12 +307,12 @@ async fn consume_task( &entry_id, ) .await?; - return Ok(()); + Ok(()) } + } else { + // If the task is not a notion task, proceed directly to processing + process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await } - - // Process and acknowledge the task - process_and_ack_task(context, import_task, stream_name, group_name, &entry_id).await } async fn handle_expired_task( @@ -308,7 +325,7 @@ async fn handle_expired_task( reason: &str, ) -> Result<(), ImportError> { info!( - "[Import]: {} import is expired with reason:{}, delete workspace", + "[Import]: {} import is expired with reason:{}", task.workspace_id, reason ); @@ -323,6 +340,7 @@ async fn handle_expired_task( ImportError::Internal(e.into()) })?; remove_workspace(&import_record.workspace_id, &context.pg_pool).await; + info!("[Import]: deleted workspace {}", task.workspace_id); if let Err(err) = context.s3_client.delete_blob(task.s3_key.as_str()).await { error!( @@ -330,7 +348,12 @@ async fn handle_expired_task( task.workspace_id, err ); } - let _ = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await; + if let Err(err) = xack_task(&mut context.redis_client, stream_name, group_name, entry_id).await { + error!( + "[Import] failed to acknowledge task:{} error:{:?}", + task.workspace_id, err + ); + } notify_user( task, Err(ImportError::UploadFileExpire), @@ -388,7 +411,7 @@ fn is_task_expired(created_timestamp: i64, last_process_at: Option) -> Resu if elapsed.num_hours() >= hours { return Err(format!( - "[Import] task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours", + "task is expired: created_at: {}, last_process_at: {:?}, elapsed: {} hours", created_at.format("%m/%d/%y %H:%M"), last_process_at, elapsed.num_hours() @@ -485,10 +508,7 @@ async fn process_task( .parse() .unwrap_or(false); - info!( - "[Import]: Processing task: {}, retry interval: {}, streaming: {}", - import_task, retry_interval, streaming - ); + info!("[Import]: Processing task: {}", import_task); match import_task { ImportTask::Notion(task) => { @@ -1285,7 +1305,7 @@ async fn ensure_consumer_group( stream_key: &str, group_name: &str, redis_client: &mut ConnectionManager, -) -> Result<(), anyhow::Error> { +) -> Result<(), WorkerError> { let result: RedisResult<()> = redis_client .xgroup_create_mkstream(stream_key, group_name, "0") .await; @@ -1293,11 +1313,15 @@ async fn ensure_consumer_group( if let Err(redis_error) = result { if let Some(code) = redis_error.code() { if code == "BUSYGROUP" { - return Ok(()); // Group already exists, considered as success. + return Ok(()); + } + + if code == "NOGROUP" { + return Err(WorkerError::StreamGroupNotExist(group_name.to_string())); } } error!("Error when creating consumer group: {:?}", redis_error); - return Err(redis_error.into()); + return Err(WorkerError::Internal(redis_error.into())); } Ok(())