diff --git a/Cargo.lock b/Cargo.lock index a00d97e7..0f725908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2452,6 +2452,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-stream", + "async-trait", "bincode", "bytes", "chrono", diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index 936d9a89..29d5c051 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -238,14 +238,14 @@ pub trait CollabSyncProtocol { } } -const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB +pub const LARGE_UPDATE_THRESHOLD: usize = 1024 * 1024; // 1MB #[inline] -pub async fn decode_update(update: Vec) -> Result { +pub async fn decode_update(update: Vec) -> Result { let update = if update.len() > LARGE_UPDATE_THRESHOLD { spawn_blocking(move || Update::decode_v1(&update)) .await - .map_err(|err| RTProtocolError::Internal(err.into()))? + .map_err(|err| yrs::encoding::read::Error::Custom(err.to_string()))? } else { Update::decode_v1(&update) }?; diff --git a/libs/collab-stream/Cargo.toml b/libs/collab-stream/Cargo.toml index dbecd26c..c8d50407 100644 --- a/libs/collab-stream/Cargo.toml +++ b/libs/collab-stream/Cargo.toml @@ -23,6 +23,7 @@ chrono = "0.4" tokio-util = { version = "0.7" } prost.workspace = true async-stream.workspace = true +async-trait.workspace = true [dev-dependencies] diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index f3d941a8..196576d5 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -1,5 +1,6 @@ use crate::collab_update_sink::CollabUpdateSink; use crate::error::StreamError; +use crate::lease::{Lease, LeaseAcquisition}; use crate::model::{CollabStreamUpdate, CollabStreamUpdateBatch, CollabUpdateEvent, MessageId}; use crate::pubsub::{CollabStreamPub, CollabStreamSub}; use crate::stream_group::{StreamConfig, StreamGroup}; @@ -7,6 +8,7 @@ use futures::Stream; use redis::aio::ConnectionManager; use redis::streams::{StreamReadOptions, StreamReadReply}; use redis::AsyncCommands; +use std::time::Duration; use tracing::error; pub const CONTROL_STREAM_KEY: &str = "af_collab_control"; @@ -17,6 +19,8 @@ pub struct CollabRedisStream { } impl CollabRedisStream { + pub const LEASE_TTL: Duration = Duration::from_secs(60); + pub async fn new(redis_client: redis::Client) -> Result { let connection_manager = redis_client.get_connection_manager().await?; Ok(Self::new_with_connection_manager(connection_manager)) @@ -26,6 +30,18 @@ impl CollabRedisStream { Self { connection_manager } } + pub async fn lease( + &self, + workspace_id: &str, + object_id: &str, + ) -> Result, StreamError> { + let lease_key = format!("af_snapshot:lease:{}:{}", workspace_id, object_id); + self + .connection_manager + .lease(lease_key, Self::LEASE_TTL) + .await + } + pub async fn collab_control_stream( &self, key: &str, diff --git a/libs/collab-stream/src/collab_update_sink.rs b/libs/collab-stream/src/collab_update_sink.rs index 4926088f..449f1cd3 100644 --- a/libs/collab-stream/src/collab_update_sink.rs +++ b/libs/collab-stream/src/collab_update_sink.rs @@ -1,5 +1,6 @@ use crate::error::StreamError; use crate::model::{CollabStreamUpdate, MessageId}; +use collab::preclude::updates::encoder::Encode; use redis::aio::ConnectionManager; use redis::cmd; use tokio::sync::Mutex; @@ -18,6 +19,7 @@ impl CollabUpdateSink { } pub async fn send(&self, msg: &CollabStreamUpdate) -> Result { + let sv = msg.state_vector.encode_v1(); let mut lock = self.conn.lock().await; let msg_id: MessageId = cmd("XADD") .arg(&self.stream_key) @@ -26,8 +28,10 @@ impl CollabUpdateSink { .arg(msg.flags) .arg("sender") .arg(msg.sender.to_string()) + .arg("sv") + .arg(&sv) .arg("data") - .arg(&msg.data) + .arg(&*msg.data) .query_async(&mut *lock) .await?; Ok(msg_id) diff --git a/services/appflowy-collaborate/src/group/lease.rs b/libs/collab-stream/src/lease.rs similarity index 50% rename from services/appflowy-collaborate/src/group/lease.rs rename to libs/collab-stream/src/lease.rs index 9c8360f3..9647dc60 100644 --- a/services/appflowy-collaborate/src/group/lease.rs +++ b/libs/collab-stream/src/lease.rs @@ -1,11 +1,11 @@ -use std::sync::Arc; -use std::time::{Duration, Instant, UNIX_EPOCH}; - -use crate::error::RealtimeError; +use crate::client::CollabRedisStream; +use crate::error::StreamError; use async_trait::async_trait; +use chrono::{DateTime, NaiveDateTime}; use redis::aio::ConnectionManager; use redis::{RedisResult, Value}; -use uuid::Uuid; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; const RELEASE_SCRIPT: &str = r#" if redis.call("GET", KEYS[1]) == ARGV[1] then @@ -15,10 +15,46 @@ else end "#; -#[derive(Debug)] pub struct LeaseAcquisition { - key: String, - token: Uuid, + conn: Option, + stream_key: String, + token: u128, +} + +impl LeaseAcquisition { + pub async fn release(&mut self) -> Result { + if let Some(conn) = self.conn.take() { + Self::release_internal(conn, &self.stream_key, self.token).await + } else { + Ok(false) + } + } + + async fn release_internal>( + mut conn: ConnectionManager, + stream_key: S, + token: u128, + ) -> Result { + let script = redis::Script::new(RELEASE_SCRIPT); + let result: i32 = script + .key(stream_key.as_ref()) + .arg(token.to_le_bytes().as_slice()) + .invoke_async(&mut conn) + .await?; + Ok(result == 1) + } +} + +impl Drop for LeaseAcquisition { + fn drop(&mut self) { + if let Some(conn) = self.conn.take() { + tokio::spawn(Self::release_internal( + conn, + self.stream_key.clone(), + self.token, + )); + } + } } /// This is Redlock algorithm implementation. @@ -28,63 +64,55 @@ pub trait Lease { /// Attempt to acquire lease on a stream for a given time-to-live. /// Returns `None` if the lease could not be acquired. async fn lease( - &mut self, - stream_id: Arc, + &self, + stream_key: String, ttl: Duration, - ) -> Result, RealtimeError>; - - /// Releases a previously acquired lease (via: [Lease::lease]). - async fn release(&mut self, acq: LeaseAcquisition) -> Result; + ) -> Result, StreamError>; } #[async_trait] impl Lease for ConnectionManager { async fn lease( - &mut self, - stream_id: Arc, + &self, + stream_key: String, ttl: Duration, - ) -> Result, RealtimeError> { + ) -> Result, StreamError> { + let mut conn = self.clone(); let ttl = ttl.as_millis() as u64; - let token = Uuid::new_v4(); - let key = format!("{}-lease", stream_id); - tracing::trace!("acquiring lease {} for {}ms", key, ttl); - let result: RedisResult = redis::cmd("SET") - .arg(&key) - .arg(token.as_bytes()) + let token = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + tracing::trace!("acquiring lease `{}` for {}ms", stream_key, ttl); + let result: Value = redis::cmd("SET") + .arg(&stream_key) + .arg(token.to_le_bytes().as_slice()) .arg("NX") .arg("PX") .arg(ttl) - .query_async(self) - .await; + .query_async(&mut conn) + .await?; match result { - Ok(Value::Okay) => Ok(Some(LeaseAcquisition { key, token })), - Ok(o) => { + Value::Okay => Ok(Some(LeaseAcquisition { + conn: Some(conn), + stream_key, + token, + })), + o => { tracing::trace!("lease locked: {:?}", o); Ok(None) }, - Err(err) => Err(RealtimeError::Lease(err.into())), } } - - async fn release(&mut self, acq: LeaseAcquisition) -> Result { - let script = redis::Script::new(RELEASE_SCRIPT); - let result: i32 = script - .key(acq.key) - .arg(acq.token.as_bytes()) - .invoke_async(self) - .await - .map_err(|err| RealtimeError::Lease(err.into()))?; - Ok(result == 1) - } } #[cfg(test)] mod test { - use crate::group::lease::Lease; - use redis::Client; + use crate::lease::Lease; + use redis::Client; - #[tokio::test] + #[tokio::test] async fn lease_acquisition() { let redis_client = Client::open("redis://localhost:6379").unwrap(); let mut conn = redis_client.get_connection_manager().await.unwrap(); @@ -103,7 +131,7 @@ mod test { assert!(l2.is_none(), "should fail to acquire lease"); - conn.release(l1.unwrap()).await.unwrap(); + l1.unwrap().release().await.unwrap(); let l3 = conn .lease("stream1".into(), std::time::Duration::from_secs(1)) diff --git a/libs/collab-stream/src/lib.rs b/libs/collab-stream/src/lib.rs index 43102ce2..bb928b85 100644 --- a/libs/collab-stream/src/lib.rs +++ b/libs/collab-stream/src/lib.rs @@ -1,6 +1,7 @@ pub mod client; pub mod collab_update_sink; pub mod error; +pub mod lease; pub mod model; pub mod pubsub; pub mod stream_group; diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index 926062d5..9617e183 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -1,6 +1,8 @@ use crate::error::{internal, StreamError}; use bytes::Bytes; use collab::core::origin::{CollabClient, CollabOrigin}; +use collab::preclude::updates::decoder::Decode; +use collab::preclude::StateVector; use collab_entity::proto::collab::collab_update_event::Update; use collab_entity::{proto, CollabType}; use prost::Message; @@ -27,6 +29,15 @@ pub struct MessageId { pub sequence_number: u16, } +impl MessageId { + pub fn new(timestamp_ms: u64, sequence_number: u16) -> Self { + MessageId { + timestamp_ms, + sequence_number, + } + } +} + impl Display for MessageId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}-{}", self.timestamp_ms, self.sequence_number) @@ -358,12 +369,13 @@ impl TryFrom for StreamBinary { pub struct CollabStreamUpdate { pub data: Vec, + pub state_vector: StateVector, pub sender: CollabOrigin, pub flags: UpdateFlags, } impl CollabStreamUpdate { - pub fn new(data: B, sender: CollabOrigin, flags: F) -> Self + pub fn new(data: B, state_vector: StateVector, sender: CollabOrigin, flags: F) -> Self where B: Into>, F: Into, @@ -371,6 +383,7 @@ impl CollabStreamUpdate { CollabStreamUpdate { data: data.into(), sender, + state_vector, flags: flags.into(), } } @@ -403,6 +416,15 @@ impl FromRedisValue for CollabStreamUpdateBatch { collab_origin }, }; + let state_vector = match fields.get("sv") { + Some(value) => { + let bytes = Bytes::from_redis_value(value)?; + let state_vector = + StateVector::decode_v1(&bytes).map_err(|err| internal(err.to_string()))?; + Ok(state_vector) + }, + None => Err(internal("expecting field `sv`")), + }?; let flags = match fields.get("flags") { None => UpdateFlags::default(), Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(), @@ -416,6 +438,7 @@ impl FromRedisValue for CollabStreamUpdateBatch { CollabStreamUpdate { data, sender, + state_vector, flags, }, ); diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index c10b1664..62e164a4 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -249,13 +249,13 @@ impl Display for QueryCollabParams { } impl QueryCollabParams { - pub fn new( + pub fn new, T2: Into>( object_id: T1, collab_type: CollabType, workspace_id: T2, ) -> Self { - let workspace_id = workspace_id.to_string(); - let object_id = object_id.to_string(); + let workspace_id = workspace_id.into(); + let object_id = object_id.into(); let inner = QueryCollab { object_id, collab_type, diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index e7bafc28..f440c102 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -1,29 +1,16 @@ +use crate::error::RealtimeError; +use crate::indexer::Indexer; +use crate::metrics::CollabRealtimeMetrics; +use crate::state::RedisConnectionManager; use anyhow::anyhow; use arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption}; use bytes::Bytes; +use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::CollabType; -use dashmap::DashMap; -use futures::{pin_mut, Sink, Stream}; -use futures_util::{SinkExt, StreamExt}; -use std::collections::VecDeque; -use std::fmt::{Display, Formatter}; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, Mutex}; -use tokio::time::{interval, MissedTickBehavior}; -use tokio_util::sync::{CancellationToken, DropGuard}; -use tracing::{error, event, info, trace}; -use yrs::sync::AwarenessUpdate; -use yrs::updates::decoder::{Decode, DecoderV1, DecoderV2}; -use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; -use yrs::{ReadTxn, StateVector, Update}; - use collab_rt_entity::user::RealtimeUser; use collab_rt_entity::{AckCode, BroadcastSync, CollabAck, MessageByObjectId, MsgId}; use collab_rt_entity::{ClientCollabMessage, CollabMessage}; @@ -35,18 +22,53 @@ use collab_stream::model::{ CollabStreamUpdate, CollabUpdateEvent, MessageId, StreamBinary, UpdateFlags, }; use collab_stream::stream_group::StreamGroup; -use database::collab::CollabStorage; - -use crate::error::RealtimeError; -use crate::indexer::Indexer; -use crate::metrics::CollabRealtimeMetrics; -use crate::state::RedisConnectionManager; +use dashmap::DashMap; +use database::collab::{CollabStorage, GetCollabOrigin}; +use database_entity::dto::QueryCollabParams; +use futures::{pin_mut, Sink, Stream}; +use futures_util::{SinkExt, StreamExt}; +use std::collections::VecDeque; +use std::fmt::{Display, Formatter}; +use std::os::linux::raw::stat; +use std::pin::Pin; +use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::{interval, MissedTickBehavior}; +use tokio_util::sync::{CancellationToken, DropGuard}; +use tracing::{error, event, info, trace}; +use yrs::encoding::read::Error; +use yrs::sync::AwarenessUpdate; +use yrs::updates::decoder::{Decode, DecoderV1, DecoderV2}; +use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; +use yrs::{ReadTxn, StateVector, Update}; /// A group used to manage a single [Collab] object pub struct CollabGroup { state: Arc, } +/// Inner state of [CollabGroup] that's private and hidden behind Arc, so that it can be moved into +/// tasks. +struct CollabGroupState { + workspace_id: String, + object_id: String, + collab_type: CollabType, + /// A list of subscribers to this group. Each subscriber will receive updates from the + /// broadcast. + subscribers: DashMap, + persister: CollabPersister, + metrics: Arc, + /// Cancellation token triggered when current collab group is about to be stopped. + /// This will also shut down all subsequent [Subscription]s. + shutdown: CancellationToken, + last_activity: ArcSwap, + seq_no: AtomicU32, + /// The most recent state vector from a redis update. + state_vector: RwLock, +} + impl Drop for CollabGroup { fn drop(&mut self) { // we're going to use state shutdown to cancel subsequent tasks @@ -90,6 +112,7 @@ impl CollabGroup { persister, last_activity: ArcSwap::new(Instant::now().into()), seq_no: AtomicU32::new(0), + state_vector: Default::default(), }); /* @@ -150,7 +173,6 @@ impl CollabGroup { match res { Some(Ok(update)) => { Self::handle_inbound_update(&state, update).await; - state.last_activity.store(Arc::new(Instant::now())); }, Some(Err(err)) => { tracing::warn!("failed to handle incoming update for collab `{}`: {}", state.object_id, err); @@ -167,6 +189,15 @@ impl CollabGroup { } async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) { + // we received new update, which means that our temp_collab within our persister's task + // is no longer up to date: we need to clear it + state.persister.clear_collab(); + + // update state vector based on incoming message + let mut sv = state.state_vector.write().await; + sv.merge(update.state_vector); + drop(sv); + let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst); let message = BroadcastSync::new(update.sender, state.object_id.clone(), update.data, seq_num); for mut e in state.subscribers.iter_mut() { @@ -183,6 +214,8 @@ impl CollabGroup { err ); } + + state.last_activity.store(Arc::new(Instant::now())); } } @@ -198,6 +231,7 @@ impl CollabGroup { } let mut snapshot_tick = tokio::time::interval(interval); + // if saving took longer than snapshot_tick, just skip it over and try in the next round snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { @@ -343,7 +377,7 @@ impl CollabGroup { continue; } for message in messages { - match Self::handle_client_message(state, sink, message).await { + match Self::handle_client_message(state, message).await { Ok(response) => { trace!("[realtime]: sending response: {}", response); match sink.send(response.into()).await { @@ -368,14 +402,10 @@ impl CollabGroup { } /// Handle the message sent from the client - async fn handle_client_message( + async fn handle_client_message( state: &CollabGroupState, - sink: &mut Sink, collab_msg: ClientCollabMessage, - ) -> Result - where - Sink: SubscriptionSink + 'static, - { + ) -> Result { let msg_id = collab_msg.msg_id(); let message_origin = collab_msg.origin().clone(); @@ -403,7 +433,7 @@ impl CollabGroup { state.metrics.acquire_collab_lock_count.inc(); // Spawn a blocking task to handle the message - let result = Self::handle_message(state, sink, &payload, &message_origin, msg_id).await; + let result = Self::handle_message(state, &payload, &message_origin, msg_id).await; match result { Ok(inner_result) => match inner_result { @@ -417,16 +447,12 @@ impl CollabGroup { } } - async fn handle_message( + async fn handle_message( state: &CollabGroupState, - sink: &mut Sink, payload: &[u8], message_origin: &CollabOrigin, msg_id: MsgId, - ) -> Result, RealtimeError> - where - Sink: SubscriptionSink + 'static, - { + ) -> Result, RealtimeError> { let mut decoder = DecoderV1::from(payload); let reader = MessageReader::new(&mut decoder); let mut ack_response = None; @@ -435,7 +461,7 @@ impl CollabGroup { match msg { Ok(msg) => { is_sync_step2 = matches!(msg, Message::Sync(SyncMessage::SyncStep2(_))); - match Self::handle_protocol_message(state, sink, message_origin, msg).await { + match Self::handle_protocol_message(state, message_origin, msg).await { Ok(payload) => { state.metrics.apply_update_count.inc(); // One ClientCollabMessage can have multiple Yrs [Message] in it, but we only need to @@ -487,18 +513,14 @@ impl CollabGroup { Ok(ack_response) } - async fn handle_protocol_message( + async fn handle_protocol_message( state: &CollabGroupState, - sink: &mut Sink, origin: &CollabOrigin, msg: Message, - ) -> Result>, RTProtocolError> - where - Sink: SubscriptionSink + 'static, - { + ) -> Result>, RTProtocolError> { match msg { Message::Sync(msg) => match msg { - SyncMessage::SyncStep1(sv) => Self::handle_sync_step1(state, sink, &sv).await, + SyncMessage::SyncStep1(sv) => Self::handle_sync_step1(state, &sv).await, SyncMessage::SyncStep2(update) => Self::handle_sync_step2(state, origin, update).await, SyncMessage::Update(update) => Self::handle_update(state, origin, update).await, }, @@ -509,11 +531,18 @@ impl CollabGroup { } } - async fn handle_sync_step1( + async fn handle_sync_step1( state: &CollabGroupState, - sink: &mut Sink, remote_sv: &StateVector, ) -> Result>, RTProtocolError> { + if let Ok(sv) = state.state_vector.try_read() { + // we optimistically try to obtain state vector lock for a fast track: + // if we remote sv is up-to-date with current one, we don't need to do anything + if remote_sv == &*sv { + return Ok(None); + } + } + let snapshot = state .persister .load() @@ -540,7 +569,24 @@ impl CollabGroup { ) -> Result>, RTProtocolError> { state.metrics.apply_update_size.observe(update.len() as f64); let start = tokio::time::Instant::now(); - state.persister.send_update(origin.clone(), update).await; + // we try to decode update to make sure it's not malformed and to extract state vector + let (update, decoded_update) = if update.len() <= collab_rt_protocol::LARGE_UPDATE_THRESHOLD { + let decoded_update = Update::decode_v1(&update)?; + (update, decoded_update) + } else { + tokio::task::spawn_blocking(move || { + let decoded_update = Update::decode_v1(&update)?; + Ok::<(Vec, yrs::Update), yrs::encoding::read::Error>((update, decoded_update)) + }) + .await + .map_err(|err| RTProtocolError::Internal(err.into()))?? + }; + let state_vector = decoded_update.state_vector(); + state + .persister + .send_update(origin.clone(), update, state_vector) + .await + .map_err(|err| RTProtocolError::Internal(err.into()))?; let elapsed = start.elapsed(); state .metrics @@ -562,12 +608,12 @@ impl CollabGroup { origin: &CollabOrigin, update: AwarenessUpdate, ) -> Result>, RTProtocolError> { - state.persister.send_awareness(origin, update).await; - - //let mut lock = collab.write().await; - //let collab = (*lock).borrow_mut(); - //collab.get_awareness().apply_update(update)?; - todo!() + state + .persister + .send_awareness(origin, update) + .await + .map_err(|err| RTProtocolError::Internal(err.into()))?; + Ok(None) } #[inline] @@ -644,24 +690,6 @@ impl CollabGroup { } } -/// Inner state of [CollabGroup] that's private and hidden behind Arc, so that it can be moved into -/// tasks. -struct CollabGroupState { - workspace_id: String, - object_id: String, - collab_type: CollabType, - /// A list of subscribers to this group. Each subscriber will receive updates from the - /// broadcast. - subscribers: DashMap, - persister: CollabPersister, - metrics: Arc, - /// Cancellation token triggered when current collab group is about to be stopped. - /// This will also shut down all subsequent [Subscription]s. - shutdown: CancellationToken, - last_activity: ArcSwap, - seq_no: AtomicU32, -} - struct CollabUpdateStreamingImpl { sender: mpsc::UnboundedSender>, } @@ -788,7 +816,7 @@ impl CollabPersister { } /// Drop temp collab i.e. because it was no longer up to date or was not accessed for too long. - fn reset(&self) { + fn clear_collab(&self) { self.temp_collab.store(None); // cleanup temp collab } @@ -796,12 +824,14 @@ impl CollabPersister { &self, sender: CollabOrigin, update: Vec, + state_vector: StateVector, ) -> Result { // send updates to redis queue let msg_id = self .update_sink .send(&CollabStreamUpdate::new( update, + state_vector, sender, UpdateFlags::default(), )) @@ -809,7 +839,11 @@ impl CollabPersister { Ok(msg_id) } - async fn send_awareness(&self, sender_session: &CollabOrigin, awareness_update: AwarenessUpdate) { + async fn send_awareness( + &self, + sender_session: &CollabOrigin, + awareness_update: AwarenessUpdate, + ) -> Result { // send awareness updates to redis queue: is it needed? What are we using awareness for here? todo!() } @@ -817,28 +851,49 @@ impl CollabPersister { async fn load(&self) -> Result, RealtimeError> { match self.temp_collab.load_full() { Some(collab) => Ok(collab), // return cached collab - None => self.force_load().await, + None => self.load_internal().await, } } - async fn force_load(&self) -> Result, RealtimeError> { + async fn load_internal(&self) -> Result, RealtimeError> { // 1. Try to load the latest snapshot from storage + let params = QueryCollabParams::new( + self.object_id.clone(), + self.collab_type.clone(), + self.workspace_id.clone(), + ); + let encoded_collab = self + .storage + .get_encode_collab(GetCollabOrigin::Server, params, false) + .await?; + let collab = Collab::new_with_source( + CollabOrigin::Server, + &self.object_id, + DataSource::DocStateV1(encoded_collab.doc_state.into()), + vec![], + false, + )?; // 2. consume all Redis updates on top of it (keep redis msg id) todo!() } async fn save(&self) -> Result<(), RealtimeError> { // 1. try to acquire lock - // 2. if successful -> self.load() - // 3. if collab has any changes (any redis updates were applied): - // 4. generate embeddings - // 5. store collab - // 6. prune any redis msg ids older than 5 min. since collab snapshot time - todo!() + if let Some(lease) = self + .collab_redis_stream + .lease(&self.workspace_id, &self.object_id) + { + // 3. if collab has any changes (any redis updates were applied): + // 4. generate embeddings + // 5. store collab + // 6. prune any redis msg ids older than 5 min. since collab snapshot time + todo!() + } + Ok(()) } } pub struct CollabSnapshot { pub collab: Collab, - pub last_msg_id: String, + pub last_msg_id: MessageId, } diff --git a/services/appflowy-collaborate/src/group/mod.rs b/services/appflowy-collaborate/src/group/mod.rs index 093aeb3e..0acedf57 100644 --- a/services/appflowy-collaborate/src/group/mod.rs +++ b/services/appflowy-collaborate/src/group/mod.rs @@ -1,6 +1,5 @@ pub(crate) mod cmd; pub(crate) mod group_init; -mod lease; pub(crate) mod manager; mod plugin; pub(crate) mod protocol;