chore: clippy

This commit is contained in:
nathan 2024-12-20 11:08:01 +08:00
parent 011a5b59c9
commit 9c40c3128b
5 changed files with 79 additions and 61 deletions

View File

@ -286,7 +286,7 @@ mod test {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
let router = StreamRouter::new(&client).unwrap();
@ -312,13 +312,14 @@ mod test {
const ROUTES_COUNT: usize = 200;
const MSG_PER_ROUTE: usize = 10;
let mut client = Client::open("redis://127.0.0.1/").unwrap();
let mut keys = init_streams(&mut client, ROUTES_COUNT, 0);
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
let router = StreamRouter::new(&client).unwrap();
let mut join_set = JoinSet::new();
for key in keys.iter().cloned() {
for key in keys.iter() {
let mut observer = router.observe(key.clone(), None);
let key = key.clone();
join_set.spawn(async move {
for i in 0..MSG_PER_ROUTE {
let (_msg_id, map) = observer.recv().await.unwrap();
@ -331,7 +332,7 @@ mod test {
for msg_idx in 0..MSG_PER_ROUTE {
for key in keys.iter() {
let data = format!("{}-{}", key, msg_idx);
let _: String = client.xadd(&key, "*", &[("data", data)]).unwrap();
let _: String = client.xadd(key, "*", &[("data", data)]).unwrap();
}
}

View File

@ -107,16 +107,16 @@ impl Display for CollabParams {
}
impl CollabParams {
pub fn new<T: ToString>(
pub fn new<T: ToString, B: Into<Bytes>>(
object_id: T,
collab_type: CollabType,
encoded_collab_v1: Vec<u8>,
encoded_collab_v1: B,
) -> Self {
let object_id = object_id.to_string();
Self {
object_id,
collab_type,
encoded_collab_v1: Bytes::from(encoded_collab_v1),
encoded_collab_v1: encoded_collab_v1.into(),
}
}

View File

@ -1,4 +1,3 @@
use app_error::AppError;
use collab::error::CollabError;
use collab_stream::error::StreamError;
use std::fmt::Display;

View File

@ -20,6 +20,7 @@ use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
use crate::indexer::IndexerScheduler;
use crate::metrics::CollabRealtimeMetrics;
use bytes::Bytes;
use collab_stream::error::StreamError;
use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags};
use dashmap::DashMap;
@ -1145,45 +1146,16 @@ impl CollabPersister {
let sv = tx.state_vector().encode_v1();
let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default());
let full_len = doc_state_full.len();
self
.metrics
.full_collab_size
.observe(doc_state_full.len() as f64);
let params = InsertSnapshotParams {
object_id: self.object_id.clone(),
doc_state: doc_state_full.into(),
workspace_id: self.workspace_id.clone(),
collab_type: self.collab_type.clone(),
};
self
.storage
.create_snapshot(params)
.await
.map_err(|err| RealtimeError::CreateSnapshotFailed(err.to_string()))?;
self.write_doc_state_full(doc_state_full).await?;
// 2. Generate document state with GC turned on and save it.
tx.force_gc();
drop(tx);
let doc_state_light = collab
.transact()
.encode_state_as_update_v1(&StateVector::default());
let light_len = doc_state_light.len();
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
.encode_to_bytes()
.map_err(|err| RealtimeError::Internal(err.into()))?;
self
.metrics
.collab_size
.observe(encoded_collab.len() as f64);
let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
let encoded_collab = params.encoded_collab_v1.clone();
self
.storage
.queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
self.write_doc_state_light(sv, doc_state_light).await?;
tracing::debug!(
"persisted collab {} snapshot at {}: {} and {} bytes",
@ -1193,23 +1165,6 @@ impl CollabPersister {
light_len
);
let indexed_collab = IndexedCollab {
object_id: self.object_id.clone(),
collab_type: self.collab_type.clone(),
encoded_collab,
};
if let Err(err) = self
.indexer_scheduler
.index_encoded_collab_one(&self.workspace_id, indexed_collab)
{
tracing::warn!(
"failed to index collab `{}/{}`: {}",
self.workspace_id,
self.object_id,
err
);
}
// 3. finally we can drop Redis messages
let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis();
let msg_id = MessageId {
@ -1228,6 +1183,71 @@ impl CollabPersister {
Ok(())
}
async fn write_doc_state_light(
&self,
sv: Vec<u8>,
doc_state_light: Vec<u8>,
) -> Result<(), RealtimeError> {
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
.encode_to_bytes()
.map(Bytes::from)
.map_err(|err| RealtimeError::BincodeEncode(err.to_string()))?;
self
.metrics
.collab_size
.observe(encoded_collab.len() as f64);
let params = CollabParams::new(
&self.object_id,
self.collab_type.clone(),
encoded_collab.clone(),
);
self
.storage
.queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true)
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
self.index_encoded_collab(encoded_collab);
Ok(())
}
async fn write_doc_state_full(&self, doc_state_full: Vec<u8>) -> Result<(), RealtimeError> {
self
.metrics
.full_collab_size
.observe(doc_state_full.len() as f64);
let params = InsertSnapshotParams {
object_id: self.object_id.clone(),
doc_state: doc_state_full.into(),
workspace_id: self.workspace_id.clone(),
collab_type: self.collab_type.clone(),
};
self
.storage
.create_snapshot(params)
.await
.map_err(|err| RealtimeError::CreateSnapshotFailed(err.to_string()))?;
Ok(())
}
fn index_encoded_collab(&self, encoded_collab: Bytes) {
let indexed_collab = IndexedCollab {
object_id: self.object_id.clone(),
collab_type: self.collab_type.clone(),
encoded_collab,
};
if let Err(err) = self
.indexer_scheduler
.index_encoded_collab_one(&self.workspace_id, indexed_collab)
{
tracing::warn!(
"failed to index collab `{}/{}`: {}",
self.workspace_id,
self.object_id,
err
);
}
}
async fn load_collab_full(&self, keep_history: bool) -> Result<Option<Collab>, RealtimeError> {
let doc_state = if keep_history {
// if we want history-keeping variant, we need to get a snapshot

View File

@ -102,11 +102,9 @@ fn spawn_server(
cmd.stdout(Stdio::null()).stderr(Stdio::null());
}
Ok(
cmd
.spawn()
.context(format!("Failed to start {} process", name))?,
)
cmd
.spawn()
.context(format!("Failed to start {} process", name))
}
async fn kill_existing_process(process_identifier: &str) -> Result<()> {