chore: remove state vector from redis udpates

This commit is contained in:
Bartosz Sypytkowski 2024-12-17 04:29:41 +01:00
parent 73872a2b1d
commit d120860312
6 changed files with 31 additions and 40 deletions

View File

@ -1,6 +1,5 @@
use crate::error::StreamError;
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
use collab::preclude::updates::encoder::Encode;
use redis::aio::ConnectionManager;
use redis::cmd;
use tokio::sync::Mutex;
@ -19,7 +18,6 @@ impl CollabUpdateSink {
}
pub async fn send(&self, msg: &CollabStreamUpdate) -> Result<MessageId, StreamError> {
let sv = msg.state_vector.encode_v1();
let mut lock = self.conn.lock().await;
let msg_id: MessageId = cmd("XADD")
.arg(&self.stream_key)
@ -28,8 +26,6 @@ impl CollabUpdateSink {
.arg(msg.flags)
.arg("sender")
.arg(msg.sender.to_string())
.arg("sv")
.arg(&sv)
.arg("data")
.arg(&*msg.data)
.query_async(&mut *lock)

View File

@ -2,7 +2,6 @@ use crate::error::{internal, StreamError};
use bytes::Bytes;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::preclude::updates::decoder::Decode;
use collab::preclude::StateVector;
use collab_entity::proto::collab::collab_update_event::Update;
use collab_entity::{proto, CollabType};
use prost::Message;
@ -369,13 +368,12 @@ impl TryFrom<CollabUpdateEvent> for StreamBinary {
pub struct CollabStreamUpdate {
pub data: Vec<u8>, // yrs::Update::encode_v1
pub state_vector: StateVector,
pub sender: CollabOrigin,
pub flags: UpdateFlags,
}
impl CollabStreamUpdate {
pub fn new<B, F>(data: B, state_vector: StateVector, sender: CollabOrigin, flags: F) -> Self
pub fn new<B, F>(data: B, sender: CollabOrigin, flags: F) -> Self
where
B: Into<Vec<u8>>,
F: Into<UpdateFlags>,
@ -383,7 +381,6 @@ impl CollabStreamUpdate {
CollabStreamUpdate {
data: data.into(),
sender,
state_vector,
flags: flags.into(),
}
}
@ -420,15 +417,6 @@ impl TryFrom<HashMap<String, redis::Value>> for CollabStreamUpdate {
collab_origin_from_str(&raw_origin)?
},
};
let state_vector = match fields.get("sv") {
Some(value) => {
let bytes = Bytes::from_redis_value(value)?;
let state_vector =
StateVector::decode_v1(&bytes).map_err(|err| internal(err.to_string()))?;
Ok(state_vector)
},
None => Err(internal("expecting field `sv`")),
}?;
let flags = match fields.get("flags") {
None => UpdateFlags::default(),
Some(flags) => u8::from_redis_value(flags).unwrap_or(0).into(),
@ -440,7 +428,6 @@ impl TryFrom<HashMap<String, redis::Value>> for CollabStreamUpdate {
Ok(CollabStreamUpdate {
data,
sender,
state_vector,
flags,
})
}

View File

@ -1,4 +1,6 @@
use bytes::Bytes;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use futures_util::{stream, StreamExt};
use itertools::{Either, Itertools};
use sqlx::{PgPool, Transaction};
@ -186,6 +188,11 @@ impl CollabCache {
// when the data is written to the disk cache but fails to be written to the memory cache
// we log the error and continue.
self.cache_collab(object_id, collab_type, encode_collab_data);
Ok(())
}
fn cache_collab(&self, object_id: String, collab_type: CollabType, encode_collab_data: Bytes) {
let mem_cache = self.mem_cache.clone();
tokio::spawn(async move {
if let Err(err) = mem_cache
@ -203,20 +210,6 @@ impl CollabCache {
);
}
});
Ok(())
}
pub async fn get_encode_collab_from_disk(
&self,
workspace_id: &str,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(workspace_id, query)
.await?;
Ok(encode_collab)
}
pub async fn insert_encode_collab_to_disk(
@ -225,10 +218,12 @@ impl CollabCache {
uid: &i64,
params: CollabParams,
) -> Result<(), AppError> {
let p = params.clone();
self
.disk_cache
.upsert_collab(workspace_id, uid, params)
.await?;
self.cache_collab(p.object_id, p.collab_type, p.encoded_collab_v1);
Ok(())
}

View File

@ -357,7 +357,7 @@ impl CollabDiskCache {
while let Err(err) = s3.put_blob(&key, doc_state.clone().into(), None).await {
match err {
AppError::ServiceTemporaryUnavailable(err) if retries > 0 => {
tracing::info!(
tracing::debug!(
"S3 service is temporarily unavailable: {}. Remaining retries: {}",
err,
retries
@ -371,6 +371,7 @@ impl CollabDiskCache {
},
}
}
tracing::trace!("saved collab to S3: {}", key);
Ok(())
}

View File

@ -154,6 +154,7 @@ impl CollabMemCache {
timestamp: i64,
expiration_seconds: Option<u64>,
) -> redis::RedisResult<()> {
tracing::trace!("insert collab {} to memory cache", object_id);
self
.insert_data_with_timestamp(object_id, data, timestamp, expiration_seconds)
.await

View File

@ -203,9 +203,21 @@ impl CollabGroup {
async fn handle_inbound_update(state: &CollabGroupState, update: CollabStreamUpdate) {
// update state vector based on incoming message
let mut sv = state.state_vector.write().await;
sv.merge(update.state_vector);
drop(sv);
match Update::decode_v1(&update.data) {
Ok(update) => state
.state_vector
.write()
.await
.merge(update.state_vector()),
Err(err) => {
tracing::error!(
"received malformed update for collab `{}`: {}",
state.object_id,
err
);
return;
},
}
let seq_num = state.seq_no.fetch_add(1, Ordering::SeqCst) + 1;
tracing::trace!(
@ -740,10 +752,9 @@ impl CollabGroup {
tracing::debug!("subscriber {} send update with missing data", origin);
Ok(Some(msg.encode_v1()))
} else {
let upper_state_vector = decoded_update.state_vector();
state
.persister
.send_update(origin.clone(), update, upper_state_vector)
.send_update(origin.clone(), update)
.await
.map_err(|err| RTProtocolError::Internal(err.into()))?;
let elapsed = start.elapsed();
@ -940,11 +951,10 @@ impl CollabPersister {
&self,
sender: CollabOrigin,
update: Vec<u8>,
state_vector: StateVector,
) -> Result<MessageId, StreamError> {
let len = update.len();
// send updates to redis queue
let update = CollabStreamUpdate::new(update, state_vector, sender, UpdateFlags::default());
let update = CollabStreamUpdate::new(update, sender, UpdateFlags::default());
let msg_id = self.update_sink.send(&update).await?;
tracing::trace!(
"persisted update from {} ({} bytes) - msg id: {}",
@ -979,6 +989,7 @@ impl CollabPersister {
/// Loads collab without its history. Used for handling y-sync protocol messages.
async fn load_compact(&self) -> Result<CollabSnapshot, RealtimeError> {
tracing::trace!("requested to load compact collab {}", self.object_id);
// 1. Try to load the latest snapshot from storage
let start = Instant::now();
let mut collab = match self.load_collab_full(false).await? {