From 9c40c3128b1846008d74913971fb87788685b2fe Mon Sep 17 00:00:00 2001 From: nathan Date: Fri, 20 Dec 2024 11:08:01 +0800 Subject: [PATCH] chore: clippy --- libs/collab-stream/src/stream_router.rs | 9 +- libs/database-entity/src/dto.rs | 6 +- services/appflowy-collaborate/src/error.rs | 1 - .../src/group/group_init.rs | 116 ++++++++++-------- xtask/src/main.rs | 8 +- 5 files changed, 79 insertions(+), 61 deletions(-) diff --git a/libs/collab-stream/src/stream_router.rs b/libs/collab-stream/src/stream_router.rs index 8be5da0d..fb048c1a 100644 --- a/libs/collab-stream/src/stream_router.rs +++ b/libs/collab-stream/src/stream_router.rs @@ -286,7 +286,7 @@ mod test { const ROUTES_COUNT: usize = 200; const MSG_PER_ROUTE: usize = 10; let mut client = Client::open("redis://127.0.0.1/").unwrap(); - let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE); + let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE); let router = StreamRouter::new(&client).unwrap(); @@ -312,13 +312,14 @@ mod test { const ROUTES_COUNT: usize = 200; const MSG_PER_ROUTE: usize = 10; let mut client = Client::open("redis://127.0.0.1/").unwrap(); - let mut keys = init_streams(&mut client, ROUTES_COUNT, 0); + let keys = init_streams(&mut client, ROUTES_COUNT, 0); let router = StreamRouter::new(&client).unwrap(); let mut join_set = JoinSet::new(); - for key in keys.iter().cloned() { + for key in keys.iter() { let mut observer = router.observe(key.clone(), None); + let key = key.clone(); join_set.spawn(async move { for i in 0..MSG_PER_ROUTE { let (_msg_id, map) = observer.recv().await.unwrap(); @@ -331,7 +332,7 @@ mod test { for msg_idx in 0..MSG_PER_ROUTE { for key in keys.iter() { let data = format!("{}-{}", key, msg_idx); - let _: String = client.xadd(&key, "*", &[("data", data)]).unwrap(); + let _: String = client.xadd(key, "*", &[("data", data)]).unwrap(); } } diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs index 49c25857..9a4e1024 100644 --- a/libs/database-entity/src/dto.rs +++ b/libs/database-entity/src/dto.rs @@ -107,16 +107,16 @@ impl Display for CollabParams { } impl CollabParams { - pub fn new( + pub fn new>( object_id: T, collab_type: CollabType, - encoded_collab_v1: Vec, + encoded_collab_v1: B, ) -> Self { let object_id = object_id.to_string(); Self { object_id, collab_type, - encoded_collab_v1: Bytes::from(encoded_collab_v1), + encoded_collab_v1: encoded_collab_v1.into(), } } diff --git a/services/appflowy-collaborate/src/error.rs b/services/appflowy-collaborate/src/error.rs index c3790401..ec8bca42 100644 --- a/services/appflowy-collaborate/src/error.rs +++ b/services/appflowy-collaborate/src/error.rs @@ -1,4 +1,3 @@ -use app_error::AppError; use collab::error::CollabError; use collab_stream::error::StreamError; use std::fmt::Display; diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index c2feb857..79f83cfe 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -20,6 +20,7 @@ use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink}; use crate::indexer::IndexerScheduler; use crate::metrics::CollabRealtimeMetrics; +use bytes::Bytes; use collab_stream::error::StreamError; use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags}; use dashmap::DashMap; @@ -1145,45 +1146,16 @@ impl CollabPersister { let sv = tx.state_vector().encode_v1(); let doc_state_full = tx.encode_state_as_update_v1(&StateVector::default()); let full_len = doc_state_full.len(); - self - .metrics - .full_collab_size - .observe(doc_state_full.len() as f64); - let params = InsertSnapshotParams { - object_id: self.object_id.clone(), - doc_state: doc_state_full.into(), - workspace_id: self.workspace_id.clone(), - collab_type: self.collab_type.clone(), - }; - self - .storage - .create_snapshot(params) - .await - .map_err(|err| RealtimeError::CreateSnapshotFailed(err.to_string()))?; + self.write_doc_state_full(doc_state_full).await?; // 2. Generate document state with GC turned on and save it. tx.force_gc(); drop(tx); - let doc_state_light = collab .transact() .encode_state_as_update_v1(&StateVector::default()); let light_len = doc_state_light.len(); - let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) - .encode_to_bytes() - .map_err(|err| RealtimeError::Internal(err.into()))?; - self - .metrics - .collab_size - .observe(encoded_collab.len() as f64); - let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); - let encoded_collab = params.encoded_collab_v1.clone(); - - self - .storage - .queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true) - .await - .map_err(|err| RealtimeError::Internal(err.into()))?; + self.write_doc_state_light(sv, doc_state_light).await?; tracing::debug!( "persisted collab {} snapshot at {}: {} and {} bytes", @@ -1193,23 +1165,6 @@ impl CollabPersister { light_len ); - let indexed_collab = IndexedCollab { - object_id: self.object_id.clone(), - collab_type: self.collab_type.clone(), - encoded_collab, - }; - if let Err(err) = self - .indexer_scheduler - .index_encoded_collab_one(&self.workspace_id, indexed_collab) - { - tracing::warn!( - "failed to index collab `{}/{}`: {}", - self.workspace_id, - self.object_id, - err - ); - } - // 3. finally we can drop Redis messages let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis(); let msg_id = MessageId { @@ -1228,6 +1183,71 @@ impl CollabPersister { Ok(()) } + async fn write_doc_state_light( + &self, + sv: Vec, + doc_state_light: Vec, + ) -> Result<(), RealtimeError> { + let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) + .encode_to_bytes() + .map(Bytes::from) + .map_err(|err| RealtimeError::BincodeEncode(err.to_string()))?; + self + .metrics + .collab_size + .observe(encoded_collab.len() as f64); + let params = CollabParams::new( + &self.object_id, + self.collab_type.clone(), + encoded_collab.clone(), + ); + self + .storage + .queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true) + .await + .map_err(|err| RealtimeError::Internal(err.into()))?; + self.index_encoded_collab(encoded_collab); + Ok(()) + } + + async fn write_doc_state_full(&self, doc_state_full: Vec) -> Result<(), RealtimeError> { + self + .metrics + .full_collab_size + .observe(doc_state_full.len() as f64); + let params = InsertSnapshotParams { + object_id: self.object_id.clone(), + doc_state: doc_state_full.into(), + workspace_id: self.workspace_id.clone(), + collab_type: self.collab_type.clone(), + }; + self + .storage + .create_snapshot(params) + .await + .map_err(|err| RealtimeError::CreateSnapshotFailed(err.to_string()))?; + Ok(()) + } + + fn index_encoded_collab(&self, encoded_collab: Bytes) { + let indexed_collab = IndexedCollab { + object_id: self.object_id.clone(), + collab_type: self.collab_type.clone(), + encoded_collab, + }; + if let Err(err) = self + .indexer_scheduler + .index_encoded_collab_one(&self.workspace_id, indexed_collab) + { + tracing::warn!( + "failed to index collab `{}/{}`: {}", + self.workspace_id, + self.object_id, + err + ); + } + } + async fn load_collab_full(&self, keep_history: bool) -> Result, RealtimeError> { let doc_state = if keep_history { // if we want history-keeping variant, we need to get a snapshot diff --git a/xtask/src/main.rs b/xtask/src/main.rs index a48864a6..8b9d8681 100644 --- a/xtask/src/main.rs +++ b/xtask/src/main.rs @@ -102,11 +102,9 @@ fn spawn_server( cmd.stdout(Stdio::null()).stderr(Stdio::null()); } - Ok( - cmd - .spawn() - .context(format!("Failed to start {} process", name))?, - ) + cmd + .spawn() + .context(format!("Failed to start {} process", name)) } async fn kill_existing_process(process_identifier: &str) -> Result<()> {