From cbad6f22bec94e2958287b0bcc9150538768b9bb Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 11 Sep 2024 21:47:14 +0800 Subject: [PATCH] chore: use rayon to do compression on client api (#813) * chore: use rayon to do compression on client api * chore: update client deps --- Cargo.lock | 1 + libs/client-api/Cargo.toml | 1 + libs/client-api/src/http.rs | 40 ++++++++++++++++------- libs/client-api/src/http_collab.rs | 4 +-- libs/client-api/src/native/http_native.rs | 36 +++++++++----------- script/client_api_deps_check.sh | 2 +- 6 files changed, 49 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de045a43..f69fb626 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1973,6 +1973,7 @@ dependencies = [ "percent-encoding", "pin-project", "prost", + "rayon", "reqwest 0.11.27", "scraper", "semver", diff --git a/libs/client-api/Cargo.toml b/libs/client-api/Cargo.toml index 506d498c..5b536935 100644 --- a/libs/client-api/Cargo.toml +++ b/libs/client-api/Cargo.toml @@ -52,6 +52,7 @@ lazy_static = { workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] tokio-retry = "0.3" tokio-util = "0.7" +rayon = "1.10.0" infra = { workspace = true, features = ["file_util"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio] diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 2f4b5571..e3823c3b 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -1028,25 +1028,41 @@ pub(crate) fn log_request_id(resp: &reqwest::Response) { } #[cfg(feature = "enable_brotli")] -pub async fn spawn_blocking_brotli_compress( +pub fn brotli_compress( data: Vec, quality: u32, buffer_size: usize, ) -> Result, AppError> { - tokio::task::spawn_blocking(move || { - let mut compressor = brotli::CompressorReader::new(&*data, buffer_size, quality, 22); - let mut compressed_data = Vec::new(); - compressor - .read_to_end(&mut compressed_data) - .map_err(|err| AppError::InvalidRequest(format!("Failed to compress data: {}", err)))?; - Ok(compressed_data) - }) - .await - .map_err(AppError::from)? + let mut compressor = brotli::CompressorReader::new(&*data, buffer_size, quality, 22); + let mut compressed_data = Vec::new(); + compressor + .read_to_end(&mut compressed_data) + .map_err(|err| AppError::InvalidRequest(format!("Failed to compress data: {}", err)))?; + Ok(compressed_data) +} + +#[cfg(feature = "enable_brotli")] +pub async fn blocking_brotli_compress( + data: Vec, + quality: u32, + buffer_size: usize, +) -> Result, AppError> { + tokio::task::spawn_blocking(move || brotli_compress(data, quality, buffer_size)) + .await + .map_err(AppError::from)? } #[cfg(not(feature = "enable_brotli"))] -pub async fn spawn_blocking_brotli_compress( +pub async fn blocking_brotli_compress( + data: Vec, + _quality: u32, + _buffer_size: usize, +) -> Result, AppError> { + Ok(data) +} + +#[cfg(not(feature = "enable_brotli"))] +pub fn brotli_compress( data: Vec, _quality: u32, _buffer_size: usize, diff --git a/libs/client-api/src/http_collab.rs b/libs/client-api/src/http_collab.rs index 5deaa7e4..cdeb6f06 100644 --- a/libs/client-api/src/http_collab.rs +++ b/libs/client-api/src/http_collab.rs @@ -1,5 +1,5 @@ use crate::http::log_request_id; -use crate::{spawn_blocking_brotli_compress, Client}; +use crate::{blocking_brotli_compress, Client}; use app_error::AppError; use client_api_entity::{ BatchQueryCollabParams, BatchQueryCollabResult, CreateCollabParams, DeleteCollabParams, @@ -20,7 +20,7 @@ impl Client { .to_bytes() .map_err(|err| AppError::Internal(err.into()))?; - let compress_bytes = spawn_blocking_brotli_compress( + let compress_bytes = blocking_brotli_compress( bytes, self.config.compression_quality, self.config.compression_buffer_size, diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 25ef1f6b..8ab0764b 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -1,11 +1,12 @@ use crate::http::log_request_id; use crate::native::GetCollabAction; use crate::ws::{ConnectInfo, WSClientConnectURLProvider, WSClientHttpSender, WSError}; -use crate::{spawn_blocking_brotli_compress, Client}; +use crate::{blocking_brotli_compress, brotli_compress, Client}; use crate::{RefreshTokenAction, RefreshTokenRetryCondition}; use anyhow::anyhow; use app_error::AppError; use async_trait::async_trait; +use rayon::iter::ParallelIterator; use bytes::Bytes; use client_api_entity::{CollabParams, PublishCollabItem, QueryCollabParams}; @@ -26,6 +27,7 @@ use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC}; use std::pin::Pin; use std::sync::atomic::Ordering; +use rayon::prelude::IntoParallelIterator; use std::task::{Context, Poll}; use std::time::Duration; use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; @@ -150,8 +152,7 @@ impl Client { ) -> Result<(), AppResponseError> { let device_id = device_id.to_string(); let payload = - spawn_blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size) - .await?; + blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size).await?; let msg = HttpRealtimeMessage { device_id, payload }.encode_to_vec(); let body = Body::wrap_stream(stream::iter(vec![Ok::<_, reqwest::Error>(msg)])); @@ -174,27 +175,22 @@ impl Client { ) -> Result<(), AppResponseError> { let url = self.batch_create_collab_url(workspace_id); - // Parallel compression - let compression_tasks: Vec<_> = params_list - .into_iter() - .map(|params| { - let config = self.config.clone(); - af_spawn(async move { - let data = params.to_bytes().map_err(AppError::from)?; - spawn_blocking_brotli_compress( - data, - config.compression_quality, - config.compression_buffer_size, - ) - .await - }) + let compression_tasks = params_list + .into_par_iter() + .filter_map(|params| { + let data = params.to_bytes().ok()?; + brotli_compress( + data, + self.config.compression_quality, + self.config.compression_buffer_size, + ) + .ok() }) - .collect(); + .collect::>(); let mut framed_data = Vec::new(); let mut size_count = 0; - for task in compression_tasks { - let compressed = task.await??; + for compressed in compression_tasks { // The length of a u32 in bytes is 4. The server uses a u32 to read the size of each data frame, // hence the frame size header is always 4 bytes. It's crucial not to alter this size value, // as the server's logic for frame size reading is based on this fixed 4-byte length. diff --git a/script/client_api_deps_check.sh b/script/client_api_deps_check.sh index 6a981592..6c947187 100755 --- a/script/client_api_deps_check.sh +++ b/script/client_api_deps_check.sh @@ -3,7 +3,7 @@ # Generate the current dependency list cargo tree > current_deps.txt -BASELINE_COUNT=611 +BASELINE_COUNT=619 CURRENT_COUNT=$(cat current_deps.txt | wc -l) echo "Expected dependency count (baseline): $BASELINE_COUNT"