chore: fix incompleted response (#841)

This commit is contained in:
Nathan.fooo 2024-09-24 23:50:45 +08:00 committed by GitHub
parent e499c4c720
commit b023e2f511
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 45 additions and 2 deletions

View File

@ -29,6 +29,7 @@ where
}
let stream = resp.bytes_stream().map_err(AppResponseError::from);
let stream = check_first_item_response_error(stream).await?;
Ok(JsonStream::new(stream))
}
@ -42,6 +43,7 @@ where
}
let stream = resp.bytes_stream().map_err(AppResponseError::from);
let stream = check_first_item_response_error(stream).await?;
Ok(NewlineStream::new(stream))
}
@ -79,7 +81,6 @@ impl<T> JsonStream<T> {
}
}
}
impl<T> Stream for JsonStream<T>
where
T: DeserializeOwned,
@ -89,32 +90,74 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
// Poll for the next chunk of data from the underlying stream
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
// Append the new bytes to the buffer
this.buffer.extend_from_slice(&bytes);
// Create a StreamDeserializer to deserialize the bytes into T
let de = StreamDeserializer::new(SliceRead::new(this.buffer));
let mut iter = de.into_iter();
// Check if there's a valid deserialized object in the stream
if let Some(result) = iter.next() {
return match result {
Ok(value) => {
// Determine the offset of the successfully deserialized data
let remaining = iter.byte_offset();
// Drain the buffer up to the byte offset to remove the consumed bytes
this.buffer.drain(0..remaining);
Poll::Ready(Some(Ok(value)))
},
Err(err) => {
// Handle EOF gracefully by checking if the error indicates incomplete data
if err.is_eof() {
// If EOF, but not enough data to complete the object, wait for more data
Poll::Pending
} else {
// If the error is not EOF, return it
Poll::Ready(Some(Err(AppResponseError::from(err))))
}
},
};
} else {
// If no complete object is ready yet, wait for more data
Poll::Pending
}
},
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
None => {
// Handle the case when the stream has ended but the buffer still has incomplete data
if this.buffer.is_empty() {
Poll::Ready(None)
} else {
// Try to deserialize any remaining data in the buffer
let de = StreamDeserializer::new(SliceRead::new(this.buffer));
let mut iter = de.into_iter();
if let Some(result) = iter.next() {
match result {
Ok(value) => {
let remaining = iter.byte_offset();
this.buffer.drain(0..remaining);
Poll::Ready(Some(Ok(value)))
},
Err(err) => {
if err.is_eof() {
// If EOF and buffer is incomplete, return None to indicate stream end
Poll::Ready(None)
} else {
// Return any other errors that occur during deserialization
Poll::Ready(Some(Err(AppResponseError::from(err))))
}
},
}
} else {
Poll::Ready(None)
}
}
},
}
}
}