diff --git a/libs/client-api/src/http_publish.rs b/libs/client-api/src/http_publish.rs index 2c5dbf82..a9bf0235 100644 --- a/libs/client-api/src/http_publish.rs +++ b/libs/client-api/src/http_publish.rs @@ -1,13 +1,6 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - use bytes::Bytes; -use client_api_entity::{PublishCollabItem, PublishInfo, UpdatePublishNamespace}; -use futures::Stream; -use reqwest::{Body, Method}; -use serde::Serialize; +use client_api_entity::{PublishInfo, UpdatePublishNamespace}; +use reqwest::Method; use shared_entity::response::{AppResponse, AppResponseError}; use crate::Client; @@ -54,28 +47,6 @@ impl Client { .into_data() } - pub async fn publish_collabs( - &self, - workspace_id: &str, - items: Vec>, - ) -> Result<(), AppResponseError> - where - Metadata: serde::Serialize + Send + 'static + Unpin, - Data: AsRef<[u8]> + Send + 'static + Unpin, - { - let publish_collab_stream = PublishCollabItemStream::new(items); - let url = format!("{}/api/workspace/{}/publish", self.base_url, workspace_id,); - let resp = self - .http_client_with_auth(Method::POST, &url) - .await? - .header("Content-Type", "octet-stream") - .header("Transfer-Encoding", "chunked") - .body(Body::wrap_stream(publish_collab_stream)) - .send() - .await?; - AppResponse::<()>::from_response(resp).await?.into_error() - } - pub async fn unpublish_collabs( &self, workspace_id: &str, @@ -161,56 +132,3 @@ impl Client { Ok(bytes) } } - -pub struct PublishCollabItemStream { - items: Vec>, - idx: usize, -} - -impl PublishCollabItemStream { - pub fn new(publish_collab_items: Vec>) -> Self { - PublishCollabItemStream { - items: publish_collab_items, - idx: 0, - } - } -} - -impl Stream for PublishCollabItemStream -where - Metadata: Serialize + Send + 'static + Unpin, - Data: AsRef<[u8]> + Send + 'static + Unpin, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let mut self_mut = self.as_mut(); - - if self_mut.idx >= self_mut.items.len() { - return Poll::Ready(None); - } - - let item = &self_mut.items[self_mut.idx]; - match serialize_metadata_data(&item.meta, item.data.as_ref()) { - Err(e) => Poll::Ready(Some(Err(e))), - Ok(chunk) => { - self_mut.idx += 1; - Poll::Ready(Some(Ok::(chunk))) - }, - } - } -} - -fn serialize_metadata_data(m: Metadata, d: &[u8]) -> Result -where - Metadata: Serialize, -{ - let meta = serde_json::to_vec(&m)?; - - let mut chunk = Vec::with_capacity(4 + meta.len() + d.len()); - chunk.extend_from_slice(&(meta.len() as u32).to_le_bytes()); // Encode metadata length - chunk.extend_from_slice(&meta); - chunk.extend_from_slice(d); - - Ok(Bytes::from(chunk)) -} diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 08561d1a..2b64414b 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -7,19 +7,24 @@ use anyhow::anyhow; use app_error::AppError; use async_trait::async_trait; -use client_api_entity::{CollabParams, QueryCollabParams}; +use bytes::Bytes; +use client_api_entity::{CollabParams, PublishCollabItem, QueryCollabParams}; use client_api_entity::{ CompleteUploadRequest, CreateUploadRequest, CreateUploadResponse, UploadPartResponse, }; use collab_rt_entity::HttpRealtimeMessage; +use futures::Stream; use futures_util::stream; use prost::Message; use reqwest::{Body, Method}; +use serde::Serialize; use shared_entity::dto::workspace_dto::CollabResponse; use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; +use std::pin::Pin; use std::sync::atomic::Ordering; +use std::task::{Context, Poll}; use std::time::Duration; use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; use tokio_retry::{Retry, RetryIf}; @@ -247,6 +252,28 @@ impl Client { }, } } + + pub async fn publish_collabs( + &self, + workspace_id: &str, + items: Vec>, + ) -> Result<(), AppResponseError> + where + Metadata: serde::Serialize + Send + 'static + Unpin, + Data: AsRef<[u8]> + Send + 'static + Unpin, + { + let publish_collab_stream = PublishCollabItemStream::new(items); + let url = format!("{}/api/workspace/{}/publish", self.base_url, workspace_id,); + let resp = self + .http_client_with_auth(Method::POST, &url) + .await? + .header("Content-Type", "octet-stream") + .header("Transfer-Encoding", "chunked") + .body(Body::wrap_stream(publish_collab_stream)) + .send() + .await?; + AppResponse::<()>::from_response(resp).await?.into_error() + } } #[async_trait] @@ -286,3 +313,56 @@ where { tokio::spawn(future) } + +pub struct PublishCollabItemStream { + items: Vec>, + idx: usize, +} + +impl PublishCollabItemStream { + pub fn new(publish_collab_items: Vec>) -> Self { + PublishCollabItemStream { + items: publish_collab_items, + idx: 0, + } + } +} + +impl Stream for PublishCollabItemStream +where + Metadata: Serialize + Send + 'static + Unpin, + Data: AsRef<[u8]> + Send + 'static + Unpin, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let mut self_mut = self.as_mut(); + + if self_mut.idx >= self_mut.items.len() { + return Poll::Ready(None); + } + + let item = &self_mut.items[self_mut.idx]; + match serialize_metadata_data(&item.meta, item.data.as_ref()) { + Err(e) => Poll::Ready(Some(Err(e))), + Ok(chunk) => { + self_mut.idx += 1; + Poll::Ready(Some(Ok::(chunk))) + }, + } + } +} + +fn serialize_metadata_data(m: Metadata, d: &[u8]) -> Result +where + Metadata: Serialize, +{ + let meta = serde_json::to_vec(&m)?; + + let mut chunk = Vec::with_capacity(4 + meta.len() + d.len()); + chunk.extend_from_slice(&(meta.len() as u32).to_le_bytes()); // Encode metadata length + chunk.extend_from_slice(&meta); + chunk.extend_from_slice(d); + + Ok(Bytes::from(chunk)) +}