chore: collab streams
This commit is contained in:
parent
82af7edc3f
commit
1d7e35c2b9
|
|
@ -2452,6 +2452,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"chrono",
|
||||
|
|
|
|||
|
|
@ -238,14 +238,14 @@ pub trait CollabSyncProtocol {
|
|||
}
|
||||
}
|
||||
|
||||
const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB
|
||||
pub const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB
|
||||
|
||||
#[inline]
|
||||
pub async fn decode_update(update: Vec<u8>) -> Result<Update, RTProtocolError> {
|
||||
pub async fn decode_update(update: Vec<u8>) -> Result<Update, yrs::encoding::read::Error> {
|
||||
let update = if update.len() > LARGE_UPDATE_THRESHOLD {
|
||||
spawn_blocking(move || Update::decode_v1(&update))
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?
|
||||
.map_err(|err| yrs::encoding::read::Error::Custom(err.to_string()))?
|
||||
} else {
|
||||
Update::decode_v1(&update)
|
||||
}?;
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ chrono = "0.4"
|
|||
tokio-util = { version = "0.7" }
|
||||
prost.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use crate::collab_update_sink::CollabUpdateSink;
|
||||
use crate::error::StreamError;
|
||||
use crate::lease::{Lease, LeaseAcquisition};
|
||||
use crate::model::{CollabStreamUpdate, CollabStreamUpdateBatch, CollabUpdateEvent, MessageId};
|
||||
use crate::pubsub::{CollabStreamPub, CollabStreamSub};
|
||||
use crate::stream_group::{StreamConfig, StreamGroup};
|
||||
|
|
@ -7,6 +8,7 @@ use futures::Stream;
|
|||
use redis::aio::ConnectionManager;
|
||||
use redis::streams::{StreamReadOptions, StreamReadReply};
|
||||
use redis::AsyncCommands;
|
||||
use std::time::Duration;
|
||||
use tracing::error;
|
||||
|
||||
pub const CONTROL_STREAM_KEY: &str = "af_collab_control";
|
||||
|
|
@ -17,6 +19,8 @@ pub struct CollabRedisStream {
|
|||
}
|
||||
|
||||
impl CollabRedisStream {
|
||||
pub const LEASE_TTL: Duration = Duration::from_secs(60);
|
||||
|
||||
pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
|
||||
let connection_manager = redis_client.get_connection_manager().await?;
|
||||
Ok(Self::new_with_connection_manager(connection_manager))
|
||||
|
|
@ -26,6 +30,18 @@ impl CollabRedisStream {
|
|||
Self { connection_manager }
|
||||
}
|
||||
|
||||
pub async fn lease(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
object_id: &str,
|
||||
) -> Result<Option<LeaseAcquisition>, StreamError> {
|
||||
let lease_key = format!("af_snapshot:lease:{}:{}", workspace_id, object_id);
|
||||
self
|
||||
.connection_manager
|
||||
.lease(lease_key, Self::LEASE_TTL)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn collab_control_stream(
|
||||
&self,
|
||||
key: &str,
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
use crate::error::StreamError;
|
||||
use crate::model::{CollabStreamUpdate, MessageId};
|
||||
use collab::preclude::updates::encoder::Encode;
|
||||
use redis::aio::ConnectionManager;
|
||||
use redis::cmd;
|
||||
use tokio::sync::Mutex;
|
||||
|
|
@ -18,6 +19,7 @@ impl CollabUpdateSink {
|
|||
}
|
||||
|
||||
pub async fn send(&self, msg: &CollabStreamUpdate) -> Result<MessageId, StreamError> {
|
||||
let sv = msg.state_vector.encode_v1();
|
||||
let mut lock = self.conn.lock().await;
|
||||
let msg_id: MessageId = cmd("XADD")
|
||||
.arg(&self.stream_key)
|
||||
|
|
@ -26,8 +28,10 @@ impl CollabUpdateSink {
|
|||
.arg(msg.flags)
|
||||
.arg("sender")
|
||||
.arg(msg.sender.to_string())
|
||||
.arg("sv")
|
||||
.arg(&sv)
|
||||
.arg("data")
|
||||
.arg(&msg.data)
|
||||
.arg(&*msg.data)
|
||||
.query_async(&mut *lock)
|
||||
.await?;
|
||||
Ok(msg_id)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, UNIX_EPOCH};
|
||||
|
||||
use crate::error::RealtimeError;
|
||||
use crate::client::CollabRedisStream;
|
||||
use crate::error::StreamError;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, NaiveDateTime};
|
||||
use redis::aio::ConnectionManager;
|
||||
use redis::{RedisResult, Value};
|
||||
use uuid::Uuid;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
const RELEASE_SCRIPT: &str = r#"
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
|
|
@ -15,10 +15,46 @@ else
|
|||
end
|
||||
"#;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct LeaseAcquisition {
|
||||
key: String,
|
||||
token: Uuid,
|
||||
conn: Option<ConnectionManager>,
|
||||
stream_key: String,
|
||||
token: u128,
|
||||
}
|
||||
|
||||
impl LeaseAcquisition {
|
||||
pub async fn release(&mut self) -> Result<bool, StreamError> {
|
||||
if let Some(conn) = self.conn.take() {
|
||||
Self::release_internal(conn, &self.stream_key, self.token).await
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn release_internal<S: AsRef<str>>(
|
||||
mut conn: ConnectionManager,
|
||||
stream_key: S,
|
||||
token: u128,
|
||||
) -> Result<bool, StreamError> {
|
||||
let script = redis::Script::new(RELEASE_SCRIPT);
|
||||
let result: i32 = script
|
||||
.key(stream_key.as_ref())
|
||||
.arg(token.to_le_bytes().as_slice())
|
||||
.invoke_async(&mut conn)
|
||||
.await?;
|
||||
Ok(result == 1)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LeaseAcquisition {
|
||||
fn drop(&mut self) {
|
||||
if let Some(conn) = self.conn.take() {
|
||||
tokio::spawn(Self::release_internal(
|
||||
conn,
|
||||
self.stream_key.clone(),
|
||||
self.token,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is Redlock algorithm implementation.
|
||||
|
|
@ -28,63 +64,55 @@ pub trait Lease {
|
|||
/// Attempt to acquire lease on a stream for a given time-to-live.
|
||||
/// Returns `None` if the lease could not be acquired.
|
||||
async fn lease(
|
||||
&mut self,
|
||||
stream_id: Arc<str>,
|
||||
&self,
|
||||
stream_key: String,
|
||||
ttl: Duration,
|
||||
) -> Result<Option<LeaseAcquisition>, RealtimeError>;
|
||||
|
||||
/// Releases a previously acquired lease (via: [Lease::lease]).
|
||||
async fn release(&mut self, acq: LeaseAcquisition) -> Result<bool, RealtimeError>;
|
||||
) -> Result<Option<LeaseAcquisition>, StreamError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Lease for ConnectionManager {
|
||||
async fn lease(
|
||||
&mut self,
|
||||
stream_id: Arc<str>,
|
||||
&self,
|
||||
stream_key: String,
|
||||
ttl: Duration,
|
||||
) -> Result<Option<LeaseAcquisition>, RealtimeError> {
|
||||
) -> Result<Option<LeaseAcquisition>, StreamError> {
|
||||
let mut conn = self.clone();
|
||||
let ttl = ttl.as_millis() as u64;
|
||||
let token = Uuid::new_v4();
|
||||
let key = format!("{}-lease", stream_id);
|
||||
tracing::trace!("acquiring lease {} for {}ms", key, ttl);
|
||||
let result: RedisResult<Value> = redis::cmd("SET")
|
||||
.arg(&key)
|
||||
.arg(token.as_bytes())
|
||||
let token = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
tracing::trace!("acquiring lease `{}` for {}ms", stream_key, ttl);
|
||||
let result: Value = redis::cmd("SET")
|
||||
.arg(&stream_key)
|
||||
.arg(token.to_le_bytes().as_slice())
|
||||
.arg("NX")
|
||||
.arg("PX")
|
||||
.arg(ttl)
|
||||
.query_async(self)
|
||||
.await;
|
||||
.query_async(&mut conn)
|
||||
.await?;
|
||||
|
||||
match result {
|
||||
Ok(Value::Okay) => Ok(Some(LeaseAcquisition { key, token })),
|
||||
Ok(o) => {
|
||||
Value::Okay => Ok(Some(LeaseAcquisition {
|
||||
conn: Some(conn),
|
||||
stream_key,
|
||||
token,
|
||||
})),
|
||||
o => {
|
||||
tracing::trace!("lease locked: {:?}", o);
|
||||
Ok(None)
|
||||
},
|
||||
Err(err) => Err(RealtimeError::Lease(err.into())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn release(&mut self, acq: LeaseAcquisition) -> Result<bool, RealtimeError> {
|
||||
let script = redis::Script::new(RELEASE_SCRIPT);
|
||||
let result: i32 = script
|
||||
.key(acq.key)
|
||||
.arg(acq.token.as_bytes())
|
||||
.invoke_async(self)
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Lease(err.into()))?;
|
||||
Ok(result == 1)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::group::lease::Lease;
|
||||
use redis::Client;
|
||||
use crate::lease::Lease;
|
||||
use redis::Client;
|
||||
|
||||
#[tokio::test]
|
||||
#[tokio::test]
|
||||
async fn lease_acquisition() {
|
||||
let redis_client = Client::open("redis://localhost:6379").unwrap();
|
||||
let mut conn = redis_client.get_connection_manager().await.unwrap();
|
||||
|
|
@ -103,7 +131,7 @@ mod test {
|
|||
|
||||
assert!(l2.is_none(), "should fail to acquire lease");
|
||||
|
||||
conn.release(l1.unwrap()).await.unwrap();
|
||||
l1.unwrap().release().await.unwrap();
|
||||
|
||||
let l3 = conn
|
||||
.lease("stream1".into(), std::time::Duration::from_secs(1))
|
||||
|
|
@ -1,6 +1,7 @@
|
|||
pub mod client;
|
||||
pub mod collab_update_sink;
|
||||
pub mod error;
|
||||
pub mod lease;
|
||||
pub mod model;
|
||||
pub mod pubsub;
|
||||
pub mod stream_group;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
use crate::error::{internal, StreamError};
|
||||
use bytes::Bytes;
|
||||
use collab::core::origin::{CollabClient, CollabOrigin};
|
||||
use collab::preclude::updates::decoder::Decode;
|
||||
use collab::preclude::StateVector;
|
||||
use collab_entity::proto::collab::collab_update_event::Update;
|
||||
use collab_entity::{proto, CollabType};
|
||||
use prost::Message;
|
||||
|
|
@ -27,6 +29,15 @@ pub struct MessageId {
|
|||
pub sequence_number: u16,
|
||||
}
|
||||
|
||||
impl MessageId {
|
||||
pub fn new(timestamp_ms: u64, sequence_number: u16) -> Self {
|
||||
MessageId {
|
||||
timestamp_ms,
|
||||
sequence_number,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for MessageId {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}-{}", self.timestamp_ms, self.sequence_number)
|
||||
|
|
@ -358,12 +369,13 @@ impl TryFrom<CollabUpdateEvent> for StreamBinary {
|
|||
|
||||
pub struct CollabStreamUpdate {
|
||||
pub data: Vec<u8>,
|
||||
pub state_vector: StateVector,
|
||||
pub sender: CollabOrigin,
|
||||
pub flags: UpdateFlags,
|
||||
}
|
||||
|
||||
impl CollabStreamUpdate {
|
||||
pub fn new<B, F>(data: B, sender: CollabOrigin, flags: F) -> Self
|
||||
pub fn new<B, F>(data: B, state_vector: StateVector, sender: CollabOrigin, flags: F) -> Self
|
||||
where
|
||||
B: Into<Vec<u8>>,
|
||||
F: Into<UpdateFlags>,
|
||||
|
|
@ -371,6 +383,7 @@ impl CollabStreamUpdate {
|
|||
CollabStreamUpdate {
|
||||
data: data.into(),
|
||||
sender,
|
||||
state_vector,
|
||||
flags: flags.into(),
|
||||
}
|
||||
}
|
||||
|
|
@ -403,6 +416,15 @@ impl FromRedisValue for CollabStreamUpdateBatch {
|
|||
collab_origin
|
||||
},
|
||||
};
|
||||
let state_vector = match fields.get("sv") {
|
||||
Some(value) => {
|
||||
let bytes = Bytes::from_redis_value(value)?;
|
||||
let state_vector =
|
||||
StateVector::decode_v1(&bytes).map_err(|err| internal(err.to_string()))?;
|
||||
Ok(state_vector)
|
||||
},
|
||||
None => Err(internal("expecting field `sv`")),
|
||||
}?;
|
||||
let flags = match fields.get("flags") {
|
||||
None => UpdateFlags::default(),
|
||||
Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(),
|
||||
|
|
@ -416,6 +438,7 @@ impl FromRedisValue for CollabStreamUpdateBatch {
|
|||
CollabStreamUpdate {
|
||||
data,
|
||||
sender,
|
||||
state_vector,
|
||||
flags,
|
||||
},
|
||||
);
|
||||
|
|
|
|||
|
|
@ -249,13 +249,13 @@ impl Display for QueryCollabParams {
|
|||
}
|
||||
|
||||
impl QueryCollabParams {
|
||||
pub fn new<T1: ToString, T2: ToString>(
|
||||
pub fn new<T1: Into<String>, T2: Into<String>>(
|
||||
object_id: T1,
|
||||
collab_type: CollabType,
|
||||
workspace_id: T2,
|
||||
) -> Self {
|
||||
let workspace_id = workspace_id.to_string();
|
||||
let object_id = object_id.to_string();
|
||||
let workspace_id = workspace_id.into();
|
||||
let object_id = object_id.into();
|
||||
let inner = QueryCollab {
|
||||
object_id,
|
||||
collab_type,
|
||||
|
|
|
|||
|
|
@ -1,29 +1,16 @@
|
|||
use crate::error::RealtimeError;
|
||||
use crate::indexer::Indexer;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use crate::state::RedisConnectionManager;
|
||||
use anyhow::anyhow;
|
||||
use arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
use collab::core::collab::DataSource;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab::lock::RwLock;
|
||||
use collab::preclude::Collab;
|
||||
use collab_entity::CollabType;
|
||||
use dashmap::DashMap;
|
||||
use futures::{pin_mut, Sink, Stream};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::time::{interval, MissedTickBehavior};
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tracing::{error, event, info, trace};
|
||||
use yrs::sync::AwarenessUpdate;
|
||||
use yrs::updates::decoder::{Decode, DecoderV1, DecoderV2};
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Update};
|
||||
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
use collab_rt_entity::{AckCode, BroadcastSync, CollabAck, MessageByObjectId, MsgId};
|
||||
use collab_rt_entity::{ClientCollabMessage, CollabMessage};
|
||||
|
|
@ -35,18 +22,53 @@ use collab_stream::model::{
|
|||
CollabStreamUpdate, CollabUpdateEvent, MessageId, StreamBinary, UpdateFlags,
|
||||
};
|
||||
use collab_stream::stream_group::StreamGroup;
|
||||
use database::collab::CollabStorage;
|
||||
|
||||
use crate::error::RealtimeError;
|
||||
use crate::indexer::Indexer;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use crate::state::RedisConnectionManager;
|
||||
use dashmap::DashMap;
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database_entity::dto::QueryCollabParams;
|
||||
use futures::{pin_mut, Sink, Stream};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::os::linux::raw::stat;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::time::{interval, MissedTickBehavior};
|
||||
use tokio_util::sync::{CancellationToken, DropGuard};
|
||||
use tracing::{error, event, info, trace};
|
||||
use yrs::encoding::read::Error;
|
||||
use yrs::sync::AwarenessUpdate;
|
||||
use yrs::updates::decoder::{Decode, DecoderV1, DecoderV2};
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Update};
|
||||
|
||||
/// A group used to manage a single [Collab] object
|
||||
pub struct CollabGroup {
|
||||
state: Arc<CollabGroupState>,
|
||||
}
|
||||
|
||||
/// Inner state of [CollabGroup] that's private and hidden behind Arc, so that it can be moved into
|
||||
/// tasks.
|
||||
struct CollabGroupState {
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
/// A list of subscribers to this group. Each subscriber will receive updates from the
|
||||
/// broadcast.
|
||||
subscribers: DashMap<RealtimeUser, Subscription>,
|
||||
persister: CollabPersister,
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
/// Cancellation token triggered when current collab group is about to be stopped.
|
||||
/// This will also shut down all subsequent [Subscription]s.
|
||||
shutdown: CancellationToken,
|
||||
last_activity: ArcSwap<Instant>,
|
||||
seq_no: AtomicU32,
|
||||
/// The most recent state vector from a redis update.
|
||||
state_vector: RwLock<StateVector>,
|
||||
}
|
||||
|
||||
impl Drop for CollabGroup {
|
||||
fn drop(&mut self) {
|
||||
// we're going to use state shutdown to cancel subsequent tasks
|
||||
|
|
@ -90,6 +112,7 @@ impl CollabGroup {
|
|||
persister,
|
||||
last_activity: ArcSwap::new(Instant::now().into()),
|
||||
seq_no: AtomicU32::new(0),
|
||||
state_vector: Default::default(),
|
||||
});
|
||||
|
||||
/*
|
||||
|
|
@ -150,7 +173,6 @@ impl CollabGroup {
|
|||
match res {
|
||||
Some(Ok(update)) => {
|
||||
Self::handle_inbound_update(&state, update).await;
|
||||
state.last_activity.store(Arc::new(Instant::now()));
|
||||
},
|
||||
Some(Err(err)) => {
|
||||
tracing::warn!("failed to handle incoming update for collab `{}`: {}", state.object_id, err);
|
||||
|
|
@ -167,6 +189,15 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
|
||||
// we received new update, which means that our temp_collab within our persister's task
|
||||
// is no longer up to date: we need to clear it
|
||||
state.persister.clear_collab();
|
||||
|
||||
// update state vector based on incoming message
|
||||
let mut sv = state.state_vector.write().await;
|
||||
sv.merge(update.state_vector);
|
||||
drop(sv);
|
||||
|
||||
let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst);
|
||||
let message = BroadcastSync::new(update.sender, state.object_id.clone(), update.data, seq_num);
|
||||
for mut e in state.subscribers.iter_mut() {
|
||||
|
|
@ -183,6 +214,8 @@ impl CollabGroup {
|
|||
err
|
||||
);
|
||||
}
|
||||
|
||||
state.last_activity.store(Arc::new(Instant::now()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -198,6 +231,7 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
let mut snapshot_tick = tokio::time::interval(interval);
|
||||
// if saving took longer than snapshot_tick, just skip it over and try in the next round
|
||||
snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
|
|
@ -343,7 +377,7 @@ impl CollabGroup {
|
|||
continue;
|
||||
}
|
||||
for message in messages {
|
||||
match Self::handle_client_message(state, sink, message).await {
|
||||
match Self::handle_client_message(state, message).await {
|
||||
Ok(response) => {
|
||||
trace!("[realtime]: sending response: {}", response);
|
||||
match sink.send(response.into()).await {
|
||||
|
|
@ -368,14 +402,10 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
/// Handle the message sent from the client
|
||||
async fn handle_client_message<Sink>(
|
||||
async fn handle_client_message(
|
||||
state: &CollabGroupState,
|
||||
sink: &mut Sink,
|
||||
collab_msg: ClientCollabMessage,
|
||||
) -> Result<CollabAck, RealtimeError>
|
||||
where
|
||||
Sink: SubscriptionSink + 'static,
|
||||
{
|
||||
) -> Result<CollabAck, RealtimeError> {
|
||||
let msg_id = collab_msg.msg_id();
|
||||
let message_origin = collab_msg.origin().clone();
|
||||
|
||||
|
|
@ -403,7 +433,7 @@ impl CollabGroup {
|
|||
state.metrics.acquire_collab_lock_count.inc();
|
||||
|
||||
// Spawn a blocking task to handle the message
|
||||
let result = Self::handle_message(state, sink, &payload, &message_origin, msg_id).await;
|
||||
let result = Self::handle_message(state, &payload, &message_origin, msg_id).await;
|
||||
|
||||
match result {
|
||||
Ok(inner_result) => match inner_result {
|
||||
|
|
@ -417,16 +447,12 @@ impl CollabGroup {
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_message<Sink>(
|
||||
async fn handle_message(
|
||||
state: &CollabGroupState,
|
||||
sink: &mut Sink,
|
||||
payload: &[u8],
|
||||
message_origin: &CollabOrigin,
|
||||
msg_id: MsgId,
|
||||
) -> Result<Option<CollabAck>, RealtimeError>
|
||||
where
|
||||
Sink: SubscriptionSink + 'static,
|
||||
{
|
||||
) -> Result<Option<CollabAck>, RealtimeError> {
|
||||
let mut decoder = DecoderV1::from(payload);
|
||||
let reader = MessageReader::new(&mut decoder);
|
||||
let mut ack_response = None;
|
||||
|
|
@ -435,7 +461,7 @@ impl CollabGroup {
|
|||
match msg {
|
||||
Ok(msg) => {
|
||||
is_sync_step2 = matches!(msg, Message::Sync(SyncMessage::SyncStep2(_)));
|
||||
match Self::handle_protocol_message(state, sink, message_origin, msg).await {
|
||||
match Self::handle_protocol_message(state, message_origin, msg).await {
|
||||
Ok(payload) => {
|
||||
state.metrics.apply_update_count.inc();
|
||||
// One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to
|
||||
|
|
@ -487,18 +513,14 @@ impl CollabGroup {
|
|||
Ok(ack_response)
|
||||
}
|
||||
|
||||
async fn handle_protocol_message<Sink>(
|
||||
async fn handle_protocol_message(
|
||||
state: &CollabGroupState,
|
||||
sink: &mut Sink,
|
||||
origin: &CollabOrigin,
|
||||
msg: Message,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError>
|
||||
where
|
||||
Sink: SubscriptionSink + 'static,
|
||||
{
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
match msg {
|
||||
Message::Sync(msg) => match msg {
|
||||
SyncMessage::SyncStep1(sv) => Self::handle_sync_step1(state, sink, &sv).await,
|
||||
SyncMessage::SyncStep1(sv) => Self::handle_sync_step1(state, &sv).await,
|
||||
SyncMessage::SyncStep2(update) => Self::handle_sync_step2(state, origin, update).await,
|
||||
SyncMessage::Update(update) => Self::handle_update(state, origin, update).await,
|
||||
},
|
||||
|
|
@ -509,11 +531,18 @@ impl CollabGroup {
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_sync_step1<Sink>(
|
||||
async fn handle_sync_step1(
|
||||
state: &CollabGroupState,
|
||||
sink: &mut Sink,
|
||||
remote_sv: &StateVector,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
if let Ok(sv) = state.state_vector.try_read() {
|
||||
// we optimistically try to obtain state vector lock for a fast track:
|
||||
// if we remote sv is up-to-date with current one, we don't need to do anything
|
||||
if remote_sv == &*sv {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
let snapshot = state
|
||||
.persister
|
||||
.load()
|
||||
|
|
@ -540,7 +569,24 @@ impl CollabGroup {
|
|||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
state.metrics.apply_update_size.observe(update.len() as f64);
|
||||
let start = tokio::time::Instant::now();
|
||||
state.persister.send_update(origin.clone(), update).await;
|
||||
// we try to decode update to make sure it's not malformed and to extract state vector
|
||||
let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD {
|
||||
let decoded_update = Update::decode_v1(&update)?;
|
||||
(update, decoded_update)
|
||||
} else {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let decoded_update = Update::decode_v1(&update)?;
|
||||
Ok::<(Vec<u8>, yrs::Update), yrs::encoding::read::Error>((update, decoded_update))
|
||||
})
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))??
|
||||
};
|
||||
let state_vector = decoded_update.state_vector();
|
||||
state
|
||||
.persister
|
||||
.send_update(origin.clone(), update, state_vector)
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
let elapsed = start.elapsed();
|
||||
state
|
||||
.metrics
|
||||
|
|
@ -562,12 +608,12 @@ impl CollabGroup {
|
|||
origin: &CollabOrigin,
|
||||
update: AwarenessUpdate,
|
||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
||||
state.persister.send_awareness(origin, update).await;
|
||||
|
||||
//let mut lock = collab.write().await;
|
||||
//let collab = (*lock).borrow_mut();
|
||||
//collab.get_awareness().apply_update(update)?;
|
||||
todo!()
|
||||
state
|
||||
.persister
|
||||
.send_awareness(origin, update)
|
||||
.await
|
||||
.map_err(|err| RTProtocolError::Internal(err.into()))?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
|
@ -644,24 +690,6 @@ impl CollabGroup {
|
|||
}
|
||||
}
|
||||
|
||||
/// Inner state of [CollabGroup] that's private and hidden behind Arc, so that it can be moved into
|
||||
/// tasks.
|
||||
struct CollabGroupState {
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
/// A list of subscribers to this group. Each subscriber will receive updates from the
|
||||
/// broadcast.
|
||||
subscribers: DashMap<RealtimeUser, Subscription>,
|
||||
persister: CollabPersister,
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
/// Cancellation token triggered when current collab group is about to be stopped.
|
||||
/// This will also shut down all subsequent [Subscription]s.
|
||||
shutdown: CancellationToken,
|
||||
last_activity: ArcSwap<Instant>,
|
||||
seq_no: AtomicU32,
|
||||
}
|
||||
|
||||
struct CollabUpdateStreamingImpl {
|
||||
sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
}
|
||||
|
|
@ -788,7 +816,7 @@ impl CollabPersister {
|
|||
}
|
||||
|
||||
/// Drop temp collab i.e. because it was no longer up to date or was not accessed for too long.
|
||||
fn reset(&self) {
|
||||
fn clear_collab(&self) {
|
||||
self.temp_collab.store(None); // cleanup temp collab
|
||||
}
|
||||
|
||||
|
|
@ -796,12 +824,14 @@ impl CollabPersister {
|
|||
&self,
|
||||
sender: CollabOrigin,
|
||||
update: Vec<u8>,
|
||||
state_vector: StateVector,
|
||||
) -> Result<MessageId, StreamError> {
|
||||
// send updates to redis queue
|
||||
let msg_id = self
|
||||
.update_sink
|
||||
.send(&CollabStreamUpdate::new(
|
||||
update,
|
||||
state_vector,
|
||||
sender,
|
||||
UpdateFlags::default(),
|
||||
))
|
||||
|
|
@ -809,7 +839,11 @@ impl CollabPersister {
|
|||
Ok(msg_id)
|
||||
}
|
||||
|
||||
async fn send_awareness(&self, sender_session: &CollabOrigin, awareness_update: AwarenessUpdate) {
|
||||
async fn send_awareness(
|
||||
&self,
|
||||
sender_session: &CollabOrigin,
|
||||
awareness_update: AwarenessUpdate,
|
||||
) -> Result<MessageId, StreamError> {
|
||||
// send awareness updates to redis queue: is it needed? What are we using awareness for here?
|
||||
todo!()
|
||||
}
|
||||
|
|
@ -817,28 +851,49 @@ impl CollabPersister {
|
|||
async fn load(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
|
||||
match self.temp_collab.load_full() {
|
||||
Some(collab) => Ok(collab), // return cached collab
|
||||
None => self.force_load().await,
|
||||
None => self.load_internal().await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn force_load(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
|
||||
async fn load_internal(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
|
||||
// 1. Try to load the latest snapshot from storage
|
||||
let params = QueryCollabParams::new(
|
||||
self.object_id.clone(),
|
||||
self.collab_type.clone(),
|
||||
self.workspace_id.clone(),
|
||||
);
|
||||
let encoded_collab = self
|
||||
.storage
|
||||
.get_encode_collab(GetCollabOrigin::Server, params, false)
|
||||
.await?;
|
||||
let collab = Collab::new_with_source(
|
||||
CollabOrigin::Server,
|
||||
&self.object_id,
|
||||
DataSource::DocStateV1(encoded_collab.doc_state.into()),
|
||||
vec![],
|
||||
false,
|
||||
)?;
|
||||
// 2. consume all Redis updates on top of it (keep redis msg id)
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn save(&self) -> Result<(), RealtimeError> {
|
||||
// 1. try to acquire lock
|
||||
// 2. if successful -> self.load()
|
||||
// 3. if collab has any changes (any redis updates were applied):
|
||||
// 4. generate embeddings
|
||||
// 5. store collab
|
||||
// 6. prune any redis msg ids older than 5 min. since collab snapshot time
|
||||
todo!()
|
||||
if let Some(lease) = self
|
||||
.collab_redis_stream
|
||||
.lease(&self.workspace_id, &self.object_id)
|
||||
{
|
||||
// 3. if collab has any changes (any redis updates were applied):
|
||||
// 4. generate embeddings
|
||||
// 5. store collab
|
||||
// 6. prune any redis msg ids older than 5 min. since collab snapshot time
|
||||
todo!()
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CollabSnapshot {
|
||||
pub collab: Collab,
|
||||
pub last_msg_id: String,
|
||||
pub last_msg_id: MessageId,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
pub(crate) mod cmd;
|
||||
pub(crate) mod group_init;
|
||||
mod lease;
|
||||
pub(crate) mod manager;
|
||||
mod plugin;
|
||||
pub(crate) mod protocol;
|
||||
|
|
|
|||
Loading…
Reference in New Issue