chore: remove server sync protocol (not used anymore)
This commit is contained in:
parent
00f5a5bb7e
commit
a1546909c3
|
|
@ -19,11 +19,7 @@ use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage};
|
||||||
use collab_stream::client::CollabRedisStream;
|
use collab_stream::client::CollabRedisStream;
|
||||||
use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
||||||
use collab_stream::error::StreamError;
|
use collab_stream::error::StreamError;
|
||||||
use collab_stream::model::{
|
use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags};
|
||||||
AwarenessStreamUpdate, CollabStreamUpdate, CollabUpdateEvent, MessageId, StreamBinary,
|
|
||||||
UpdateFlags,
|
|
||||||
};
|
|
||||||
use collab_stream::stream_group::StreamGroup;
|
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||||
use database_entity::dto::{
|
use database_entity::dto::{
|
||||||
|
|
@ -31,11 +27,9 @@ use database_entity::dto::{
|
||||||
};
|
};
|
||||||
use futures::{pin_mut, Sink, Stream};
|
use futures::{pin_mut, Sink, Stream};
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::sync::atomic::{AtomicU32, Ordering};
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::time::MissedTickBehavior;
|
use tokio::time::MissedTickBehavior;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{error, info, trace};
|
use tracing::{error, info, trace};
|
||||||
|
|
@ -162,6 +156,7 @@ impl CollabGroup {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
#[allow(dead_code)]
|
||||||
pub fn object_id(&self) -> &str {
|
pub fn object_id(&self) -> &str {
|
||||||
&self.state.object_id
|
&self.state.object_id
|
||||||
}
|
}
|
||||||
|
|
@ -754,60 +749,6 @@ impl CollabGroup {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CollabUpdateStreamingImpl {
|
|
||||||
sender: mpsc::UnboundedSender<Vec<u8>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CollabUpdateStreamingImpl {
|
|
||||||
async fn new(
|
|
||||||
workspace_id: &str,
|
|
||||||
object_id: &str,
|
|
||||||
collab_redis_stream: &CollabRedisStream,
|
|
||||||
) -> Result<Self, StreamError> {
|
|
||||||
let stream = collab_redis_stream
|
|
||||||
.collab_update_stream_group(workspace_id, object_id, "collaborate_update_producer")
|
|
||||||
.await?;
|
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(err) = Self::consume_messages(receiver, stream).await {
|
|
||||||
error!("Failed to consume incoming updates: {}", err);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Ok(Self { sender })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn consume_messages(
|
|
||||||
mut receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
|
||||||
mut stream: StreamGroup,
|
|
||||||
) -> Result<(), RealtimeError> {
|
|
||||||
while let Some(update) = receiver.recv().await {
|
|
||||||
let mut update_count = 1;
|
|
||||||
let update = {
|
|
||||||
let mut updates = VecDeque::new();
|
|
||||||
// there may be already more messages inside waiting, try to read them all right away
|
|
||||||
while let Ok(update) = receiver.try_recv() {
|
|
||||||
updates.push_back(Update::decode_v1(&update)?);
|
|
||||||
}
|
|
||||||
if updates.is_empty() {
|
|
||||||
update // no following messages
|
|
||||||
} else {
|
|
||||||
update_count += updates.len();
|
|
||||||
// prepend first update and merge them all together
|
|
||||||
updates.push_front(Update::decode_v1(&update)?);
|
|
||||||
Update::merge_updates(updates).encode_v1()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let msg = StreamBinary::try_from(CollabUpdateEvent::UpdateV1 {
|
|
||||||
encode_update: update,
|
|
||||||
})?;
|
|
||||||
stream.insert_messages(vec![msg]).await?;
|
|
||||||
trace!("Sent cumulative ({}) collab update to redis", update_count);
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub trait SubscriptionSink:
|
pub trait SubscriptionSink:
|
||||||
Sink<CollabMessage, Error = RealtimeError> + Send + Sync + Unpin
|
Sink<CollabMessage, Error = RealtimeError> + Send + Sync + Unpin
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -2,5 +2,4 @@ pub(crate) mod cmd;
|
||||||
pub(crate) mod group_init;
|
pub(crate) mod group_init;
|
||||||
pub(crate) mod manager;
|
pub(crate) mod manager;
|
||||||
mod plugin;
|
mod plugin;
|
||||||
pub(crate) mod protocol;
|
|
||||||
mod state;
|
mod state;
|
||||||
|
|
|
||||||
|
|
@ -1,127 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use collab::core::collab::{TransactionExt, TransactionMutExt};
|
|
||||||
use collab::core::origin::CollabOrigin;
|
|
||||||
use tokio::time::Instant;
|
|
||||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
|
||||||
use yrs::{ReadTxn, StateVector, Transact};
|
|
||||||
|
|
||||||
use collab_rt_protocol::CollabSyncProtocol;
|
|
||||||
use collab_rt_protocol::{
|
|
||||||
decode_update, CollabRef, CustomMessage, Message, RTProtocolError, SyncMessage,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::CollabRealtimeMetrics;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct ServerSyncProtocol {
|
|
||||||
metrics: Arc<CollabRealtimeMetrics>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServerSyncProtocol {
|
|
||||||
pub fn new(metrics: Arc<CollabRealtimeMetrics>) -> Self {
|
|
||||||
Self { metrics }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl CollabSyncProtocol for ServerSyncProtocol {
|
|
||||||
async fn handle_sync_step1(
|
|
||||||
&self,
|
|
||||||
collab: &CollabRef,
|
|
||||||
sv: StateVector,
|
|
||||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
|
||||||
let (doc_state, state_vector) = {
|
|
||||||
let lock = collab.read().await;
|
|
||||||
let collab = (*lock).borrow();
|
|
||||||
|
|
||||||
let txn = collab.get_awareness().doc().try_transact().map_err(|err| {
|
|
||||||
RTProtocolError::YrsTransaction(format!("fail to handle sync step1. error: {}", err))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let doc_state = txn.try_encode_state_as_update_v1(&sv).map_err(|err| {
|
|
||||||
RTProtocolError::YrsEncodeState(format!(
|
|
||||||
"fail to encode state as update. error: {}\ninit state vector: {:?}\ndocument state: {:#?}",
|
|
||||||
err,
|
|
||||||
sv,
|
|
||||||
txn.store()
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
(doc_state, txn.state_vector())
|
|
||||||
};
|
|
||||||
|
|
||||||
// Retrieve the latest document state from the client after they return online from offline editing.
|
|
||||||
let mut encoder = EncoderV1::new();
|
|
||||||
Message::Sync(SyncMessage::SyncStep2(doc_state)).encode(&mut encoder);
|
|
||||||
|
|
||||||
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
|
|
||||||
// send when a connection is established
|
|
||||||
Message::Sync(SyncMessage::SyncStep1(state_vector)).encode(&mut encoder);
|
|
||||||
Ok(Some(encoder.to_vec()))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_sync_step2(
|
|
||||||
&self,
|
|
||||||
origin: &CollabOrigin,
|
|
||||||
collab: &CollabRef,
|
|
||||||
update: Vec<u8>,
|
|
||||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
|
||||||
self.metrics.apply_update_size.observe(update.len() as f64);
|
|
||||||
let start = Instant::now();
|
|
||||||
let result = {
|
|
||||||
let update = decode_update(update).await?;
|
|
||||||
let mut lock = collab.write().await;
|
|
||||||
let collab = (*lock).borrow_mut();
|
|
||||||
|
|
||||||
let mut txn = collab
|
|
||||||
.get_awareness()
|
|
||||||
.doc()
|
|
||||||
.try_transact_mut_with(origin.clone())
|
|
||||||
.map_err(|err| {
|
|
||||||
RTProtocolError::YrsTransaction(format!("sync step2 transaction acquire: {}", err))
|
|
||||||
})?;
|
|
||||||
txn.try_apply_update(update).map_err(|err| {
|
|
||||||
RTProtocolError::YrsApplyUpdate(format!(
|
|
||||||
"sync step2 apply update: {}\ndocument state: {:#?}",
|
|
||||||
err,
|
|
||||||
txn.store()
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// If server can't apply updates sent by client, which means the server is missing some updates
|
|
||||||
// from the client or the client is missing some updates from the server.
|
|
||||||
// If the client can't apply broadcast from server, which means the client is missing some
|
|
||||||
// updates.
|
|
||||||
match txn.store().pending_update() {
|
|
||||||
Some(_update) => {
|
|
||||||
// let state_vector_v1 = update.missing.encode_v1();
|
|
||||||
// for the moment, we don't need to send missing updates to the client. passing None
|
|
||||||
// instead, which will trigger a sync step 0 on client
|
|
||||||
let state_vector_v1 = txn.state_vector().encode_v1();
|
|
||||||
Err(RTProtocolError::MissUpdates {
|
|
||||||
state_vector_v1: Some(state_vector_v1),
|
|
||||||
reason: "server miss updates".to_string(),
|
|
||||||
})
|
|
||||||
},
|
|
||||||
None => Ok(None),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let elapsed = start.elapsed();
|
|
||||||
self
|
|
||||||
.metrics
|
|
||||||
.apply_update_time
|
|
||||||
.observe(elapsed.as_millis() as f64);
|
|
||||||
|
|
||||||
result
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_custom_message(
|
|
||||||
&self,
|
|
||||||
_collab: &CollabRef,
|
|
||||||
_msg: CustomMessage,
|
|
||||||
) -> Result<Option<Vec<u8>>, RTProtocolError> {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue