chore: use rayon to do compression on client api (#813)

* chore: use rayon to do compression on client api

* chore: update client deps
This commit is contained in:
Nathan.fooo 2024-09-11 21:47:14 +08:00 committed by GitHub
parent abae8d2d1b
commit cbad6f22be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 49 additions and 35 deletions

1
Cargo.lock generated
View File

@ -1973,6 +1973,7 @@ dependencies = [
"percent-encoding",
"pin-project",
"prost",
"rayon",
"reqwest 0.11.27",
"scraper",
"semver",

View File

@ -52,6 +52,7 @@ lazy_static = { workspace = true }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-retry = "0.3"
tokio-util = "0.7"
rayon = "1.10.0"
infra = { workspace = true, features = ["file_util"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]

View File

@ -1028,25 +1028,41 @@ pub(crate) fn log_request_id(resp: &reqwest::Response) {
}
#[cfg(feature = "enable_brotli")]
pub async fn spawn_blocking_brotli_compress(
pub fn brotli_compress(
data: Vec<u8>,
quality: u32,
buffer_size: usize,
) -> Result<Vec<u8>, AppError> {
tokio::task::spawn_blocking(move || {
let mut compressor = brotli::CompressorReader::new(&*data, buffer_size, quality, 22);
let mut compressed_data = Vec::new();
compressor
.read_to_end(&mut compressed_data)
.map_err(|err| AppError::InvalidRequest(format!("Failed to compress data: {}", err)))?;
Ok(compressed_data)
})
.await
.map_err(AppError::from)?
let mut compressor = brotli::CompressorReader::new(&*data, buffer_size, quality, 22);
let mut compressed_data = Vec::new();
compressor
.read_to_end(&mut compressed_data)
.map_err(|err| AppError::InvalidRequest(format!("Failed to compress data: {}", err)))?;
Ok(compressed_data)
}
#[cfg(feature = "enable_brotli")]
pub async fn blocking_brotli_compress(
data: Vec<u8>,
quality: u32,
buffer_size: usize,
) -> Result<Vec<u8>, AppError> {
tokio::task::spawn_blocking(move || brotli_compress(data, quality, buffer_size))
.await
.map_err(AppError::from)?
}
#[cfg(not(feature = "enable_brotli"))]
pub async fn spawn_blocking_brotli_compress(
pub async fn blocking_brotli_compress(
data: Vec<u8>,
_quality: u32,
_buffer_size: usize,
) -> Result<Vec<u8>, AppError> {
Ok(data)
}
#[cfg(not(feature = "enable_brotli"))]
pub fn brotli_compress(
data: Vec<u8>,
_quality: u32,
_buffer_size: usize,

View File

@ -1,5 +1,5 @@
use crate::http::log_request_id;
use crate::{spawn_blocking_brotli_compress, Client};
use crate::{blocking_brotli_compress, Client};
use app_error::AppError;
use client_api_entity::{
BatchQueryCollabParams, BatchQueryCollabResult, CreateCollabParams, DeleteCollabParams,
@ -20,7 +20,7 @@ impl Client {
.to_bytes()
.map_err(|err| AppError::Internal(err.into()))?;
let compress_bytes = spawn_blocking_brotli_compress(
let compress_bytes = blocking_brotli_compress(
bytes,
self.config.compression_quality,
self.config.compression_buffer_size,

View File

@ -1,11 +1,12 @@
use crate::http::log_request_id;
use crate::native::GetCollabAction;
use crate::ws::{ConnectInfo, WSClientConnectURLProvider, WSClientHttpSender, WSError};
use crate::{spawn_blocking_brotli_compress, Client};
use crate::{blocking_brotli_compress, brotli_compress, Client};
use crate::{RefreshTokenAction, RefreshTokenRetryCondition};
use anyhow::anyhow;
use app_error::AppError;
use async_trait::async_trait;
use rayon::iter::ParallelIterator;
use bytes::Bytes;
use client_api_entity::{CollabParams, PublishCollabItem, QueryCollabParams};
@ -26,6 +27,7 @@ use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use std::pin::Pin;
use std::sync::atomic::Ordering;
use rayon::prelude::IntoParallelIterator;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_retry::strategy::{ExponentialBackoff, FixedInterval};
@ -150,8 +152,7 @@ impl Client {
) -> Result<(), AppResponseError> {
let device_id = device_id.to_string();
let payload =
spawn_blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size)
.await?;
blocking_brotli_compress(msg.into_data(), 6, self.config.compression_buffer_size).await?;
let msg = HttpRealtimeMessage { device_id, payload }.encode_to_vec();
let body = Body::wrap_stream(stream::iter(vec![Ok::<_, reqwest::Error>(msg)]));
@ -174,27 +175,22 @@ impl Client {
) -> Result<(), AppResponseError> {
let url = self.batch_create_collab_url(workspace_id);
// Parallel compression
let compression_tasks: Vec<_> = params_list
.into_iter()
.map(|params| {
let config = self.config.clone();
af_spawn(async move {
let data = params.to_bytes().map_err(AppError::from)?;
spawn_blocking_brotli_compress(
data,
config.compression_quality,
config.compression_buffer_size,
)
.await
})
let compression_tasks = params_list
.into_par_iter()
.filter_map(|params| {
let data = params.to_bytes().ok()?;
brotli_compress(
data,
self.config.compression_quality,
self.config.compression_buffer_size,
)
.ok()
})
.collect();
.collect::<Vec<_>>();
let mut framed_data = Vec::new();
let mut size_count = 0;
for task in compression_tasks {
let compressed = task.await??;
for compressed in compression_tasks {
// The length of a u32 in bytes is 4. The server uses a u32 to read the size of each data frame,
// hence the frame size header is always 4 bytes. It's crucial not to alter this size value,
// as the server's logic for frame size reading is based on this fixed 4-byte length.

View File

@ -3,7 +3,7 @@
# Generate the current dependency list
cargo tree > current_deps.txt
BASELINE_COUNT=611
BASELINE_COUNT=619
CURRENT_COUNT=$(cat current_deps.txt | wc -l)
echo "Expected dependency count (baseline): $BASELINE_COUNT"