chore: wrap spawn blocking when decoding collab (#767)

This commit is contained in:
Nathan.fooo 2024-08-29 22:53:33 +08:00 committed by GitHub
parent 8f7c35c491
commit 54b811125e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 39 additions and 21 deletions

1
Cargo.lock generated
View File

@ -2245,6 +2245,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"bytes",
"chrono",
"collab-entity",
"futures",

View File

@ -15,6 +15,7 @@ futures = "0.3.30"
tracing = "0.1"
serde = { version = "1", features = ["derive"] }
bincode = "1.3.3"
bytes.workspace = true
collab-entity.workspace = true
serde_json.workspace = true
chrono = "0.4"

View File

@ -1,15 +1,15 @@
use crate::error::{internal, StreamError};
use bytes::Bytes;
use collab_entity::proto::collab::collab_update_event::Update;
use collab_entity::{proto, CollabType};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
use crate::error::{internal, StreamError};
use prost::Message;
use redis::streams::StreamId;
use redis::{FromRedisValue, RedisError, RedisResult, Value};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::str::FromStr;
/// The [MessageId] generated by XADD has two parts: a timestamp and a sequence number, separated by
/// a hyphen (-). The timestamp is based on the server's time when the message is added, and the
@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize};
///
/// An example message ID might look like this: 1631020452097-0. In this example, 1631020452097 is
/// the timestamp in milliseconds, and 0 is the sequence number.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MessageId {
pub timestamp_ms: u64,
pub sequence_number: u16,
@ -120,9 +120,9 @@ impl FromRedisValue for StreamMessageByStreamKey {
}
/// A message in the Redis stream. It's the same as [StreamBinary] but with additional metadata.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct StreamMessage {
pub data: Vec<u8>,
pub data: Bytes,
/// only applicable when reading from redis
pub id: MessageId,
}
@ -158,7 +158,10 @@ impl FromRedisValue for StreamMessage {
verify_field(&fields[0], "data")?;
let raw_data = Vec::<u8>::from_redis_value(&fields[1])?;
Ok(StreamMessage { data: raw_data, id })
Ok(StreamMessage {
data: Bytes::from(raw_data),
id,
})
}
}
@ -179,7 +182,7 @@ pub struct StreamBinary(pub Vec<u8>);
impl From<StreamMessage> for StreamBinary {
fn from(m: StreamMessage) -> Self {
Self(m.data)
Self(m.data.to_vec())
}
}

View File

@ -124,9 +124,15 @@ where
};
let params = {
let lock = collab.read().await;
let mut params = get_encode_collab(&workspace_id, &object_id, &lock, &collab_type)?;
let cloned_collab = collab.clone();
let (workspace_id, mut params, object_id) = tokio::task::spawn_blocking(move || {
let collab = cloned_collab.blocking_read();
let params = get_encode_collab(&workspace_id, &object_id, &collab, &collab_type)?;
Ok::<_, AppError>((workspace_id, params, object_id))
})
.await??;
let lock = collab.read().await;
if let Some(indexer) = &self.indexer {
match indexer.embedding_params(&lock) {
Ok(embedding_params) => {

View File

@ -64,14 +64,15 @@ impl CollabHistory {
return Ok(None);
}
trace!("[History] prepare to save snapshots to disk");
let (doc_state, state_vector) = {
let lock = collab.read().await;
let (doc_state, state_vector) = tokio::task::spawn_blocking(move || {
let lock = collab.blocking_read();
let txn = lock.transact();
// TODO(nathan): reduce the size of doc_state_v2 by encoding the previous [CollabStateSnapshot] doc_state_v2
let doc_state_v2 = txn.encode_state_as_update_v2(&StateVector::default());
let state_vector = txn.state_vector();
(doc_state_v2, state_vector)
};
Ok::<_, HistoryError>((doc_state_v2, state_vector))
})
.await
.map_err(|err| HistoryError::Internal(err.into()))??;
let state = CollabSnapshotState::new(
object_id,

View File

@ -183,9 +183,15 @@ async fn process_messages(
_object_id: &str,
_collab_type: &CollabType,
) -> Result<(), HistoryError> {
let mut lock = collab.write().await;
apply_updates(&messages, &mut lock)?;
drop(lock);
let cloned_message = messages.clone();
tokio::task::spawn_blocking(move || {
let mut lock = collab.blocking_write();
apply_updates(&cloned_message, &mut lock)?;
drop(lock);
Ok::<_, HistoryError>(())
})
.await
.map_err(|e| HistoryError::Internal(e.into()))??;
update_stream.ack_messages(&messages).await?;
Ok(())
}