diff --git a/libs/collab-rt/src/data_validation.rs b/libs/collab-rt/src/data_validation.rs index bbcc390e..219816e0 100644 --- a/libs/collab-rt/src/data_validation.rs +++ b/libs/collab-rt/src/data_validation.rs @@ -8,24 +8,31 @@ use collab_entity::CollabType; use tracing::instrument; #[instrument(level = "trace", skip(data), fields(len = %data.len()))] -pub fn validate_encode_collab( +pub async fn validate_encode_collab( object_id: &str, data: &[u8], collab_type: &CollabType, ) -> Result<(), RealtimeError> { - let encoded_collab = - EncodedCollab::decode_from_bytes(data).map_err(|err| RealtimeError::Internal(err.into()))?; - let collab = Collab::new_with_doc_state( - CollabOrigin::Empty, - object_id, - DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()), - vec![], - false, - ) - .map_err(|err| RealtimeError::Internal(err.into()))?; + let collab_type = collab_type.clone(); + let object_id = object_id.to_string(); + let data = data.to_vec(); - collab_type - .validate(&collab) - .map_err(|err| RealtimeError::NoRequiredCollabData(err.to_string()))?; - Ok(()) + tokio::task::spawn_blocking(move || { + let encoded_collab = + EncodedCollab::decode_from_bytes(&data).map_err(|err| RealtimeError::Internal(err.into()))?; + let collab = Collab::new_with_doc_state( + CollabOrigin::Empty, + &object_id, + DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()), + vec![], + false, + ) + .map_err(|err| RealtimeError::Internal(err.into()))?; + + collab_type + .validate(&collab) + .map_err(|err| RealtimeError::NoRequiredCollabData(err.to_string()))?; + Ok::<(), RealtimeError>(()) + }) + .await? 
} diff --git a/libs/collab-rt/src/rt_server.rs b/libs/collab-rt/src/rt_server.rs index d8bd271b..766bf980 100644 --- a/libs/collab-rt/src/rt_server.rs +++ b/libs/collab-rt/src/rt_server.rs @@ -22,7 +22,7 @@ use tokio::runtime; use tokio::runtime::Runtime; use tokio::sync::Notify; use tokio::time::interval; -use tracing::{error, trace}; +use tracing::{error, info, trace}; lazy_static! { pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap(); @@ -51,6 +51,10 @@ where metrics: Arc, command_recv: RTCommandReceiver, ) -> Result { + if cfg!(feature = "multi-thread") { + info!("CollaborationServer with multi-thread feature enabled"); + } + let metrics_calculate = CollabMetricsCalculate::default(); let connect_state = ConnectState::new(); let access_control = Arc::new(access_control); diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 12c80892..bce20b19 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -184,7 +184,9 @@ where &params.object_id, &params.encoded_collab_v1, &params.collab_type, - ) { + ) + .await + { return Err(AppError::NoRequiredData(format!( "collab doc state is not correct:{},{}", params.object_id, err diff --git a/src/biz/snapshot/queue.rs b/src/biz/snapshot/queue.rs index 59c50e50..64dac539 100644 --- a/src/biz/snapshot/queue.rs +++ b/src/biz/snapshot/queue.rs @@ -53,6 +53,7 @@ impl DerefMut for PendingQueue { } } +#[derive(Debug, Clone)] pub(crate) struct PendingItem { pub(crate) workspace_id: String, pub(crate) object_id: String, diff --git a/src/biz/snapshot/snapshot_control.rs b/src/biz/snapshot/snapshot_control.rs index 65a58853..1599b728 100644 --- a/src/biz/snapshot/snapshot_control.rs +++ b/src/biz/snapshot/snapshot_control.rs @@ -14,6 +14,7 @@ use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams use futures_util::StreamExt; use chrono::{DateTime, Utc}; + use sqlx::{Acquire, PgPool}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -249,50 
+250,49 @@ impl SnapshotCommandRunner { } async fn process_next_batch(&self) -> Result<(), AppError> { - let next_item = match self.queue.write().await.pop() { + let mut queue = self.queue.write().await; + + let next_item = match queue.pop() { Some(item) => item, - None => return Ok(()), // No items to process + None => return Ok(()), }; - let key = SnapshotKey::from_object_id(&next_item.object_id); self.total_attempts.fetch_add(1, Ordering::Relaxed); + let key = SnapshotKey::from_object_id(&next_item.object_id); + + // Attempt to fetch the collab data from the cache let encoded_collab_v1 = match self.cache.try_get(&key.0).await { - Ok(Some(data)) => { - // This step is not necessary, but use it to check if the data is valid. Will be removed - // in the future. - match validate_encode_collab(&next_item.object_id, &data, &next_item.collab_type) { - Ok(_) => data, - Err(err) => { - warn!( - "Collab doc state is not correct when creating snapshot: {},{}", - next_item.object_id, err - ); - return Ok(()); - }, - } - }, - Ok(None) => { - warn!("Failed to get snapshot from cache: {}", key.0); - return Ok(()); - }, + Ok(Some(data)) => data, + Ok(None) => return Ok(()), // Cache miss, no data to process Err(_) => { - if cfg!(debug_assertions) { - error!("Failed to get snapshot from cache: {}", key.0); - } - self.queue.write().await.push_item(next_item); + queue.push_item(next_item); // Push back to queue on error return Ok(()); }, }; + // Validate collab data before processing + let result = validate_encode_collab( + &next_item.object_id, + &encoded_collab_v1, + &next_item.collab_type, + ) + .await; + + if result.is_err() { + return Ok(()); + } + + // Start a transaction let transaction = match self.pg_pool.try_begin().await { Ok(Some(tx)) => tx, _ => { debug!("Failed to start transaction to write snapshot, retrying later"); - self.queue.write().await.push_item(next_item); + queue.push_item(next_item); return Ok(()); }, }; + // Create the snapshot and enforce limits match 
create_snapshot_and_maintain_limit( transaction, &next_item.workspace_id, @@ -306,16 +306,13 @@ impl SnapshotCommandRunner { trace!( "successfully created snapshot for {}, remaining task: {}", next_item.object_id, - self.queue.read().await.len() + queue.len() ); let _ = self.cache.remove(&key.0).await; self.success_attempts.fetch_add(1, Ordering::Relaxed); Ok(()) }, - Err(e) => { - // self.queue.write().await.push_item(next_item); - Err(e) - }, + Err(e) => Err(e), // Return the error if snapshot creation fails } } }