diff --git a/libs/shared-entity/src/response_stream.rs b/libs/shared-entity/src/response_stream.rs index b62cc3cd..9a14aaf8 100644 --- a/libs/shared-entity/src/response_stream.rs +++ b/libs/shared-entity/src/response_stream.rs @@ -29,6 +29,7 @@ where } let stream = resp.bytes_stream().map_err(AppResponseError::from); + let stream = check_first_item_response_error(stream).await?; Ok(JsonStream::new(stream)) } @@ -42,6 +43,7 @@ where } let stream = resp.bytes_stream().map_err(AppResponseError::from); + let stream = check_first_item_response_error(stream).await?; Ok(NewlineStream::new(stream)) } @@ -79,7 +81,6 @@ impl JsonStream { } } } - impl Stream for JsonStream where T: DeserializeOwned, @@ -89,32 +90,74 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); + // Poll for the next chunk of data from the underlying stream match ready!(this.stream.as_mut().poll_next(cx)) { Some(Ok(bytes)) => { + // Append the new bytes to the buffer this.buffer.extend_from_slice(&bytes); + + // Create a StreamDeserializer to deserialize the bytes into T let de = StreamDeserializer::new(SliceRead::new(this.buffer)); let mut iter = de.into_iter(); + + // Check if there's a valid deserialized object in the stream if let Some(result) = iter.next() { return match result { Ok(value) => { + // Determine the offset of the successfully deserialized data let remaining = iter.byte_offset(); + // Drain the buffer up to the byte offset to remove the consumed bytes this.buffer.drain(0..remaining); Poll::Ready(Some(Ok(value))) }, Err(err) => { + // Handle EOF gracefully by checking if the error indicates incomplete data if err.is_eof() { + // If EOF, but not enough data to complete the object, wait for more data Poll::Pending } else { + // If the error is not EOF, return it Poll::Ready(Some(Err(AppResponseError::from(err)))) } }, }; } else { + // If no complete object is ready yet, wait for more data Poll::Pending } }, Some(Err(err)) => Poll::Ready(Some(Err(err))), - None => Poll::Ready(None), + None => { + // Handle the case when the stream has ended but the buffer still has incomplete data + if this.buffer.is_empty() { + Poll::Ready(None) + } else { + // Try to deserialize any remaining data in the buffer + let de = StreamDeserializer::new(SliceRead::new(this.buffer)); + let mut iter = de.into_iter(); + + if let Some(result) = iter.next() { + match result { + Ok(value) => { + let remaining = iter.byte_offset(); + this.buffer.drain(0..remaining); + Poll::Ready(Some(Ok(value))) + }, + Err(err) => { + if err.is_eof() { + // If EOF and buffer is incomplete, return None to indicate stream end + Poll::Ready(None) + } else { + // Return any other errors that occur during deserialization + Poll::Ready(Some(Err(AppResponseError::from(err)))) + } + }, + } + } else { + Poll::Ready(None) + } + } + }, } } }