chore: update stream - live and current queries

This commit is contained in:
Bartosz Sypytkowski 2024-10-17 07:49:59 +02:00
parent 21b734269c
commit 13d657861a
3 changed files with 15 additions and 2 deletions

View File

@ -100,14 +100,23 @@ impl CollabRedisStream {
workspace_id: &str,
object_id: &str,
since: Option<MessageId>,
live: bool,
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
// use `:` separator as it adheres to Redis naming conventions
let mut conn = self.connection_manager.clone();
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
let read_options = StreamReadOptions::default().count(100);
let mut since = since.unwrap_or_default();
async_stream::try_stream! {
loop {
let until = if live {
None
} else {
let mut reply: StreamRangeReply = conn.xrevrange_count(&stream_key, "+", "-", 1).await?;
match reply.ids.pop().map(|stream_id| stream_id.id) {
None => Some(MessageId::default()),
Some(id) => Some(MessageId::try_from(id)?),
}
};
while Some(since) < until {
let last_id = since.to_string();
let batch: CollabStreamUpdateBatch = conn
.xread_options(&[&stream_key], &[&last_id], &read_options)

View File

@ -390,6 +390,7 @@ impl CollabStreamUpdate {
/// Returns Redis stream key, that's storing entries mapped to/from [CollabStreamUpdate].
pub fn stream_key(workspace_id: &str, object_id: &str) -> String {
// use `:` separator as it adheres to Redis naming conventions
format!("af:{}:{}:updates", workspace_id, object_id)
}

View File

@ -167,6 +167,7 @@ impl CollabGroup {
&state.workspace_id,
&state.object_id,
None,
true,
);
pin_mut!(updates);
loop {
@ -550,6 +551,7 @@ impl CollabGroup {
}
},
Err(err) => {
tracing::warn!("[realtime]: failed to handled message: {}", msg_id);
state.metrics.apply_update_failed_count.inc();
let code = Self::ack_code_from_error(&err);
@ -902,6 +904,7 @@ impl CollabPersister {
&self.workspace_id,
&self.object_id,
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
false, // read only data currently existing in the stream
);
pin_mut!(stream);
let mut i = 0;