From 0d6b595ee805267cada8a2cff31a7a9a8840026c Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 4 Oct 2024 17:19:29 +0200 Subject: [PATCH] chore: create collab update sink and stream --- libs/collab-stream/src/client.rs | 10 ++- libs/collab-stream/src/collab_update_sink.rs | 35 +++++++++++ libs/collab-stream/src/lib.rs | 1 + libs/collab-stream/src/model.rs | 17 ++++- .../collab_stream_test/stream_group_test.rs | 46 +++++++++----- .../src/group/group_init.rs | 62 +++++++++++-------- services/appflowy-history/src/core/manager.rs | 2 +- .../tests/stream_test/update_stream_test.rs | 10 +-- 8 files changed, 133 insertions(+), 50 deletions(-) create mode 100644 libs/collab-stream/src/collab_update_sink.rs diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 0c17e83d..f3d941a8 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -1,3 +1,4 @@ +use crate::collab_update_sink::CollabUpdateSink; use crate::error::StreamError; use crate::model::{CollabStreamUpdate, CollabStreamUpdateBatch, CollabUpdateEvent, MessageId}; use crate::pubsub::{CollabStreamPub, CollabStreamSub}; @@ -45,7 +46,7 @@ impl CollabRedisStream { Ok(group) } - pub async fn collab_update_stream( + pub async fn collab_update_stream_group( &self, workspace_id: &str, oid: &str, @@ -66,6 +67,11 @@ impl CollabRedisStream { Ok(group) } + pub fn collab_update_sink(&self, workspace_id: &str, object_id: &str) -> CollabUpdateSink { + let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); + CollabUpdateSink::new(self.connection_manager.clone(), stream_key) + } + pub fn collab_updates( &self, workspace_id: &str, @@ -74,7 +80,7 @@ impl CollabRedisStream { ) -> impl Stream> { // use `:` separator as it adheres to Redis naming conventions let mut conn = self.connection_manager.clone(); - let stream_key = format!("af_update:{}:{}", workspace_id, object_id); + let stream_key = CollabStreamUpdate::stream_key(workspace_id, object_id); let read_options = StreamReadOptions::default().count(100); let mut since = since.unwrap_or_default(); async_stream::try_stream! { diff --git a/libs/collab-stream/src/collab_update_sink.rs b/libs/collab-stream/src/collab_update_sink.rs new file mode 100644 index 00000000..4926088f --- /dev/null +++ b/libs/collab-stream/src/collab_update_sink.rs @@ -0,0 +1,35 @@ +use crate::error::StreamError; +use crate::model::{CollabStreamUpdate, MessageId}; +use redis::aio::ConnectionManager; +use redis::cmd; +use tokio::sync::Mutex; + +pub struct CollabUpdateSink { + conn: Mutex, + stream_key: String, +} + +impl CollabUpdateSink { + pub fn new(conn: ConnectionManager, stream_key: String) -> Self { + CollabUpdateSink { + conn: conn.into(), + stream_key, + } + } + + pub async fn send(&self, msg: &CollabStreamUpdate) -> Result { + let mut lock = self.conn.lock().await; + let msg_id: MessageId = cmd("XADD") + .arg(&self.stream_key) + .arg("*") + .arg("flags") + .arg(msg.flags) + .arg("sender") + .arg(msg.sender.to_string()) + .arg("data") + .arg(&msg.data) + .query_async(&mut *lock) + .await?; + Ok(msg_id) + } +} diff --git a/libs/collab-stream/src/lib.rs b/libs/collab-stream/src/lib.rs index c2f42255..43102ce2 100644 --- a/libs/collab-stream/src/lib.rs +++ b/libs/collab-stream/src/lib.rs @@ -1,4 +1,5 @@ pub mod client; +pub mod collab_update_sink; pub mod error; pub mod model; pub mod pubsub; diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index d3a2311c..926062d5 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -5,7 +5,7 @@ use collab_entity::proto::collab::collab_update_event::Update; use collab_entity::{proto, CollabType}; use prost::Message; use redis::streams::StreamId; -use redis::{FromRedisValue, RedisError, RedisResult, Value}; +use redis::{FromRedisValue, RedisError, RedisResult, RedisWrite, ToRedisArgs, Value}; use serde::{Deserialize, Serialize}; use std::collections::{BTreeMap, HashMap}; use std::fmt::{Display, Formatter}; @@ -374,6 +374,11 @@ impl CollabStreamUpdate { flags: flags.into(), } } + + /// Returns Redis stream key, that's storing entries mapped to/from [CollabStreamUpdate]. + pub fn stream_key(workspace_id: &str, object_id: &str) -> String { + format!("af_update:{}:{}", workspace_id, object_id) + } } pub(crate) struct CollabStreamUpdateBatch { @@ -476,6 +481,16 @@ impl UpdateFlags { } } +impl ToRedisArgs for UpdateFlags { + #[inline] + fn write_redis_args(&self, out: &mut W) + where + W: ?Sized + RedisWrite, + { + self.0.write_redis_args(out) + } +} + impl From for UpdateFlags { #[inline] fn from(value: u8) -> Self { diff --git a/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs b/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs index 7bb218f2..1d2b0d23 100644 --- a/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs +++ b/libs/collab-stream/tests/collab_stream_test/stream_group_test.rs @@ -10,7 +10,7 @@ async fn single_group_read_message_test() { let oid = format!("o{}", random_i64()); let client = stream_client().await; let mut group = client - .collab_update_stream(workspace_id, &oid, "g1") + .collab_update_stream_group(workspace_id, &oid, "g1") .await .unwrap(); let msg = StreamBinary(vec![1, 2, 3, 4, 5]); @@ -18,7 +18,7 @@ async fn single_group_read_message_test() { { let client = stream_client().await; let mut group = client - .collab_update_stream(workspace_id, &oid, "g2") + .collab_update_stream_group(workspace_id, &oid, "g2") .await .unwrap(); group.insert_binary(msg).await.unwrap(); @@ -45,7 +45,7 @@ async fn single_group_async_read_message_test() { let oid = format!("o{}", random_i64()); let client = stream_client().await; let mut group = client - .collab_update_stream(workspace_id, &oid, "g1") + .collab_update_stream_group(workspace_id, &oid, "g1") .await .unwrap(); @@ -54,7 +54,7 @@ async fn single_group_async_read_message_test() { { let client = stream_client().await; let mut group = client - .collab_update_stream(workspace_id, &oid, "g2") + .collab_update_stream_group(workspace_id, &oid, "g2") .await .unwrap(); group.insert_binary(msg).await.unwrap(); @@ -79,14 +79,23 @@ async fn single_group_async_read_message_test() { async fn different_group_read_message_test() { let oid = format!("o{}", random_i64()); let client = stream_client().await; - let mut group_1 = client.collab_update_stream("w1", &oid, "g1").await.unwrap(); - let mut group_2 = client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + let mut group_1 = client + .collab_update_stream_group("w1", &oid, "g1") + .await + .unwrap(); + let mut group_2 = client + .collab_update_stream_group("w1", &oid, "g2") + .await + .unwrap(); let msg = StreamBinary(vec![1, 2, 3, 4, 5]); { let client = stream_client().await; - let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + let mut group = client + .collab_update_stream_group("w1", &oid, "g2") + .await + .unwrap(); group.insert_binary(msg).await.unwrap(); } @@ -106,13 +115,13 @@ async fn read_specific_num_of_message_test() { let object_id = format!("o{}", random_i64()); let client = stream_client().await; let mut group_1 = client - .collab_update_stream("w1", &object_id, "g1") + .collab_update_stream_group("w1", &object_id, "g1") .await .unwrap(); { let client = stream_client().await; let mut group = client - .collab_update_stream("w1", &object_id, "g2") + .collab_update_stream_group("w1", &object_id, "g2") .await .unwrap(); let mut messages = vec![]; @@ -143,13 +152,13 @@ async fn read_all_message_test() { let object_id = format!("o{}", random_i64()); let client = stream_client().await; let mut group = client - .collab_update_stream("w1", &object_id, "g1") + .collab_update_stream_group("w1", &object_id, "g1") .await .unwrap(); { let client = stream_client().await; let mut group_2 = client - .collab_update_stream("w1", &object_id, "g2") + .collab_update_stream_group("w1", &object_id, "g2") .await .unwrap(); let mut messages = vec![]; @@ -177,10 +186,16 @@ async fn group_already_exist_test() { let client = stream_client().await; // create group - client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + client + .collab_update_stream_group("w1", &oid, "g2") + .await + .unwrap(); // create same group - client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + client + .collab_update_stream_group("w1", &oid, "g2") + .await + .unwrap(); } #[tokio::test] @@ -189,7 +204,10 @@ async fn group_not_exist_test() { let client = stream_client().await; // create group - let mut group = client.collab_update_stream("w1", &oid, "g2").await.unwrap(); + let mut group = client + .collab_update_stream_group("w1", &oid, "g2") + .await + .unwrap(); group.destroy_group().await; let err = group diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 971cf0dc..e7bafc28 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -7,7 +7,7 @@ use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::CollabType; use dashmap::DashMap; -use futures::{Sink, Stream}; +use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::collections::VecDeque; use std::fmt::{Display, Formatter}; @@ -29,8 +29,11 @@ use collab_rt_entity::{AckCode, BroadcastSync, CollabAck, MessageByObjectId, Msg use collab_rt_entity::{ClientCollabMessage, CollabMessage}; use collab_rt_protocol::{decode_update, Message, MessageReader, RTProtocolError, SyncMessage}; use collab_stream::client::CollabRedisStream; +use collab_stream::collab_update_sink::CollabUpdateSink; use collab_stream::error::StreamError; -use collab_stream::model::{CollabStreamUpdate, CollabUpdateEvent, StreamBinary}; +use collab_stream::model::{ + CollabStreamUpdate, CollabUpdateEvent, MessageId, StreamBinary, UpdateFlags, +}; use collab_stream::stream_group::StreamGroup; use database::collab::CollabStorage; @@ -89,6 +92,15 @@ impl CollabGroup { seq_no: AtomicU32::new(0), }); + /* + NOTE: we don't want to pass `Weak` to tasks and terminate them when they + cannot be upgraded since we want to be sure that ie. when collab group is to be removed, + that we're going to call for a final save of the document state. + + For that we use `CancellationToken` instead, which is racing against internal loops of child + tasks and triggered when this `CollabGroup` is dropped. + */ + // setup task used to receive messages from Redis { let state = state.clone(); @@ -99,16 +111,6 @@ impl CollabGroup { }); } - // setup task used to send messages to Redis - { - let state = state.clone(); - tokio::spawn(async move { - if let Err(err) = Self::outbound_task(state).await { - tracing::warn!("failed to send message: {}", err); - } - }); - } - // setup periodic snapshot { tokio::spawn(Self::snapshot_task( @@ -138,6 +140,7 @@ impl CollabGroup { &state.object_id, None, ); + pin_mut!(updates); loop { tokio::select! { _ = state.shutdown.cancelled() => { @@ -150,7 +153,7 @@ impl CollabGroup { state.last_activity.store(Arc::new(Instant::now())); }, Some(Err(err)) => { - tracing::warn!("failed to handle incoming update for collab `{}`: {}", state.object_id, err) + tracing::warn!("failed to handle incoming update for collab `{}`: {}", state.object_id, err); break; }, None => { @@ -183,11 +186,6 @@ impl CollabGroup { } } - /// Task used to send messages to Redis. - async fn outbound_task(state: Arc) -> Result<(), RealtimeError> { - todo!() - } - async fn snapshot_task(state: Arc, interval: Duration, is_new_collab: bool) { if is_new_collab { if let Err(err) = state.persister.save().await { @@ -267,7 +265,6 @@ impl CollabGroup { // create new subscription for new subscriber let subscriber_shutdown = self.state.shutdown.child_token(); - //TODO: spawn task for receiving messages from stream tokio::spawn(Self::receive_from_client_task( self.state.clone(), sink.clone(), @@ -543,7 +540,7 @@ impl CollabGroup { ) -> Result>, RTProtocolError> { state.metrics.apply_update_size.observe(update.len() as f64); let start = tokio::time::Instant::now(); - state.persister.send_update(origin, update).await; + state.persister.send_update(origin.clone(), update).await; let elapsed = start.elapsed(); state .metrics @@ -676,7 +673,7 @@ impl CollabUpdateStreamingImpl { collab_redis_stream: &CollabRedisStream, ) -> Result { let stream = collab_redis_stream - .collab_update_stream(workspace_id, object_id, "collaborate_update_producer") + .collab_update_stream_group(workspace_id, object_id, "collaborate_update_producer") .await?; let (sender, receiver) = mpsc::unbounded_channel(); tokio::spawn(async move { @@ -765,6 +762,7 @@ struct CollabPersister { indexer: Option>, /// Collab stored temporarily. temp_collab: ArcSwapOption, + update_sink: CollabUpdateSink, } impl CollabPersister { @@ -776,6 +774,7 @@ impl CollabPersister { collab_redis_stream: Arc, indexer: Option>, ) -> Self { + let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id); Self { workspace_id, object_id, @@ -783,6 +782,7 @@ impl CollabPersister { storage, collab_redis_stream, indexer, + update_sink, temp_collab: Default::default(), } } @@ -792,8 +792,21 @@ impl CollabPersister { self.temp_collab.store(None); // cleanup temp collab } - async fn send_update(&self, sender_session: &CollabOrigin, update: Vec) { + async fn send_update( + &self, + sender: CollabOrigin, + update: Vec, + ) -> Result { // send updates to redis queue + let msg_id = self + .update_sink + .send(&CollabStreamUpdate::new( + update, + sender, + UpdateFlags::default(), + )) + .await?; + Ok(msg_id) } async fn send_awareness(&self, sender_session: &CollabOrigin, awareness_update: AwarenessUpdate) { @@ -814,11 +827,6 @@ impl CollabPersister { todo!() } - async fn receive_updates(&self) -> UpdateStream { - // 1. loop with yield on incoming Redis stream updates - todo!() - } - async fn save(&self) -> Result<(), RealtimeError> { // 1. try to acquire lock // 2. if successful -> self.load() diff --git a/services/appflowy-history/src/core/manager.rs b/services/appflowy-history/src/core/manager.rs index dc44bdc6..4ea7b31d 100644 --- a/services/appflowy-history/src/core/manager.rs +++ b/services/appflowy-history/src/core/manager.rs @@ -239,7 +239,7 @@ async fn init_collab_handle( ) -> Result { let group_name = format!("history_{}:{}", workspace_id, object_id); let update_stream = redis_stream - .collab_update_stream(workspace_id, object_id, &group_name) + .collab_update_stream_group(workspace_id, object_id, &group_name) .await .unwrap(); diff --git a/services/appflowy-history/tests/stream_test/update_stream_test.rs b/services/appflowy-history/tests/stream_test/update_stream_test.rs index 4b2b6e16..ab1ac86a 100644 --- a/services/appflowy-history/tests/stream_test/update_stream_test.rs +++ b/services/appflowy-history/tests/stream_test/update_stream_test.rs @@ -10,7 +10,7 @@ async fn single_reader_single_sender_update_stream_test() { let object_id = uuid::Uuid::new_v4().to_string(); let mut send_group = redis_stream - .collab_update_stream(&workspace, &object_id, "write") + .collab_update_stream_group(&workspace, &object_id, "write") .await .unwrap(); for i in 0..5 { @@ -18,7 +18,7 @@ async fn single_reader_single_sender_update_stream_test() { } let mut recv_group = redis_stream - .collab_update_stream(&workspace, &object_id, "read1") + .collab_update_stream_group(&workspace, &object_id, "read1") .await .unwrap(); @@ -55,19 +55,19 @@ async fn multiple_reader_single_sender_update_stream_test() { let object_id = uuid::Uuid::new_v4().to_string(); let mut send_group = redis_stream - .collab_update_stream(&workspace, &object_id, "write") + .collab_update_stream_group(&workspace, &object_id, "write") .await .unwrap(); send_group.insert_message(vec![1, 2, 3]).await.unwrap(); send_group.insert_message(vec![4, 5, 6]).await.unwrap(); let recv_group_1 = redis_stream - .collab_update_stream(&workspace, &object_id, "read1") + .collab_update_stream_group(&workspace, &object_id, "read1") .await .unwrap(); let recv_group_2 = redis_stream - .collab_update_stream(&workspace, &object_id, "read2") + .collab_update_stream_group(&workspace, &object_id, "read2") .await .unwrap(); // Both groups should have the same messages