chore: draft of stateless collab group
This commit is contained in:
parent
860a921ea5
commit
3433cad5cf
|
|
@ -709,6 +709,7 @@ dependencies = [
|
|||
"anyhow",
|
||||
"app-error",
|
||||
"appflowy-ai-client",
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"authentication",
|
||||
|
|
|
|||
|
|
@ -282,6 +282,7 @@ sanitize-filename = "0.5.0"
|
|||
base64 = "0.22"
|
||||
md5 = "0.7.0"
|
||||
pin-project = "1.1.5"
|
||||
arc-swap = { version = "1.7" }
|
||||
|
||||
# collaboration
|
||||
yrs = { version = "0.21.3", features = ["sync"] }
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ serde_json.workspace = true
|
|||
serde.workspace = true
|
||||
app-error = { workspace = true, features = ["tokio_error", "bincode_error"] }
|
||||
scraper = { version = "0.17.1", optional = true }
|
||||
arc-swap = "1.7"
|
||||
arc-swap.workspace = true
|
||||
|
||||
shared-entity = { workspace = true }
|
||||
collab-rt-entity = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ thiserror = "1.0.56"
|
|||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
anyhow = "1"
|
||||
bytes.workspace = true
|
||||
arc-swap.workspace = true
|
||||
|
||||
collab = { workspace = true }
|
||||
collab-entity = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -1,11 +1,14 @@
|
|||
use std::borrow::BorrowMut;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::lock::RwLock;
|
||||
use collab::preclude::Collab;
|
||||
use futures::Stream;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use tokio::select;
|
||||
use tokio::sync::broadcast::{channel, Sender};
|
||||
|
|
@ -30,8 +33,13 @@ use crate::group::group_init::EditState;
|
|||
use crate::group::protocol::ServerSyncProtocol;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
|
||||
pub type DataStream =
|
||||
Pin<Box<dyn Stream<Item = Result<Vec<u8>, RealtimeError>> + Send + Sync + 'static>>;
|
||||
|
||||
pub trait CollabUpdateStreaming: 'static + Send + Sync {
|
||||
fn send_update(&self, update: Vec<u8>) -> Result<(), RealtimeError>;
|
||||
fn receive_updates(&self) -> DataStream;
|
||||
fn receive_awareness_updates(&self) -> DataStream;
|
||||
}
|
||||
/// A broadcast can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness]
|
||||
/// to subscribers. One broadcast can be used to propagate updates for a single document with
|
||||
|
|
@ -40,19 +48,12 @@ pub trait CollabUpdateStreaming: 'static + Send + Sync {
|
|||
pub struct CollabBroadcast {
|
||||
object_id: String,
|
||||
broadcast_sender: Sender<CollabMessage>,
|
||||
awareness_sub: Option<YrsSubscription>,
|
||||
/// Keep the lifetime of the document observer subscription. The subscription will be stopped
|
||||
/// when the broadcast is dropped.
|
||||
doc_subscription: Option<YrsSubscription>,
|
||||
edit_state: Arc<EditState>,
|
||||
/// The last modified time of the document.
|
||||
pub modified_at: Arc<parking_lot::Mutex<Instant>>,
|
||||
modified_at: Arc<ArcSwap<Instant>>,
|
||||
update_streaming: Arc<dyn CollabUpdateStreaming>,
|
||||
}
|
||||
|
||||
unsafe impl Send for CollabBroadcast {}
|
||||
unsafe impl Sync for CollabBroadcast {}
|
||||
|
||||
impl Drop for CollabBroadcast {
|
||||
fn drop(&mut self) {
|
||||
trace!("Drop collab broadcast:{}", self.object_id);
|
||||
|
|
@ -70,27 +71,25 @@ impl CollabBroadcast {
|
|||
object_id: &str,
|
||||
buffer_capacity: usize,
|
||||
edit_state: Arc<EditState>,
|
||||
collab: &Collab,
|
||||
update_streaming: impl CollabUpdateStreaming,
|
||||
update_streaming: Arc<impl CollabUpdateStreaming>,
|
||||
) -> Self {
|
||||
let update_streaming = Arc::new(update_streaming);
|
||||
let update_stream = update_streaming.receive_updates();
|
||||
let awareness_update_stream = update_streaming.receive_awareness_updates();
|
||||
let object_id = object_id.to_owned();
|
||||
// broadcast channel
|
||||
let (sender, _) = channel(buffer_capacity);
|
||||
let mut this = CollabBroadcast {
|
||||
object_id,
|
||||
broadcast_sender: sender,
|
||||
awareness_sub: Default::default(),
|
||||
doc_subscription: Default::default(),
|
||||
edit_state,
|
||||
modified_at: Arc::new(parking_lot::Mutex::new(Instant::now())),
|
||||
modified_at: Arc::new(ArcSwap::new(Arc::new(Instant::now()))),
|
||||
update_streaming,
|
||||
};
|
||||
this.observe_collab_changes(collab);
|
||||
this.observe_collab_changes(update_stream, awareness_update_stream);
|
||||
this
|
||||
}
|
||||
|
||||
fn observe_collab_changes(&mut self, collab: &Collab) {
|
||||
fn observe_collab_changes(&mut self, mut updates: DataStream, mut awareness_updates: DataStream) {
|
||||
let (doc_sub, awareness_sub) = {
|
||||
// Observe the document's updates and broadcast them to all subscribers.
|
||||
let cloned_oid = self.object_id.clone();
|
||||
|
|
@ -145,9 +144,10 @@ impl CollabBroadcast {
|
|||
});
|
||||
(doc_sub, awareness_sub)
|
||||
};
|
||||
}
|
||||
|
||||
self.doc_subscription = Some(doc_sub);
|
||||
self.awareness_sub = Some(awareness_sub);
|
||||
pub fn modified_at(&self) -> Instant {
|
||||
*self.modified_at.load_full()
|
||||
}
|
||||
|
||||
/// Subscribes a new connection to a broadcast group
|
||||
|
|
@ -184,7 +184,6 @@ impl CollabBroadcast {
|
|||
subscriber_origin: CollabOrigin,
|
||||
mut sink: Sink,
|
||||
mut stream: Stream,
|
||||
collab: Weak<RwLock<Collab>>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
) -> Subscription
|
||||
where
|
||||
|
|
@ -252,16 +251,7 @@ impl CollabBroadcast {
|
|||
break
|
||||
}
|
||||
let message_map = result.unwrap();
|
||||
match collab.upgrade() {
|
||||
None => {
|
||||
trace!("{} stop receiving user:{} messages because of collab is drop", user.user_device(), object_id);
|
||||
// break the loop if the collab is dropped
|
||||
break
|
||||
},
|
||||
Some(collab) => {
|
||||
handle_client_messages(&object_id, message_map, &mut sink, collab, &metrics_calculate, &edit_state).await;
|
||||
}
|
||||
}
|
||||
handle_client_messages(&object_id, message_map, &mut sink, &metrics_calculate, &edit_state).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -281,7 +271,6 @@ async fn handle_client_messages<Sink>(
|
|||
object_id: &str,
|
||||
message_map: MessageByObjectId,
|
||||
sink: &mut Sink,
|
||||
collab: Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) where
|
||||
|
|
@ -304,14 +293,8 @@ async fn handle_client_messages<Sink>(
|
|||
}
|
||||
|
||||
for collab_message in collab_messages {
|
||||
match handle_one_client_message(
|
||||
object_id,
|
||||
&collab_message,
|
||||
&collab,
|
||||
metrics_calculate,
|
||||
edit_state,
|
||||
)
|
||||
.await
|
||||
match handle_one_client_message(object_id, &collab_message, metrics_calculate, edit_state)
|
||||
.await
|
||||
{
|
||||
Ok(response) => {
|
||||
trace!("[realtime]: sending response: {}", response);
|
||||
|
|
@ -339,7 +322,6 @@ async fn handle_client_messages<Sink>(
|
|||
async fn handle_one_client_message(
|
||||
object_id: &str,
|
||||
collab_msg: &ClientCollabMessage,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) -> Result<CollabAck, RealtimeError> {
|
||||
|
|
@ -347,7 +329,7 @@ async fn handle_one_client_message(
|
|||
let message_origin = collab_msg.origin().clone();
|
||||
|
||||
// If the payload is empty, we don't need to apply any updates.
|
||||
// Currently, only the ping message should has an empty payload.
|
||||
// Currently, only the ping message should have an empty payload.
|
||||
if collab_msg.payload().is_empty() {
|
||||
if !matches!(collab_msg, ClientCollabMessage::ClientCollabStateCheck(_)) {
|
||||
error!("receive unexpected empty payload message:{}", collab_msg);
|
||||
|
|
@ -371,7 +353,6 @@ async fn handle_one_client_message(
|
|||
message_origin.clone(),
|
||||
msg_id,
|
||||
collab_msg.payload(),
|
||||
collab,
|
||||
metrics_calculate,
|
||||
edit_state,
|
||||
)
|
||||
|
|
@ -384,7 +365,6 @@ async fn handle_one_message_payload(
|
|||
message_origin: CollabOrigin,
|
||||
msg_id: MsgId,
|
||||
payload: &Bytes,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
edit_state: &Arc<EditState>,
|
||||
) -> Result<CollabAck, RealtimeError> {
|
||||
|
|
@ -395,7 +375,6 @@ async fn handle_one_message_payload(
|
|||
let result = handle_message(
|
||||
&payload,
|
||||
&message_origin,
|
||||
collab,
|
||||
metrics_calculate,
|
||||
object_id,
|
||||
msg_id,
|
||||
|
|
@ -418,7 +397,6 @@ async fn handle_one_message_payload(
|
|||
async fn handle_message(
|
||||
payload: &Bytes,
|
||||
message_origin: &CollabOrigin,
|
||||
collab: &Arc<RwLock<dyn BorrowMut<Collab> + Send + Sync + 'static>>,
|
||||
metrics_calculate: &Arc<CollabRealtimeMetrics>,
|
||||
object_id: &str,
|
||||
msg_id: MsgId,
|
||||
|
|
|
|||
|
|
@ -1,9 +1,4 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::fmt::Display;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab::lock::RwLock;
|
||||
|
|
@ -11,7 +6,14 @@ use collab::preclude::Collab;
|
|||
use collab_entity::CollabType;
|
||||
use dashmap::DashMap;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Display;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::time::{interval, MissedTickBehavior};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, event, info, trace};
|
||||
use yrs::updates::decoder::Decode;
|
||||
use yrs::updates::encoder::Encode;
|
||||
|
|
@ -27,30 +29,28 @@ use collab_stream::stream_group::StreamGroup;
|
|||
use database::collab::CollabStorage;
|
||||
|
||||
use crate::error::RealtimeError;
|
||||
use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, Subscription};
|
||||
use crate::group::persistence::GroupPersistence;
|
||||
use crate::group::broadcast::{CollabBroadcast, CollabUpdateStreaming, DataStream, Subscription};
|
||||
use crate::indexer::Indexer;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
|
||||
/// A group used to manage a single [Collab] object
|
||||
pub struct CollabGroup {
|
||||
pub workspace_id: String,
|
||||
pub object_id: String,
|
||||
collab: Arc<RwLock<Collab>>,
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
/// A broadcast used to propagate updates produced by yrs [yrs::Doc] and [Awareness]
|
||||
/// to subscribers.
|
||||
broadcast: CollabBroadcast,
|
||||
/// A list of subscribers to this group. Each subscriber will receive updates from the
|
||||
/// broadcast.
|
||||
subscribers: DashMap<RealtimeUser, Subscription>,
|
||||
subscribers: Arc<DashMap<RealtimeUser, Subscription>>,
|
||||
persister: Arc<CollabPersister>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
destroy_group_tx: mpsc::Sender<Arc<RwLock<Collab>>>,
|
||||
/// Cancellation token triggered when current collab group is about to be stopped.
|
||||
/// This will also shut down all subsequent [Subscription]s.
|
||||
shutdown: CancellationToken,
|
||||
}
|
||||
|
||||
impl Drop for CollabGroup {
|
||||
fn drop(&mut self) {
|
||||
trace!("Drop collab group:{}", self.object_id);
|
||||
self.shutdown.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -61,65 +61,107 @@ impl CollabGroup {
|
|||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
collab: Arc<RwLock<Collab>>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
storage: Arc<S>,
|
||||
is_new_collab: bool,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
persistence_interval: Duration,
|
||||
edit_state_max_count: u32,
|
||||
edit_state_max_secs: i64,
|
||||
indexer: Option<Arc<dyn Indexer>>,
|
||||
) -> Result<Self, StreamError>
|
||||
where
|
||||
S: CollabStorage,
|
||||
{
|
||||
let edit_state = Arc::new(EditState::new(
|
||||
edit_state_max_count,
|
||||
edit_state_max_secs,
|
||||
is_new_collab,
|
||||
let persister = Arc::new(CollabPersister::new(
|
||||
workspace_id.clone(),
|
||||
object_id.clone(),
|
||||
collab_type.clone(),
|
||||
storage,
|
||||
collab_redis_stream,
|
||||
indexer,
|
||||
));
|
||||
let broadcast = {
|
||||
let lock = collab.read().await;
|
||||
CollabBroadcast::new(
|
||||
&object_id,
|
||||
10,
|
||||
edit_state.clone(),
|
||||
&lock,
|
||||
CollabUpdateStreamingImpl::new(&workspace_id, &object_id, &collab_redis_stream).await?,
|
||||
)
|
||||
};
|
||||
let (destroy_group_tx, rx) = mpsc::channel(1);
|
||||
|
||||
tokio::spawn(
|
||||
GroupPersistence::new(
|
||||
workspace_id.clone(),
|
||||
object_id.clone(),
|
||||
uid,
|
||||
storage,
|
||||
edit_state.clone(),
|
||||
Arc::downgrade(&collab),
|
||||
collab_type.clone(),
|
||||
let shutdown = CancellationToken::new();
|
||||
let subscribers = Arc::new(DashMap::new());
|
||||
|
||||
// setup task used to receive messages from Redis
|
||||
{
|
||||
let oid = object_id.clone();
|
||||
let shutdown = shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = Self::inbound_task(shutdown).await {
|
||||
tracing::warn!("`{}` failed to receive message: {}", oid, err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// setup task used to send messages to Redis
|
||||
{
|
||||
let oid = object_id.clone();
|
||||
let shutdown = shutdown.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = Self::outbound_task(shutdown).await {
|
||||
tracing::warn!("`{}` failed to send message: {}", oid, err);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// setup periodic snapshot
|
||||
{
|
||||
let shutdown = shutdown.clone();
|
||||
tokio::spawn(Self::snapshot_task(
|
||||
persistence_interval,
|
||||
indexer,
|
||||
)
|
||||
.run(rx),
|
||||
);
|
||||
persister.clone(),
|
||||
shutdown,
|
||||
));
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
workspace_id,
|
||||
object_id,
|
||||
collab_type,
|
||||
collab,
|
||||
broadcast,
|
||||
subscribers: Default::default(),
|
||||
subscribers,
|
||||
metrics_calculate,
|
||||
destroy_group_tx,
|
||||
shutdown,
|
||||
persister,
|
||||
})
|
||||
}
|
||||
|
||||
/// Task used to receive messages from Redis.
|
||||
async fn inbound_task(shutdown: CancellationToken) -> Result<(), RealtimeError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Task used to send messages to Redis.
|
||||
async fn outbound_task(shutdown: CancellationToken) -> Result<(), RealtimeError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn snapshot_task(
|
||||
interval: Duration,
|
||||
persister: Arc<CollabPersister>,
|
||||
shutdown: CancellationToken,
|
||||
) {
|
||||
let mut snapshot_tick = tokio::time::interval(interval);
|
||||
snapshot_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = snapshot_tick.tick() => {
|
||||
if let Err(err) = persister.save().await {
|
||||
tracing::warn!("failed to persist document `{}`: {}", persister.object_id, err);
|
||||
}
|
||||
},
|
||||
_ = shutdown.cancelled() => {
|
||||
if let Err(err) = persister.save().await {
|
||||
tracing::warn!("failed to persist document `{}`: {}", persister.object_id, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn encode_collab(&self) -> Result<EncodedCollab, RealtimeError> {
|
||||
let lock = self.collab.read().await;
|
||||
let lock = self.load().await?;
|
||||
let encode_collab = lock.encode_collab_v1(|collab| {
|
||||
self
|
||||
.collab_type
|
||||
|
|
@ -158,14 +200,10 @@ impl CollabGroup {
|
|||
<Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
|
||||
{
|
||||
// create new subscription for new subscriber
|
||||
let sub = self.broadcast.subscribe(
|
||||
user,
|
||||
subscriber_origin,
|
||||
sink,
|
||||
stream,
|
||||
Arc::downgrade(&self.collab),
|
||||
self.metrics_calculate.clone(),
|
||||
);
|
||||
let metrics = self.metrics_calculate.clone();
|
||||
let sub = self
|
||||
.broadcast
|
||||
.subscribe(user, subscriber_origin, sink, stream, metrics);
|
||||
|
||||
if let Some(mut old) = self.subscribers.insert((*user).clone(), sub) {
|
||||
tracing::warn!("{}: remove old subscriber: {}", &self.object_id, user);
|
||||
|
|
@ -173,8 +211,7 @@ impl CollabGroup {
|
|||
}
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
event!(
|
||||
tracing::Level::TRACE,
|
||||
trace!(
|
||||
"{}: add new subscriber, current group member: {}",
|
||||
&self.object_id,
|
||||
self.user_count(),
|
||||
|
|
@ -193,7 +230,7 @@ impl CollabGroup {
|
|||
/// Check if the group is active. A group is considered active if it has at least one
|
||||
/// subscriber
|
||||
pub async fn is_inactive(&self) -> bool {
|
||||
let modified_at = self.broadcast.modified_at.lock();
|
||||
let modified_at = self.modified_at();
|
||||
|
||||
// In debug mode, we set the timeout to 60 seconds
|
||||
if cfg!(debug_assertions) {
|
||||
|
|
@ -235,7 +272,7 @@ impl CollabGroup {
|
|||
for mut entry in self.subscribers.iter_mut() {
|
||||
entry.value_mut().stop().await;
|
||||
}
|
||||
let _ = self.destroy_group_tx.send(self.collab.clone()).await;
|
||||
self.shutdown.cancel();
|
||||
}
|
||||
|
||||
/// Returns the timeout duration in seconds for different collaboration types.
|
||||
|
|
@ -260,131 +297,8 @@ impl CollabGroup {
|
|||
}
|
||||
}
|
||||
|
||||
pub(crate) struct EditState {
|
||||
/// Clients rely on `edit_count` to verify message ordering. A non-continuous sequence suggests
|
||||
/// missing updates, prompting the client to request an initial synchronization.
|
||||
/// Continuous sequence numbers ensure the client receives and displays updates in the correct order.
|
||||
///
|
||||
edit_counter: AtomicU32,
|
||||
prev_edit_count: AtomicU32,
|
||||
prev_flush_timestamp: AtomicI64,
|
||||
|
||||
max_edit_count: u32,
|
||||
max_secs: i64,
|
||||
/// Indicate the collab object is just created in the client and not exist in server database.
|
||||
is_new: AtomicBool,
|
||||
/// Indicate the collab is ready to save to disk.
|
||||
/// If is_ready_to_save is true, which means the collab contains the requirement data and ready to save to disk.
|
||||
is_ready_to_save: AtomicBool,
|
||||
}
|
||||
|
||||
impl Display for EditState {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"EditState {{ edit_counter: {}, prev_edit_count: {}, max_edit_count: {}, max_secs: {}, is_new: {}, is_ready_to_save: {}",
|
||||
self.edit_counter.load(Ordering::SeqCst),
|
||||
self.prev_edit_count.load(Ordering::SeqCst),
|
||||
self.max_edit_count,
|
||||
self.max_secs,
|
||||
self.is_new.load(Ordering::SeqCst),
|
||||
self.is_ready_to_save.load(Ordering::SeqCst),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl EditState {
|
||||
fn new(max_edit_count: u32, max_secs: i64, is_new: bool) -> Self {
|
||||
Self {
|
||||
edit_counter: AtomicU32::new(0),
|
||||
prev_edit_count: Default::default(),
|
||||
prev_flush_timestamp: AtomicI64::new(chrono::Utc::now().timestamp()),
|
||||
max_edit_count,
|
||||
max_secs,
|
||||
is_new: AtomicBool::new(is_new),
|
||||
is_ready_to_save: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn edit_count(&self) -> u32 {
|
||||
self.edit_counter.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Increments the edit count and returns the old value
|
||||
pub(crate) fn increment_edit_count(&self) -> u32 {
|
||||
self
|
||||
.edit_counter
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
|
||||
Some(current + 1)
|
||||
})
|
||||
// safety: unwrap when returning the new value
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub(crate) fn tick(&self) {
|
||||
self
|
||||
.prev_edit_count
|
||||
.store(self.edit_counter.load(Ordering::SeqCst), Ordering::SeqCst);
|
||||
self
|
||||
.prev_flush_timestamp
|
||||
.store(chrono::Utc::now().timestamp(), Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub(crate) fn is_edit(&self) -> bool {
|
||||
self.edit_counter.load(Ordering::SeqCst) != self.prev_edit_count.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub(crate) fn is_new(&self) -> bool {
|
||||
self.is_new.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub(crate) fn set_is_new(&self, is_new: bool) {
|
||||
self.is_new.store(is_new, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub(crate) fn set_ready_to_save(&self) {
|
||||
self.is_ready_to_save.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub(crate) fn should_save_to_disk(&self) -> bool {
|
||||
if !self.is_ready_to_save.load(Ordering::Relaxed) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if self.is_new.load(Ordering::Relaxed) {
|
||||
return true;
|
||||
}
|
||||
|
||||
let current_edit_count = self.edit_counter.load(Ordering::SeqCst);
|
||||
let prev_edit_count = self.prev_edit_count.load(Ordering::SeqCst);
|
||||
|
||||
// If the collab is new, save it to disk and reset the flag
|
||||
if self.is_new.load(Ordering::SeqCst) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if current_edit_count == prev_edit_count {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the edit count exceeds the maximum allowed since the last save
|
||||
let edit_count_exceeded = (current_edit_count > prev_edit_count)
|
||||
&& ((current_edit_count - prev_edit_count) >= self.max_edit_count);
|
||||
|
||||
// Calculate the time since the last flush and check if it exceeds the maximum allowed
|
||||
let now = chrono::Utc::now().timestamp();
|
||||
let prev_flush_timestamp = self.prev_flush_timestamp.load(Ordering::SeqCst);
|
||||
let time_exceeded =
|
||||
(now > prev_flush_timestamp) && (now - prev_flush_timestamp >= self.max_secs);
|
||||
|
||||
// Determine if we should save based on either condition being met
|
||||
edit_count_exceeded || (current_edit_count != prev_edit_count && time_exceeded)
|
||||
}
|
||||
}
|
||||
|
||||
struct CollabUpdateStreamingImpl {
|
||||
sender: mpsc::UnboundedSender<Vec<u8>>,
|
||||
stopped: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl CollabUpdateStreamingImpl {
|
||||
|
|
@ -396,16 +310,13 @@ impl CollabUpdateStreamingImpl {
|
|||
let stream = collab_redis_stream
|
||||
.collab_update_stream(workspace_id, object_id, "collaborate_update_producer")
|
||||
.await?;
|
||||
let stopped = Arc::new(AtomicBool::new(false));
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
let cloned_stopped = stopped.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = Self::consume_messages(receiver, stream).await {
|
||||
error!("Failed to consume incoming updates: {}", err);
|
||||
}
|
||||
cloned_stopped.store(true, Ordering::SeqCst);
|
||||
});
|
||||
Ok(Self { sender, stopped })
|
||||
Ok(Self { sender })
|
||||
}
|
||||
|
||||
async fn consume_messages(
|
||||
|
|
@ -438,42 +349,104 @@ impl CollabUpdateStreamingImpl {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn is_stopped(&self) -> bool {
|
||||
self.stopped.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
impl CollabUpdateStreaming for CollabUpdateStreamingImpl {
|
||||
fn send_update(&self, update: Vec<u8>) -> Result<(), RealtimeError> {
|
||||
if self.is_stopped() {
|
||||
if let Err(_) = self.sender.send(update) {
|
||||
Err(RealtimeError::Internal(anyhow::anyhow!(
|
||||
"stream stopped processing incoming updates"
|
||||
)))
|
||||
} else if let Err(err) = self.sender.send(update) {
|
||||
Err(RealtimeError::Internal(err.into()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::group::group_init::EditState;
|
||||
fn receive_updates(&self) -> DataStream {
|
||||
todo!()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn edit_state_test() {
|
||||
let edit_state = EditState::new(10, 10, false);
|
||||
edit_state.set_ready_to_save();
|
||||
edit_state.increment_edit_count();
|
||||
|
||||
for _ in 0..10 {
|
||||
edit_state.increment_edit_count();
|
||||
}
|
||||
assert!(edit_state.should_save_to_disk());
|
||||
assert!(edit_state.should_save_to_disk());
|
||||
edit_state.tick();
|
||||
assert!(!edit_state.should_save_to_disk());
|
||||
fn receive_awareness_updates(&self) -> DataStream {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
struct CollabPersister {
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
storage: Arc<dyn CollabStorage>,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
indexer: Option<Arc<dyn Indexer>>,
|
||||
/// Collab stored temporarily.
|
||||
temp_collab: ArcSwapOption<CollabSnapshot>,
|
||||
}
|
||||
|
||||
impl CollabPersister {
|
||||
pub fn new(
|
||||
workspace_id: String,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
storage: Arc<dyn CollabStorage>,
|
||||
collab_redis_stream: Arc<CollabRedisStream>,
|
||||
indexer: Option<Arc<dyn Indexer>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
workspace_id,
|
||||
object_id,
|
||||
collab_type,
|
||||
storage,
|
||||
collab_redis_stream,
|
||||
indexer,
|
||||
temp_collab: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop the temp collab, e.g. because it is no longer up to date or has not been accessed for too long.
|
||||
fn reset(&self) {
|
||||
self.temp_collab.store(None); // cleanup temp collab
|
||||
}
|
||||
|
||||
async fn send_update(&self, update: Vec<u8>) {
|
||||
// send updates to redis queue
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn send_awareness(&self, awareness_update: Vec<u8>) {
|
||||
// send awareness updates to redis queue: is it needed? What are we using awareness for here?
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn load(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
|
||||
match self.temp_collab.load_full() {
|
||||
Some(collab) => Ok(collab), // return cached collab
|
||||
None => self.force_load().await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn force_load(&self) -> Result<Arc<CollabSnapshot>, RealtimeError> {
|
||||
// 1. Try to load the latest snapshot from storage
|
||||
// 2. consume all Redis updates on top of it (keep redis msg id)
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn receive_updates(&self) -> DataStream {
|
||||
// 1. loop with yield on incoming Redis stream updates
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn save(&self) -> Result<(), RealtimeError> {
|
||||
// 1. try to acquire lock
|
||||
// 2. if successful -> self.load()
|
||||
// 3. if collab has any changes (any redis updates were applied):
|
||||
// 4. generate embeddings
|
||||
// 5. store collab
|
||||
// 6. prune any redis msg ids older than 5 min. since collab snapshot time
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CollabSnapshot {
|
||||
pub collab: Collab,
|
||||
pub last_msg_id: String,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -229,14 +229,11 @@ where
|
|||
workspace_id.to_string(),
|
||||
object_id.to_string(),
|
||||
collab_type,
|
||||
collab,
|
||||
self.metrics_calculate.clone(),
|
||||
self.storage.clone(),
|
||||
is_new_collab,
|
||||
self.collab_redis_stream.clone(),
|
||||
self.persistence_interval,
|
||||
self.edit_state_max_count,
|
||||
self.edit_state_max_secs,
|
||||
indexer,
|
||||
)
|
||||
.await?,
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ pub(crate) mod broadcast;
|
|||
pub(crate) mod cmd;
|
||||
pub(crate) mod group_init;
|
||||
pub(crate) mod manager;
|
||||
mod persistence;
|
||||
mod plugin;
|
||||
pub(crate) mod protocol;
|
||||
mod state;
|
||||
|
|
|
|||
Loading…
Reference in New Issue