chore: chat stream timeout (#1137)

* chore: increase timeout

* chore: keepalive event
This commit is contained in:
Nathan.fooo 2025-01-08 01:11:25 +08:00 committed by GitHub
parent e0bc8f814b
commit 2bd6da228d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 13 additions and 6 deletions

View File

@ -263,20 +263,21 @@ impl AppFlowyAIClient {
rag_ids, rag_ids,
}, },
}; };
self.stream_question_v3(model, json).await self.stream_question_v3(model, json, Some(30)).await
} }
pub async fn stream_question_v3( pub async fn stream_question_v3(
&self, &self,
model: &AIModel, model: &AIModel,
question: ChatQuestion, question: ChatQuestion,
timeout_secs: Option<u64>,
) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> { ) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> {
let url = format!("{}/v2/chat/message/stream", self.url); let url = format!("{}/v2/chat/message/stream", self.url);
let resp = self let resp = self
.async_http_client(Method::POST, &url)? .async_http_client(Method::POST, &url)?
.header(AI_MODEL_HEADER_KEY, model.to_str()) .header(AI_MODEL_HEADER_KEY, model.to_str())
.json(&question) .json(&question)
.timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(timeout_secs.unwrap_or(30)))
.send() .send()
.await?; .await?;
AIResponse::<()>::stream_response(resp).await AIResponse::<()>::stream_response(resp).await

View File

@ -8,6 +8,7 @@ use std::str::FromStr;
pub const STREAM_METADATA_KEY: &str = "0"; pub const STREAM_METADATA_KEY: &str = "0";
pub const STREAM_ANSWER_KEY: &str = "1"; pub const STREAM_ANSWER_KEY: &str = "1";
pub const STREAM_IMAGE_KEY: &str = "2"; pub const STREAM_IMAGE_KEY: &str = "2";
pub const STREAM_KEEP_ALIVE_KEY: &str = "3";
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SummarizeRowResponse { pub struct SummarizeRowResponse {
pub text: String, pub text: String,

View File

@ -1271,6 +1271,7 @@ pub async fn collect_answer(mut stream: QuestionStream) -> String {
answer.push_str(&value); answer.push_str(&value);
}, },
QuestionStreamValue::Metadata { .. } => {}, QuestionStreamValue::Metadata { .. } => {},
QuestionStreamValue::KeepAlive => {},
} }
} }
answer answer

View File

@ -12,7 +12,7 @@ use reqwest::Method;
use serde_json::Value; use serde_json::Value;
use shared_entity::dto::ai_dto::{ use shared_entity::dto::ai_dto::{
CalculateSimilarityParams, ChatQuestionQuery, RepeatedRelatedQuestion, SimilarityResponse, CalculateSimilarityParams, ChatQuestionQuery, RepeatedRelatedQuestion, SimilarityResponse,
STREAM_ANSWER_KEY, STREAM_IMAGE_KEY, STREAM_METADATA_KEY, STREAM_ANSWER_KEY, STREAM_IMAGE_KEY, STREAM_KEEP_ALIVE_KEY, STREAM_METADATA_KEY,
}; };
use shared_entity::dto::chat_dto::{ChatSettings, UpdateChatParams}; use shared_entity::dto::chat_dto::{ChatSettings, UpdateChatParams};
use shared_entity::response::{AppResponse, AppResponseError}; use shared_entity::response::{AppResponse, AppResponseError};
@ -366,6 +366,7 @@ pub enum QuestionStreamValue {
Metadata { Metadata {
value: serde_json::Value, value: serde_json::Value,
}, },
KeepAlive,
} }
impl Stream for QuestionStream { impl Stream for QuestionStream {
type Item = Result<QuestionStreamValue, AppResponseError>; type Item = Result<QuestionStreamValue, AppResponseError>;
@ -394,6 +395,10 @@ impl Stream for QuestionStream {
return Poll::Ready(Some(Ok(QuestionStreamValue::Answer { value: image }))); return Poll::Ready(Some(Ok(QuestionStreamValue::Answer { value: image })));
} }
if value.remove(STREAM_KEEP_ALIVE_KEY).is_some() {
return Poll::Ready(Some(Ok(QuestionStreamValue::KeepAlive)));
}
error!("Invalid streaming value: {:?}", value); error!("Invalid streaming value: {:?}", value);
Poll::Ready(None) Poll::Ready(None)
}, },

View File

@ -92,7 +92,6 @@ where
// Poll for the next chunk of data from the underlying stream // Poll for the next chunk of data from the underlying stream
match ready!(this.stream.as_mut().poll_next(cx)) { match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => { Some(Ok(bytes)) => {
// Append the new bytes to the buffer
this.buffer.extend_from_slice(&bytes); this.buffer.extend_from_slice(&bytes);
// Create a StreamDeserializer to deserialize the bytes into T // Create a StreamDeserializer to deserialize the bytes into T
@ -112,7 +111,6 @@ where
return Poll::Pending; return Poll::Pending;
}, },
Some(Err(err)) => { Some(Err(err)) => {
// Return other deserialization errors wrapped in SE
return Poll::Ready(Some(Err(err.into()))); return Poll::Ready(Some(Err(err.into())));
}, },
None => { None => {

View File

@ -380,7 +380,7 @@ async fn answer_stream_v3_handler(
trace!("[Chat] stream v3 {:?}", question); trace!("[Chat] stream v3 {:?}", question);
match state match state
.ai_client .ai_client
.stream_question_v3(&ai_model, question) .stream_question_v3(&ai_model, question, Some(60))
.await .await
{ {
Ok(answer_stream) => { Ok(answer_stream) => {

View File

@ -450,6 +450,7 @@ async fn collect_answer(mut stream: QuestionStream) -> String {
answer.push_str(&value); answer.push_str(&value);
}, },
QuestionStreamValue::Metadata { .. } => {}, QuestionStreamValue::Metadata { .. } => {},
QuestionStreamValue::KeepAlive => {},
} }
} }
answer answer