chore: using spawn blocking (#467)
This commit is contained in:
parent
9a5636caec
commit
ecbc935d4b
|
|
@ -8,24 +8,31 @@ use collab_entity::CollabType;
|
|||
use tracing::instrument;
|
||||
|
||||
#[instrument(level = "trace", skip(data), fields(len = %data.len()))]
|
||||
pub fn validate_encode_collab(
|
||||
pub async fn validate_encode_collab(
|
||||
object_id: &str,
|
||||
data: &[u8],
|
||||
collab_type: &CollabType,
|
||||
) -> Result<(), RealtimeError> {
|
||||
let encoded_collab =
|
||||
EncodedCollab::decode_from_bytes(data).map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let collab = Collab::new_with_doc_state(
|
||||
CollabOrigin::Empty,
|
||||
object_id,
|
||||
DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()),
|
||||
vec![],
|
||||
false,
|
||||
)
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let collab_type = collab_type.clone();
|
||||
let object_id = object_id.to_string();
|
||||
let data = data.to_vec();
|
||||
|
||||
collab_type
|
||||
.validate(&collab)
|
||||
.map_err(|err| RealtimeError::NoRequiredCollabData(err.to_string()))?;
|
||||
Ok(())
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let encoded_collab =
|
||||
EncodedCollab::decode_from_bytes(&data).map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let collab = Collab::new_with_doc_state(
|
||||
CollabOrigin::Empty,
|
||||
&object_id,
|
||||
DocStateSource::FromDocState(encoded_collab.doc_state.to_vec()),
|
||||
vec![],
|
||||
false,
|
||||
)
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
|
||||
collab_type
|
||||
.validate(&collab)
|
||||
.map_err(|err| RealtimeError::NoRequiredCollabData(err.to_string()))?;
|
||||
Ok::<(), RealtimeError>(())
|
||||
})
|
||||
.await?
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ use tokio::runtime;
|
|||
use tokio::runtime::Runtime;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::time::interval;
|
||||
use tracing::{error, trace};
|
||||
use tracing::{error, info, trace};
|
||||
|
||||
lazy_static! {
|
||||
pub(crate) static ref COLLAB_RUNTIME: Runtime = default_tokio_runtime().unwrap();
|
||||
|
|
@ -51,6 +51,10 @@ where
|
|||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
command_recv: RTCommandReceiver,
|
||||
) -> Result<Self, RealtimeError> {
|
||||
if cfg!(feature = "multi-thread") {
|
||||
info!("CollaborationServer with multi-thread feature enabled");
|
||||
}
|
||||
|
||||
let metrics_calculate = CollabMetricsCalculate::default();
|
||||
let connect_state = ConnectState::new();
|
||||
let access_control = Arc::new(access_control);
|
||||
|
|
|
|||
|
|
@ -184,7 +184,9 @@ where
|
|||
¶ms.object_id,
|
||||
¶ms.encoded_collab_v1,
|
||||
¶ms.collab_type,
|
||||
) {
|
||||
)
|
||||
.await
|
||||
{
|
||||
return Err(AppError::NoRequiredData(format!(
|
||||
"collab doc state is not correct:{},{}",
|
||||
params.object_id, err
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ impl DerefMut for PendingQueue {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PendingItem {
|
||||
pub(crate) workspace_id: String,
|
||||
pub(crate) object_id: String,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams
|
|||
use futures_util::StreamExt;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use sqlx::{Acquire, PgPool};
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
|
@ -249,50 +250,49 @@ impl SnapshotCommandRunner {
|
|||
}
|
||||
|
||||
async fn process_next_batch(&self) -> Result<(), AppError> {
|
||||
let next_item = match self.queue.write().await.pop() {
|
||||
let mut queue = self.queue.write().await;
|
||||
|
||||
let next_item = match queue.pop() {
|
||||
Some(item) => item,
|
||||
None => return Ok(()), // No items to process
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let key = SnapshotKey::from_object_id(&next_item.object_id);
|
||||
self.total_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
let key = SnapshotKey::from_object_id(&next_item.object_id);
|
||||
|
||||
// Attempt to fetch the collab data from the cache
|
||||
let encoded_collab_v1 = match self.cache.try_get(&key.0).await {
|
||||
Ok(Some(data)) => {
|
||||
// This step is not necessary, but use it to check if the data is valid. Will be removed
|
||||
// in the future.
|
||||
match validate_encode_collab(&next_item.object_id, &data, &next_item.collab_type) {
|
||||
Ok(_) => data,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"Collab doc state is not correct when creating snapshot: {},{}",
|
||||
next_item.object_id, err
|
||||
);
|
||||
return Ok(());
|
||||
},
|
||||
}
|
||||
},
|
||||
Ok(None) => {
|
||||
warn!("Failed to get snapshot from cache: {}", key.0);
|
||||
return Ok(());
|
||||
},
|
||||
Ok(Some(data)) => data,
|
||||
Ok(None) => return Ok(()), // Cache miss, no data to process
|
||||
Err(_) => {
|
||||
if cfg!(debug_assertions) {
|
||||
error!("Failed to get snapshot from cache: {}", key.0);
|
||||
}
|
||||
self.queue.write().await.push_item(next_item);
|
||||
queue.push_item(next_item); // Push back to queue on error
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
// Validate collab data before processing
|
||||
let result = validate_encode_collab(
|
||||
&next_item.object_id,
|
||||
&encoded_collab_v1,
|
||||
&next_item.collab_type,
|
||||
)
|
||||
.await;
|
||||
|
||||
if result.is_err() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Start a transaction
|
||||
let transaction = match self.pg_pool.try_begin().await {
|
||||
Ok(Some(tx)) => tx,
|
||||
_ => {
|
||||
debug!("Failed to start transaction to write snapshot, retrying later");
|
||||
self.queue.write().await.push_item(next_item);
|
||||
queue.push_item(next_item);
|
||||
return Ok(());
|
||||
},
|
||||
};
|
||||
|
||||
// Create the snapshot and enforce limits
|
||||
match create_snapshot_and_maintain_limit(
|
||||
transaction,
|
||||
&next_item.workspace_id,
|
||||
|
|
@ -306,16 +306,13 @@ impl SnapshotCommandRunner {
|
|||
trace!(
|
||||
"successfully created snapshot for {}, remaining task: {}",
|
||||
next_item.object_id,
|
||||
self.queue.read().await.len()
|
||||
queue.len()
|
||||
);
|
||||
let _ = self.cache.remove(&key.0).await;
|
||||
self.success_attempts.fetch_add(1, Ordering::Relaxed);
|
||||
Ok(())
|
||||
},
|
||||
Err(e) => {
|
||||
// self.queue.write().await.push_item(next_item);
|
||||
Err(e)
|
||||
},
|
||||
Err(e) => Err(e), // Return the error if snapshot creation fails
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue