diff --git a/libs/shared-entity/src/response_stream.rs b/libs/shared-entity/src/response_stream.rs index 1ab91138..6d6533f0 100644 --- a/libs/shared-entity/src/response_stream.rs +++ b/libs/shared-entity/src/response_stream.rs @@ -13,6 +13,8 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; +use futures::stream::StreamExt; + impl AppResponse where T: DeserializeOwned + 'static, @@ -42,6 +44,7 @@ where let stream = resp.bytes_stream().map_err(AppResponseError::from); Ok(NewlineStream::new(stream)) } + pub async fn answer_response_stream( resp: reqwest::Response, ) -> Result>, AppResponseError> { @@ -52,6 +55,7 @@ where } let stream = resp.bytes_stream().map_err(AppResponseError::from); + let stream = check_first_item_response_error(stream).await?; Ok(stream) } } @@ -266,3 +270,18 @@ impl Stream for AnswerStream { } } } + +async fn check_first_item_response_error( + stream: impl Stream> + Unpin, +) -> Result>, AppResponseError> { + let mut stream = stream.peekable(); + if let Some(first_item) = Pin::new(&mut stream).peek().await { + let first_item = first_item.as_ref().map_err(|e| e.clone())?; + if let Ok(app_err) = serde_json::from_slice::(first_item) { + if app_err.code != ErrorCode::Ok { + return Err(app_err); + } + }; + } + Ok(stream) +}