feat: use custome async reader

This commit is contained in:
Zack Fu Zi Xiang 2024-06-20 17:55:52 +08:00
parent ab5c7bb830
commit 1d57c80fa6
No known key found for this signature in database
3 changed files with 127 additions and 23 deletions

View File

@ -315,6 +315,7 @@ where
pub struct PublishCollabItemStream<Metadata, Data> {
items: Vec<PublishCollabItem<Metadata, Data>>,
idx: usize,
done: bool,
}
impl<Metadata, Data> PublishCollabItemStream<Metadata, Data> {
@ -322,6 +323,7 @@ impl<Metadata, Data> PublishCollabItemStream<Metadata, Data> {
PublishCollabItemStream {
items: publish_collab_items,
idx: 0,
done: false,
}
}
}
@ -337,6 +339,10 @@ where
let mut self_mut = self.as_mut();
if self_mut.idx >= self_mut.items.len() {
if !self_mut.done {
self_mut.done = true;
return Poll::Ready(Some(Ok((0 as u32).to_le_bytes().to_vec().into())));
}
return Poll::Ready(None);
}
@ -357,9 +363,10 @@ where
{
let meta = serde_json::to_vec(&m)?;
let mut chunk = Vec::with_capacity(4 + meta.len() + d.len());
let mut chunk = Vec::with_capacity(8 + meta.len() + d.len());
chunk.extend_from_slice(&(meta.len() as u32).to_le_bytes()); // Encode metadata length
chunk.extend_from_slice(&meta);
chunk.extend_from_slice(&(d.len() as u32).to_le_bytes()); // Encode data length
chunk.extend_from_slice(d);
Ok(Bytes::from(chunk))

View File

@ -1,11 +1,14 @@
use crate::domain::compression::{CompressionType, X_COMPRESSION_BUFFER_SIZE, X_COMPRESSION_TYPE};
use actix_http::header::HeaderMap;
use actix_web::web::Payload;
use app_error::AppError;
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use collab_rt_protocol::validate_encode_collab;
use database_entity::dto::CollabParams;
use std::str::FromStr;
use tokio_stream::StreamExt;
#[inline]
pub fn compress_type_from_header_value(headers: &HeaderMap) -> Result<CompressionType, AppError> {
@ -74,3 +77,94 @@ impl CollabValidator for CollabParams {
.map_err(|err| AppError::NoRequiredData(err.to_string()))
}
}
pub struct PayloadReader {
payload: Payload,
buffer: Vec<u8>,
buf_start: usize,
buf_end: usize,
}
impl PayloadReader {
pub fn new(payload: Payload) -> Self {
Self {
payload,
buffer: Vec::new(),
buf_start: 0,
buf_end: 0,
}
}
pub async fn read_exact(&mut self, dest: &mut [u8]) -> actix_web::Result<()> {
let mut written = 0;
while written < dest.len() {
if self.buf_start == self.buf_end {
self.fill_buffer().await?;
}
let current_dest = &mut dest[written..];
let current_src = &self.buffer[self.buf_start..self.buf_end];
let n = copy_buffer(current_src, current_dest);
written += n;
self.buf_start += n;
}
Ok(())
}
pub async fn read_u32_little_endian(&mut self) -> actix_web::Result<u32> {
self.fill_at_least(4).await?;
let bytes: [u8; 4] = [
self.buffer[self.buf_start],
self.buffer[self.buf_start + 1],
self.buffer[self.buf_start + 2],
self.buffer[self.buf_start + 3],
];
self.buf_start += 4;
Ok(LittleEndian::read_u32(&bytes))
}
async fn fill_at_least(&mut self, min_len: usize) -> actix_web::Result<()> {
while self.len() < min_len {
let n = self.fill_buffer().await?;
if n == 0 {
println!("min_len: {}", min_len);
println!("len: {}", self.len());
println!("buf_start: {}", self.buf_start);
println!("buf_end: {}", self.buf_end);
println!("buffer: {:?}", self.buffer.len());
return Err(AppError::InvalidRequest("unexpected EOF".to_string()).into());
}
}
Ok(())
}
fn len(&self) -> usize {
self.buf_end - self.buf_start
}
async fn fill_buffer(&mut self) -> actix_web::Result<usize> {
if self.buf_start == self.buf_end {
self.buffer.clear();
self.buf_start = 0;
self.buf_end = 0;
}
let bytes = self.payload.try_next().await?;
match bytes {
Some(bytes) => {
self.buffer.extend_from_slice(&bytes);
self.buf_end += bytes.len();
Ok(bytes.len())
},
None => Ok(0),
}
}
}
fn copy_buffer(src: &[u8], dest: &mut [u8]) -> usize {
let bytes_to_copy = std::cmp::min(src.len(), dest.len());
dest[..bytes_to_copy].copy_from_slice(&src[..bytes_to_copy]);
bytes_to_copy
}

View File

@ -1,5 +1,4 @@
use std::io::{Cursor, Read};
use crate::api::util::PayloadReader;
use actix_web::web::{Bytes, Payload};
use actix_web::web::{Data, Json, PayloadConfig};
use actix_web::{web, Scope};
@ -20,7 +19,6 @@ use access_control::collab::CollabAccessControl;
use app_error::AppError;
use appflowy_collaborate::actix_ws::entities::ClientStreamMessage;
use authentication::jwt::UserUuid;
use byteorder::{LittleEndian, ReadBytesExt};
use collab_rt_entity::realtime_proto::HttpRealtimeMessage;
use collab_rt_entity::RealtimeMessage;
use collab_rt_protocol::validate_encode_collab;
@ -1003,42 +1001,47 @@ async fn get_published_collab_info_handler(
async fn post_publish_collabs_handler(
workspace_id: web::Path<Uuid>,
user_uuid: UserUuid,
mut payload: Payload,
payload: Payload,
state: Data<AppState>,
) -> Result<Json<AppResponse<()>>> {
let workspace_id = workspace_id.into_inner();
// PublishCollabItem
let mut accumulator = Vec::<PublishCollabItem<serde_json::Value, Vec<u8>>>::new();
let mut payload_reader: PayloadReader = PayloadReader::new(payload);
while let Some(item) = payload.next().await {
let item = item?;
let item_len = item.len();
let mut cursor = Cursor::new(item);
loop {
let meta: PublishCollabMetadata<serde_json::Value> = {
let meta_len = cursor.read_u32::<LittleEndian>()?;
let meta_len = payload_reader.read_u32_little_endian().await?;
println!("meta_len: {}", meta_len);
if meta_len > 4 * 1024 * 1024 {
// 4MB Limit for metadata
return Err(AppError::InvalidRequest(String::from("metadata too large")).into());
}
if meta_len == 0 {
println!("meta_len is 0");
break;
}
let mut meta_buffer = vec![0; meta_len as usize];
cursor.read_exact(&mut meta_buffer)?;
payload_reader.read_exact(&mut meta_buffer).await?;
serde_json::from_slice(&meta_buffer)?
};
let data: Vec<u8> = {
let remain_len = item_len - cursor.position() as usize;
let mut data_buffer = vec![0; remain_len];
cursor.read_exact(&mut data_buffer)?;
println!("meta: {:?}", meta);
let data = {
let data_len = payload_reader.read_u32_little_endian().await?;
println!("data_len: {}", data_len);
if data_len > 128 * 1024 * 1024 {
// 128MB Limit for data
return Err(AppError::InvalidRequest(String::from("data too large")).into());
}
let mut data_buffer = vec![0; data_len as usize];
payload_reader.read_exact(&mut data_buffer).await?;
data_buffer
};
accumulator.push(PublishCollabItem { meta, data });
println!("data: {:?}", data);
assert_eq!(
cursor.position() as usize,
cursor.get_ref().len(),
"Cursor is not empty after reading"
);
accumulator.push(PublishCollabItem { meta, data });
}
if accumulator.is_empty() {