chore: split redis stream readers to active and one shot

This commit is contained in:
Bartosz Sypytkowski 2024-11-07 11:43:19 +01:00
parent c8dab0fb2c
commit bc49d73b40
4 changed files with 93 additions and 81 deletions

View File

@ -6,7 +6,7 @@ use crate::stream_group::{StreamConfig, StreamGroup};
use crate::stream_router::{StreamRouter, StreamRouterOptions}; use crate::stream_router::{StreamRouter, StreamRouterOptions};
use futures::Stream; use futures::Stream;
use redis::aio::ConnectionManager; use redis::aio::ConnectionManager;
use redis::streams::StreamReadOptions; use redis::streams::StreamReadReply;
use redis::{AsyncCommands, FromRedisValue}; use redis::{AsyncCommands, FromRedisValue};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -111,18 +111,46 @@ impl CollabRedisStream {
AwarenessUpdateSink::new(self.connection_manager.clone(), stream_key) AwarenessUpdateSink::new(self.connection_manager.clone(), stream_key)
} }
pub fn collab_updates( /// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting
/// from a given message id. Once the Redis stream returns no more results, reading stops and the collected updates are returned.
///
/// # Errors
/// Fails if the `XREAD` call fails, or if any returned entry cannot be converted into
/// a [`MessageId`] / [`CollabStreamUpdate`] pair.
pub async fn current_collab_updates(
  &self,
  workspace_id: &str,
  object_id: &str,
  since: Option<MessageId>,
) -> Result<Vec<(MessageId, CollabStreamUpdate)>, StreamError> {
  let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
  // Without an explicit starting point, fall back to the default message id.
  let start_id = since.unwrap_or_default().to_string();
  let mut conn = self.connection_manager.clone();
  let mut reply: StreamReadReply = conn.xread(&[&stream_key], &[&start_id]).await?;

  // Only a single key was requested, so at most one stream is expected in the reply.
  // A missing or mismatched key yields an empty result.
  let entries = match reply.keys.pop() {
    Some(stream) if stream.key == stream_key => stream.ids,
    _ => return Ok(Vec::new()),
  };

  let mut updates = Vec::with_capacity(entries.len());
  for entry in entries {
    let message_id = MessageId::try_from(entry.id)?;
    let update = CollabStreamUpdate::try_from(entry.map)?;
    updates.push((message_id, update));
  }
  Ok(updates)
}
/// Reads all collab updates for a given `workspace_id`:`object_id` entry, starting
/// from a given message id. This stream will be kept alive and will pass on all future messages
/// coming from the corresponding Redis stream until explicitly closed.
pub fn live_collab_updates(
&self, &self,
workspace_id: &str, workspace_id: &str,
object_id: &str, object_id: &str,
since: Option<MessageId>, since: Option<MessageId>,
keep_alive: bool,
) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> { ) -> impl Stream<Item = Result<(MessageId, CollabStreamUpdate), StreamError>> {
let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id);
let since = since.map(|id| id.to_string()); let since = since.map(|id| id.to_string());
let mut reader = self.stream_router.observe(stream_key, since); let mut reader = self.stream_router.observe(stream_key, since);
async_stream::try_stream! { async_stream::try_stream! {
while let Some((message_id, fields)) = reader.recv().await { while let Some((message_id, fields)) = reader.recv().await {
tracing::trace!("incoming collab update `{}`", message_id);
let message_id = MessageId::try_from(message_id).map_err(|e| internal(e.to_string()))?; let message_id = MessageId::try_from(message_id).map_err(|e| internal(e.to_string()))?;
let collab_update = CollabStreamUpdate::try_from(fields)?; let collab_update = CollabStreamUpdate::try_from(fields)?;
yield (message_id, collab_update); yield (message_id, collab_update);
@ -140,7 +168,8 @@ impl CollabRedisStream {
let since = since.map(|id| id.to_string()); let since = since.map(|id| id.to_string());
let mut reader = self.stream_router.observe(stream_key, since); let mut reader = self.stream_router.observe(stream_key, since);
async_stream::try_stream! { async_stream::try_stream! {
while let Some((_message_id, fields)) = reader.recv().await { while let Some((message_id, fields)) = reader.recv().await {
tracing::trace!("incoming awareness update `{}`", message_id);
let awareness_update = AwarenessStreamUpdate::try_from(fields)?; let awareness_update = AwarenessStreamUpdate::try_from(fields)?;
yield awareness_update; yield awareness_update;
} }

View File

@ -24,8 +24,9 @@ pub type StreamReader = tokio::sync::mpsc::UnboundedReceiver<(String, RedisMap)>
/// fixed number of Redis connections. /// fixed number of Redis connections.
pub struct StreamRouter { pub struct StreamRouter {
buf: Sender<StreamHandle>, buf: Sender<StreamHandle>,
workers: Vec<Worker>,
alive: Arc<AtomicBool>, alive: Arc<AtomicBool>,
#[allow(dead_code)]
workers: Vec<Worker>,
} }
impl StreamRouter { impl StreamRouter {
@ -59,7 +60,7 @@ impl StreamRouter {
pub fn observe(&self, stream_key: StreamKey, last_id: Option<String>) -> StreamReader { pub fn observe(&self, stream_key: StreamKey, last_id: Option<String>) -> StreamReader {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let last_id = last_id.unwrap_or_default(); let last_id = last_id.unwrap_or_else(|| "0".to_string());
let h = StreamHandle::new(stream_key, last_id, tx); let h = StreamHandle::new(stream_key, last_id, tx);
self.buf.send(h).unwrap(); self.buf.send(h).unwrap();
rx rx
@ -106,7 +107,7 @@ impl Default for StreamRouterOptions {
} }
struct Worker { struct Worker {
handle: JoinHandle<()>, _handle: JoinHandle<()>,
} }
impl Worker { impl Worker {
@ -131,7 +132,7 @@ impl Worker {
tracing::error!("worker {} failed: {}", worker_id, err); tracing::error!("worker {} failed: {}", worker_id, err);
} }
}); });
Self { handle } Self { _handle: handle }
} }
fn process_streams( fn process_streams(
@ -149,7 +150,9 @@ impl Worker {
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) { if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
break; // rx channel has closed break; // rx channel has closed
} }
if stream_keys.is_empty() {
let key_count = stream_keys.len();
if key_count == 0 {
tracing::warn!("Bug: read empty buf"); tracing::warn!("Bug: read empty buf");
sleep(Duration::from_millis(100)); sleep(Duration::from_millis(100));
continue; continue;
@ -157,6 +160,7 @@ impl Worker {
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?; let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;
let mut msgs = 0;
for stream in result.keys { for stream in result.keys {
let mut remove_sender = false; let mut remove_sender = false;
if let Some((sender, idx)) = senders.get(stream.key.as_str()) { if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
@ -164,6 +168,7 @@ impl Worker {
let message_id = id.id; let message_id = id.id;
let value = id.map; let value = id.map;
message_ids[*idx] = message_id.clone(); //TODO: optimize message_ids[*idx] = message_id.clone(); //TODO: optimize
msgs += 1;
if let Err(err) = sender.send((message_id, value)) { if let Err(err) = sender.send((message_id, value)) {
tracing::warn!("failed to send: {}", err); tracing::warn!("failed to send: {}", err);
remove_sender = true; remove_sender = true;
@ -176,20 +181,27 @@ impl Worker {
} }
} }
if msgs > 0 {
tracing::trace!(
"XREAD: read total of {} messages for {} streams",
msgs,
key_count
);
}
Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders); Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
} }
Ok(()) Ok(())
} }
fn schedule_back<'a>( fn schedule_back(
tx: &Sender<StreamHandle>, tx: &Sender<StreamHandle>,
keys: &mut Vec<StreamKey>, keys: &mut Vec<StreamKey>,
ids: &mut Vec<String>, ids: &mut Vec<String>,
senders: &mut HashMap<&'a str, (StreamSender, usize)>, senders: &mut HashMap<&str, (StreamSender, usize)>,
) { ) {
let mut keys = keys.drain(..); let keys = keys.drain(..);
let mut ids = ids.drain(..); let mut ids = ids.drain(..);
while let Some(key) = keys.next() { for key in keys {
if let Some(last_id) = ids.next() { if let Some(last_id) = ids.next() {
if let Some((sender, _)) = senders.remove(key.as_str()) { if let Some((sender, _)) = senders.remove(key.as_str()) {
let h = StreamHandle::new(key, last_id, sender); let h = StreamHandle::new(key, last_id, sender);

View File

@ -166,11 +166,10 @@ impl CollabGroup {
/// Task used to receive collab updates from Redis. /// Task used to receive collab updates from Redis.
async fn inbound_task(state: Arc<CollabGroupState>) -> Result<(), RealtimeError> { async fn inbound_task(state: Arc<CollabGroupState>) -> Result<(), RealtimeError> {
let updates = state.persister.collab_redis_stream.collab_updates( let updates = state.persister.collab_redis_stream.live_collab_updates(
&state.workspace_id, &state.workspace_id,
&state.object_id, &state.object_id,
None, None,
true,
); );
pin_mut!(updates); pin_mut!(updates);
loop { loop {
@ -936,34 +935,22 @@ impl CollabPersister {
// 2. consume all Redis updates on top of it (keep redis msg id) // 2. consume all Redis updates on top of it (keep redis msg id)
let mut last_message_id = None; let mut last_message_id = None;
let mut tx = collab.transact_mut(); let mut tx = collab.transact_mut();
let stream = self.collab_redis_stream.collab_updates( let updates = self
&self.workspace_id, .collab_redis_stream
&self.object_id, .current_collab_updates(
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there &self.workspace_id,
false, // read only data currently existing in the stream &self.object_id,
); None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there
pin_mut!(stream); )
.await?;
let mut i = 0; let mut i = 0;
while let Some(res) = stream.next().await { for (message_id, update) in updates {
match res { i += 1;
Ok((message_id, update)) => { let update: Update = update.into_update()?;
i += 1; tx.apply_update(update)
let update: Update = update.into_update()?; .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
tx.apply_update(update) last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; self.metrics.apply_update_count.inc();
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
self.metrics.apply_update_count.inc();
},
Err(err) => {
self.metrics.apply_update_failed_count.inc();
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
err
);
break;
},
}
} }
drop(tx); drop(tx);
tracing::trace!( tracing::trace!(
@ -988,49 +975,33 @@ impl CollabPersister {
/// waiting to be merged into main document state. /// waiting to be merged into main document state.
async fn load_if_changed(&self) -> Result<Option<CollabSnapshot>, RealtimeError> { async fn load_if_changed(&self) -> Result<Option<CollabSnapshot>, RealtimeError> {
// 1. load pending Redis updates // 1. load pending Redis updates
let stream = self.collab_redis_stream.collab_updates( let updates = self
&self.workspace_id, .collab_redis_stream
&self.object_id, .current_collab_updates(&self.workspace_id, &self.object_id, None)
None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there .await?;
false, // read only data currently existing in the stream
);
pin_mut!(stream);
let start = Instant::now(); let start = Instant::now();
let mut i = 0; let mut i = 0;
let mut collab = None; let mut collab = None;
let mut last_message_id = None; let mut last_message_id = None;
while let Some(res) = stream.next().await { for (message_id, update) in updates {
match res { i += 1;
Ok((message_id, update)) => { let update: Update = update.into_update()?;
i += 1; if collab.is_none() {
let update: Update = update.into_update()?; collab = Some(match self.load_collab_full(true).await? {
if collab.is_none() { Some(collab) => collab,
collab = Some(match self.load_collab_full(true).await? { None => {
Some(collab) => collab, Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true)
None => { },
Collab::new_with_origin(CollabOrigin::Server, self.object_id.clone(), vec![], true) })
}, };
}) let collab = collab.as_mut().unwrap();
}; collab
let collab = collab.as_mut().unwrap(); .transact_mut()
collab .apply_update(update)
.transact_mut() .map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?;
.apply_update(update) last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
.map_err(|err| RTProtocolError::YrsApplyUpdate(err.to_string()))?; self.metrics.apply_update_count.inc();
last_message_id = Some(message_id); //TODO: shouldn't this happen before decoding?
self.metrics.apply_update_count.inc();
},
Err(err) => {
self.metrics.apply_update_failed_count.inc();
tracing::error!(
"`{}` failed to resolve collab update: {}",
self.object_id,
err
);
break;
},
}
} }
// if there were no Redis updates, collab is still not initialized // if there were no Redis updates, collab is still not initialized

View File

@ -46,7 +46,7 @@ async fn run_multiple_text_edits() {
// run test scenario // run test scenario
let collab = writer.collabs.get(&object_id).unwrap().collab.clone(); let collab = writer.collabs.get(&object_id).unwrap().collab.clone();
let expected = test_scenario.execute(collab, 10_000).await; let expected = test_scenario.execute(collab, 50_000).await;
// wait for the writer to complete sync // wait for the writer to complete sync
writer.wait_object_sync_complete(&object_id).await.unwrap(); writer.wait_object_sync_complete(&object_id).await.unwrap();