diff --git a/libs/collab-stream/src/error.rs b/libs/collab-stream/src/error.rs index 49becbbd..06da1932 100644 --- a/libs/collab-stream/src/error.rs +++ b/libs/collab-stream/src/error.rs @@ -5,6 +5,12 @@ pub enum StreamError { #[error(transparent)] RedisError(#[from] RedisError), + #[error("Stream already exist: {0}")] + StreamAlreadyExist(String), + + #[error("Stream not exist: {0}")] + StreamNotExist(String), + #[error("Unexpected value: {0}")] UnexpectedValue(String), @@ -30,6 +36,12 @@ pub enum StreamError { Internal(anyhow::Error), } +impl StreamError { + pub fn is_stream_not_exist(&self) -> bool { + matches!(self, StreamError::StreamNotExist(_)) + } +} + pub fn internal(msg: T) -> RedisError { let msg = msg.to_string(); RedisError::from((redis::ErrorKind::TypeError, "", msg)) diff --git a/libs/collab-stream/src/stream_group.rs b/libs/collab-stream/src/stream_group.rs index e962a0d2..3bb4887c 100644 --- a/libs/collab-stream/src/stream_group.rs +++ b/libs/collab-stream/src/stream_group.rs @@ -6,12 +6,12 @@ use redis::streams::{ StreamClaimOptions, StreamClaimReply, StreamMaxlen, StreamPendingData, StreamPendingReply, StreamReadOptions, }; -use redis::{pipe, AsyncCommands, RedisResult}; +use redis::{pipe, AsyncCommands, ErrorKind, RedisResult}; use std::sync::Arc; use std::time::Duration; use tokio_util::sync::CancellationToken; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; #[derive(Clone)] pub struct StreamGroup { @@ -63,15 +63,41 @@ impl StreamGroup { //Use '$' if you want new messages or '0' to read from the beginning. .xgroup_create_mkstream(&self.stream_key, &self.group_name, "0") .await; - if let Err(e) = result { - tracing::warn!( - "error when creating consumer group `{}` `{}`: {:?}", - self.stream_key, - self.group_name, - e - ); + + match result { + Ok(_) => Ok(()), + Err(redis_error) => { + warn!( + "error when creating consumer group `{}` `{}`: {:?}", + self.stream_key, self.group_name, redis_error + ); + + return match redis_error.kind() { + ErrorKind::ExtensionError => match redis_error.code() { + None => Err(StreamError::from(redis_error)), + Some(code) => { + if code == "BUSYGROUP" { + Ok(()) + } else { + Err(StreamError::from(redis_error)) + } + }, + }, + _ => Err(StreamError::from(redis_error)), + }; + }, + } + } + + pub async fn destroy_group(&mut self) { + let result: RedisResult = self + .connection_manager + .xgroup_destroy::<&str, &str, bool>(&self.stream_key, &self.group_name) + .await; + match result { + Ok(_) => info!("destroy group success: {}", self.group_name), + Err(err) => error!("destroy group error: {:?}", err), } - Ok(()) } /// Acknowledges messages processed by a consumer. @@ -231,7 +257,20 @@ impl StreamGroup { let map: StreamMessageByStreamKey = self .connection_manager .xread_options(&[&self.stream_key], &[message_id], &options) - .await?; + .await + .map_err(|redis_error| match redis_error.kind() { + ErrorKind::ExtensionError => match redis_error.code() { + None => StreamError::from(redis_error), + Some(code) => { + if code == "NOGROUP" { + StreamError::StreamNotExist(redis_error.to_string()) + } else { + StreamError::from(redis_error) + } + }, + }, + _ => StreamError::from(redis_error), + })?; match map.0.into_iter().next() { None => Ok(Vec::with_capacity(0)), diff --git a/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs b/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs index 9c6c4b8c..7bb218f2 100644 --- a/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs +++ b/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs @@ -1,4 +1,5 @@ use crate::collab_stream_test::test_util::{random_i64, stream_client}; +use collab_stream::error::StreamError; use collab_stream::model::StreamBinary; use collab_stream::stream_group::ReadOption; use futures::future::join; @@ -169,3 +170,31 @@ async fn read_all_message_test() { ); } } + +#[tokio::test] +async fn group_already_exist_test() { + let oid = format!("o{}", random_i64()); + let client = stream_client().await; + + // create group + client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + + // create same group + client.collab_update_stream("w1", &oid, "g2").await.unwrap(); +} + +#[tokio::test] +async fn group_not_exist_test() { + let oid = format!("o{}", random_i64()); + let client = stream_client().await; + + // create group + let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + group.destroy_group().await; + + let err = group + .consumer_messages("consumer1", ReadOption::Count(15)) + .await + .unwrap_err(); + assert!(matches!(err, StreamError::StreamNotExist(_))); +} diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index ce7753ef..a5f5b84c 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -14,7 +14,7 @@ use tokio::select; use tokio::task::JoinSet; use tokio::time::interval; use tokio_util::sync::CancellationToken; -use tracing::instrument; +use tracing::{instrument, trace}; use uuid::Uuid; use collab_stream::client::CollabRedisStream; @@ -83,10 +83,10 @@ impl CollabHandle { }; let group_name = format!("indexer_{}:{}", workspace_id, object_id); + trace!("init collab redis stream: {}", group_name); let mut update_stream = redis_stream .collab_update_stream(&workspace_id, &object_id, &group_name) - .await - .unwrap(); + .await?; let messages = update_stream.get_unacked_messages(CONSUMER_NAME).await?; if !messages.is_empty() { @@ -133,29 +133,32 @@ impl CollabHandle { loop { select! { _ = closing.cancelled() => { - tracing::trace!("document {}/{} watcher cancelled, stopping.", workspace_id, object_id); + trace!("document {}/{} watcher cancelled, stopping.", workspace_id, object_id); return; }, _ = interval.tick() => { - let result = update_stream - .consumer_messages(CONSUMER_NAME, ReadOption::Count(100)) - .await; - match result { + match update_stream.consumer_messages(CONSUMER_NAME, ReadOption::Count(100)).await { Ok(messages) => { if let Some(content) = content.upgrade() { - // check if we received empty message batch, if not: update the collab - if !messages.is_empty() { - if let Err(err) = Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await { - tracing::error!("document {}/{} watcher failed to handle updates: {}", workspace_id, object_id, err); + if !messages.is_empty() { + if let Err(err) = Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await { + tracing::error!("document {}/{} watcher failed to handle updates: {}", workspace_id, object_id, err); + } } - } } else { - tracing::trace!("collab dropped, stopping consumer"); - return; + trace!("collab dropped, stopping consumer"); + return; } }, Err(err) => { - tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err); + // we ensure that the stream exists by calling [CollabRedisStream::collab_update_stream] + // before spawning this task. So if the stream does not exist, it's a bug. + if err.is_stream_not_exist() { + tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err); + return; + } else { + tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err); + } }, } }