From 1d57c80fa6ddce5a11187fbb270a85e5710383e0 Mon Sep 17 00:00:00 2001 From: Zack Fu Zi Xiang Date: Thu, 20 Jun 2024 17:55:52 +0800 Subject: [PATCH] feat: use custom async reader --- libs/client-api/src/native/http_native.rs | 9 ++- src/api/util.rs | 94 +++++++++++++++++++++++ src/api/workspace.rs | 47 ++++++------ 3 files changed, 127 insertions(+), 23 deletions(-) diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index a0cdceb7..a9239e26 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -315,6 +315,7 @@ where pub struct PublishCollabItemStream { items: Vec>, idx: usize, + done: bool, } impl PublishCollabItemStream { @@ -322,6 +323,7 @@ impl PublishCollabItemStream { PublishCollabItemStream { items: publish_collab_items, idx: 0, + done: false, } } } @@ -337,6 +339,10 @@ where let mut self_mut = self.as_mut(); if self_mut.idx >= self_mut.items.len() { + if !self_mut.done { + self_mut.done = true; + return Poll::Ready(Some(Ok((0 as u32).to_le_bytes().to_vec().into()))); + } return Poll::Ready(None); } @@ -357,9 +363,10 @@ where { let meta = serde_json::to_vec(&m)?; - let mut chunk = Vec::with_capacity(4 + meta.len() + d.len()); + let mut chunk = Vec::with_capacity(8 + meta.len() + d.len()); chunk.extend_from_slice(&(meta.len() as u32).to_le_bytes()); // Encode metadata length chunk.extend_from_slice(&meta); + chunk.extend_from_slice(&(d.len() as u32).to_le_bytes()); // Encode data length chunk.extend_from_slice(d); Ok(Bytes::from(chunk)) diff --git a/src/api/util.rs b/src/api/util.rs index cfa1d507..fdff228c 100644 --- a/src/api/util.rs +++ b/src/api/util.rs @@ -1,11 +1,14 @@ use crate::domain::compression::{CompressionType, X_COMPRESSION_BUFFER_SIZE, X_COMPRESSION_TYPE}; use actix_http::header::HeaderMap; +use actix_web::web::Payload; use app_error::AppError; use async_trait::async_trait; +use byteorder::{ByteOrder, LittleEndian}; use 
collab_rt_protocol::validate_encode_collab; use database_entity::dto::CollabParams; use std::str::FromStr; +use tokio_stream::StreamExt; #[inline] pub fn compress_type_from_header_value(headers: &HeaderMap) -> Result { @@ -74,3 +77,94 @@ impl CollabValidator for CollabParams { .map_err(|err| AppError::NoRequiredData(err.to_string())) } } + +pub struct PayloadReader { + payload: Payload, + buffer: Vec, + buf_start: usize, + buf_end: usize, +} + +impl PayloadReader { + pub fn new(payload: Payload) -> Self { + Self { + payload, + buffer: Vec::new(), + buf_start: 0, + buf_end: 0, + } + } + + pub async fn read_exact(&mut self, dest: &mut [u8]) -> actix_web::Result<()> { + let mut written = 0; + while written < dest.len() { + if self.buf_start == self.buf_end { + self.fill_buffer().await?; + } + + let current_dest = &mut dest[written..]; + let current_src = &self.buffer[self.buf_start..self.buf_end]; + let n = copy_buffer(current_src, current_dest); + written += n; + self.buf_start += n; + } + Ok(()) + } + + pub async fn read_u32_little_endian(&mut self) -> actix_web::Result { + self.fill_at_least(4).await?; + + let bytes: [u8; 4] = [ + self.buffer[self.buf_start], + self.buffer[self.buf_start + 1], + self.buffer[self.buf_start + 2], + self.buffer[self.buf_start + 3], + ]; + self.buf_start += 4; + + Ok(LittleEndian::read_u32(&bytes)) + } + + async fn fill_at_least(&mut self, min_len: usize) -> actix_web::Result<()> { + while self.len() < min_len { + let n = self.fill_buffer().await?; + if n == 0 { + println!("min_len: {}", min_len); + println!("len: {}", self.len()); + println!("buf_start: {}", self.buf_start); + println!("buf_end: {}", self.buf_end); + println!("buffer: {:?}", self.buffer.len()); + return Err(AppError::InvalidRequest("unexpected EOF".to_string()).into()); + } + } + Ok(()) + } + + fn len(&self) -> usize { + self.buf_end - self.buf_start + } + + async fn fill_buffer(&mut self) -> actix_web::Result { + if self.buf_start == self.buf_end { + 
self.buffer.clear(); + self.buf_start = 0; + self.buf_end = 0; + } + + let bytes = self.payload.try_next().await?; + match bytes { + Some(bytes) => { + self.buffer.extend_from_slice(&bytes); + self.buf_end += bytes.len(); + Ok(bytes.len()) + }, + None => Ok(0), + } + } +} + +fn copy_buffer(src: &[u8], dest: &mut [u8]) -> usize { + let bytes_to_copy = std::cmp::min(src.len(), dest.len()); + dest[..bytes_to_copy].copy_from_slice(&src[..bytes_to_copy]); + bytes_to_copy +} diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 4239145c..91379484 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -1,5 +1,4 @@ -use std::io::{Cursor, Read}; - +use crate::api::util::PayloadReader; use actix_web::web::{Bytes, Payload}; use actix_web::web::{Data, Json, PayloadConfig}; use actix_web::{web, Scope}; @@ -20,7 +19,6 @@ use access_control::collab::CollabAccessControl; use app_error::AppError; use appflowy_collaborate::actix_ws::entities::ClientStreamMessage; use authentication::jwt::UserUuid; -use byteorder::{LittleEndian, ReadBytesExt}; use collab_rt_entity::realtime_proto::HttpRealtimeMessage; use collab_rt_entity::RealtimeMessage; use collab_rt_protocol::validate_encode_collab; @@ -1003,42 +1001,47 @@ async fn get_published_collab_info_handler( async fn post_publish_collabs_handler( workspace_id: web::Path, user_uuid: UserUuid, - mut payload: Payload, + payload: Payload, state: Data, ) -> Result>> { let workspace_id = workspace_id.into_inner(); - // PublishCollabItem let mut accumulator = Vec::>>::new(); + let mut payload_reader: PayloadReader = PayloadReader::new(payload); - while let Some(item) = payload.next().await { - let item = item?; - let item_len = item.len(); - - let mut cursor = Cursor::new(item); + loop { let meta: PublishCollabMetadata = { - let meta_len = cursor.read_u32::()?; + let meta_len = payload_reader.read_u32_little_endian().await?; + println!("meta_len: {}", meta_len); if meta_len > 4 * 1024 * 1024 { // 4MB Limit for metadata return 
Err(AppError::InvalidRequest(String::from("metadata too large")).into()); } + if meta_len == 0 { + println!("meta_len is 0"); + break; + } + let mut meta_buffer = vec![0; meta_len as usize]; - cursor.read_exact(&mut meta_buffer)?; + payload_reader.read_exact(&mut meta_buffer).await?; serde_json::from_slice(&meta_buffer)? }; - let data: Vec = { - let remain_len = item_len - cursor.position() as usize; - let mut data_buffer = vec![0; remain_len]; - cursor.read_exact(&mut data_buffer)?; + println!("meta: {:?}", meta); + + let data = { + let data_len = payload_reader.read_u32_little_endian().await?; + println!("data_len: {}", data_len); + if data_len > 128 * 1024 * 1024 { + // 128MB Limit for data + return Err(AppError::InvalidRequest(String::from("data too large")).into()); + } + let mut data_buffer = vec![0; data_len as usize]; + payload_reader.read_exact(&mut data_buffer).await?; data_buffer }; - accumulator.push(PublishCollabItem { meta, data }); + println!("data: {:?}", data); - assert_eq!( - cursor.position() as usize, - cursor.get_ref().len(), - "Cursor is not empty after reading" - ); + accumulator.push(PublishCollabItem { meta, data }); } if accumulator.is_empty() {