chore: stop indexing when stream is not exist (#652)

This commit is contained in:
Nathan.fooo 2024-06-26 14:24:23 +08:00 committed by GitHub
parent c0bca1cb8c
commit 1a22813113
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 110 additions and 27 deletions

View File

@ -5,6 +5,12 @@ pub enum StreamError {
#[error(transparent)]
RedisError(#[from] RedisError),
#[error("Stream already exist: {0}")]
StreamAlreadyExist(String),
#[error("Stream not exist: {0}")]
StreamNotExist(String),
#[error("Unexpected value: {0}")]
UnexpectedValue(String),
@ -30,6 +36,12 @@ pub enum StreamError {
Internal(anyhow::Error),
}
impl StreamError {
pub fn is_stream_not_exist(&self) -> bool {
matches!(self, StreamError::StreamNotExist(_))
}
}
pub fn internal<T: ToString>(msg: T) -> RedisError {
let msg = msg.to_string();
RedisError::from((redis::ErrorKind::TypeError, "", msg))

View File

@ -6,12 +6,12 @@ use redis::streams::{
StreamClaimOptions, StreamClaimReply, StreamMaxlen, StreamPendingData, StreamPendingReply,
StreamReadOptions,
};
use redis::{pipe, AsyncCommands, RedisResult};
use redis::{pipe, AsyncCommands, ErrorKind, RedisResult};
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{error, trace, warn};
use tracing::{error, info, trace, warn};
#[derive(Clone)]
pub struct StreamGroup {
@ -63,15 +63,41 @@ impl StreamGroup {
//Use '$' if you want new messages or '0' to read from the beginning.
.xgroup_create_mkstream(&self.stream_key, &self.group_name, "0")
.await;
if let Err(e) = result {
tracing::warn!(
"error when creating consumer group `{}` `{}`: {:?}",
self.stream_key,
self.group_name,
e
);
match result {
Ok(_) => Ok(()),
Err(redis_error) => {
warn!(
"error when creating consumer group `{}` `{}`: {:?}",
self.stream_key, self.group_name, redis_error
);
return match redis_error.kind() {
ErrorKind::ExtensionError => match redis_error.code() {
None => Err(StreamError::from(redis_error)),
Some(code) => {
if code == "BUSYGROUP" {
Ok(())
} else {
Err(StreamError::from(redis_error))
}
},
},
_ => Err(StreamError::from(redis_error)),
};
},
}
}
pub async fn destroy_group(&mut self) {
let result: RedisResult<bool> = self
.connection_manager
.xgroup_destroy::<&str, &str, bool>(&self.stream_key, &self.group_name)
.await;
match result {
Ok(_) => info!("destroy group success: {}", self.group_name),
Err(err) => error!("destroy group error: {:?}", err),
}
Ok(())
}
/// Acknowledges messages processed by a consumer.
@ -231,7 +257,20 @@ impl StreamGroup {
let map: StreamMessageByStreamKey = self
.connection_manager
.xread_options(&[&self.stream_key], &[message_id], &options)
.await?;
.await
.map_err(|redis_error| match redis_error.kind() {
ErrorKind::ExtensionError => match redis_error.code() {
None => StreamError::from(redis_error),
Some(code) => {
if code == "NOGROUP" {
StreamError::StreamNotExist(redis_error.to_string())
} else {
StreamError::from(redis_error)
}
},
},
_ => StreamError::from(redis_error),
})?;
match map.0.into_iter().next() {
None => Ok(Vec::with_capacity(0)),

View File

@ -1,4 +1,5 @@
use crate::collab_stream_test::test_util::{random_i64, stream_client};
use collab_stream::error::StreamError;
use collab_stream::model::StreamBinary;
use collab_stream::stream_group::ReadOption;
use futures::future::join;
@ -169,3 +170,31 @@ async fn read_all_message_test() {
);
}
}
#[tokio::test]
async fn group_already_exist_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
// create group
client.collab_update_stream("w1", &oid, "g2").await.unwrap();
// create same group
client.collab_update_stream("w1", &oid, "g2").await.unwrap();
}
#[tokio::test]
async fn group_not_exist_test() {
let oid = format!("o{}", random_i64());
let client = stream_client().await;
// create group
let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap();
group.destroy_group().await;
let err = group
.consumer_messages("consumer1", ReadOption::Count(15))
.await
.unwrap_err();
assert!(matches!(err, StreamError::StreamNotExist(_)));
}

View File

@ -14,7 +14,7 @@ use tokio::select;
use tokio::task::JoinSet;
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::{instrument, trace};
use uuid::Uuid;
use collab_stream::client::CollabRedisStream;
@ -83,10 +83,10 @@ impl CollabHandle {
};
let group_name = format!("indexer_{}:{}", workspace_id, object_id);
trace!("init collab redis stream: {}", group_name);
let mut update_stream = redis_stream
.collab_update_stream(&workspace_id, &object_id, &group_name)
.await
.unwrap();
.await?;
let messages = update_stream.get_unacked_messages(CONSUMER_NAME).await?;
if !messages.is_empty() {
@ -133,29 +133,32 @@ impl CollabHandle {
loop {
select! {
_ = closing.cancelled() => {
tracing::trace!("document {}/{} watcher cancelled, stopping.", workspace_id, object_id);
trace!("document {}/{} watcher cancelled, stopping.", workspace_id, object_id);
return;
},
_ = interval.tick() => {
let result = update_stream
.consumer_messages(CONSUMER_NAME, ReadOption::Count(100))
.await;
match result {
match update_stream.consumer_messages(CONSUMER_NAME, ReadOption::Count(100)).await {
Ok(messages) => {
if let Some(content) = content.upgrade() {
// check if we received empty message batch, if not: update the collab
if !messages.is_empty() {
if let Err(err) = Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await {
tracing::error!("document {}/{} watcher failed to handle updates: {}", workspace_id, object_id, err);
if !messages.is_empty() {
if let Err(err) = Self::handle_collab_updates(&mut update_stream, content.get_collab(), messages).await {
tracing::error!("document {}/{} watcher failed to handle updates: {}", workspace_id, object_id, err);
}
}
}
} else {
tracing::trace!("collab dropped, stopping consumer");
return;
trace!("collab dropped, stopping consumer");
return;
}
},
Err(err) => {
tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err);
// we ensure that the stream exists by calling [CollabRedisStream::collab_update_stream]
// before spawning this task. So if the stream does not exist, it's a bug.
if err.is_stream_not_exist() {
tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err);
return;
} else {
tracing::error!("document {}/{} watcher failed to receive messages: {}", workspace_id, object_id, err);
}
},
}
}