fix: return ai stream error when service unavailable (#673)

This commit is contained in:
Nathan.fooo 2024-07-02 13:24:40 +08:00 committed by GitHub
parent 8c634551fd
commit c0bc7020d9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 43 additions and 18 deletions

View File

@ -119,6 +119,9 @@ pub enum AppError {
#[error("{0}")] #[error("{0}")]
PublishNamespaceAlreadyTaken(String), PublishNamespaceAlreadyTaken(String),
#[error("{0}")]
AIServiceUnavailable(String),
} }
impl AppError { impl AppError {
@ -178,6 +181,7 @@ impl AppError {
AppError::OverrideWithIncorrectData(_) => ErrorCode::OverrideWithIncorrectData, AppError::OverrideWithIncorrectData(_) => ErrorCode::OverrideWithIncorrectData,
AppError::Utf8Error(_) => ErrorCode::Internal, AppError::Utf8Error(_) => ErrorCode::Internal,
AppError::PublishNamespaceAlreadyTaken(_) => ErrorCode::PublishNamespaceAlreadyTaken, AppError::PublishNamespaceAlreadyTaken(_) => ErrorCode::PublishNamespaceAlreadyTaken,
AppError::AIServiceUnavailable(_) => ErrorCode::AIServiceUnavailable,
} }
} }
} }
@ -288,6 +292,7 @@ pub enum ErrorCode {
OverrideWithIncorrectData = 1029, OverrideWithIncorrectData = 1029,
PublishNamespaceNotSet = 1030, PublishNamespaceNotSet = 1030,
PublishNamespaceAlreadyTaken = 1031, PublishNamespaceAlreadyTaken = 1031,
AIServiceUnavailable = 1032,
} }
impl ErrorCode { impl ErrorCode {

View File

@ -1,10 +1,12 @@
use crate::api::util::ai_model_from_header; use crate::api::util::ai_model_from_header;
use crate::state::AppState; use crate::state::AppState;
use actix_web::web::{Data, Json}; use actix_web::web::{Data, Json};
use actix_web::{web, HttpRequest, HttpResponse, Scope}; use actix_web::{web, HttpRequest, HttpResponse, Scope};
use app_error::AppError; use app_error::AppError;
use appflowy_ai_client::dto::{CompleteTextResponse, TranslateRowParams, TranslateRowResponse}; use appflowy_ai_client::dto::{CompleteTextResponse, TranslateRowParams, TranslateRowResponse};
use futures_util::TryStreamExt;
use futures_util::{stream, TryStreamExt};
use shared_entity::dto::ai_dto::{ use shared_entity::dto::ai_dto::{
CompleteTextParams, SummarizeRowData, SummarizeRowParams, SummarizeRowResponse, CompleteTextParams, SummarizeRowData, SummarizeRowParams, SummarizeRowResponse,
}; };
@ -42,16 +44,24 @@ async fn stream_complete_text_handler(
) -> actix_web::Result<HttpResponse> { ) -> actix_web::Result<HttpResponse> {
let ai_model = ai_model_from_header(&req); let ai_model = ai_model_from_header(&req);
let params = payload.into_inner(); let params = payload.into_inner();
let stream = state match state
.ai_client .ai_client
.stream_completion_text(&params.text, params.completion_type, ai_model) .stream_completion_text(&params.text, params.completion_type, ai_model)
.await .await
.map_err(|err| AppError::Internal(err.into()))?; {
Ok( Ok(stream) => Ok(
HttpResponse::Ok() HttpResponse::Ok()
.content_type("text/event-stream") .content_type("text/event-stream")
.streaming(stream.map_err(AppError::from)), .streaming(stream.map_err(AppError::from)),
) ),
Err(err) => Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
}
} }
#[instrument(level = "debug", skip(state, payload), err)] #[instrument(level = "debug", skip(state, payload), err)]

View File

@ -15,12 +15,12 @@ use database_entity::dto::{
GetChatMessageParams, MessageCursor, RepeatedChatMessage, UpdateChatMessageContentParams, GetChatMessageParams, MessageCursor, RepeatedChatMessage, UpdateChatMessageContentParams,
}; };
use futures::Stream; use futures::Stream;
use futures_util::stream;
use futures_util::{FutureExt, TryStreamExt}; use futures_util::{FutureExt, TryStreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use shared_entity::response::{AppResponse, JsonAppResponse}; use shared_entity::response::{AppResponse, JsonAppResponse};
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task; use tokio::task;
@ -28,6 +28,7 @@ use tokio::task;
use database::chat; use database::chat;
use crate::api::util::ai_model_from_header; use crate::api::util::ai_model_from_header;
use database::chat::chat_ops::insert_answer_message; use database::chat::chat_ops::insert_answer_message;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
use validator::Validate; use validator::Validate;
@ -208,18 +209,27 @@ async fn answer_stream_handler(
let (_workspace_id, chat_id, question_id) = path.into_inner(); let (_workspace_id, chat_id, question_id) = path.into_inner();
let content = chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?; let content = chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?;
let ai_model = ai_model_from_header(&req); let ai_model = ai_model_from_header(&req);
let answer_stream = state match state
.ai_client .ai_client
.stream_question(&chat_id, &content, &ai_model) .stream_question(&chat_id, &content, &ai_model)
.await .await
.map_err(|err| AppError::Internal(err.into()))?; {
Ok(answer_stream) => {
let new_answer_stream = answer_stream.map_err(AppError::from); let new_answer_stream = answer_stream.map_err(AppError::from);
Ok( Ok(
HttpResponse::Ok() HttpResponse::Ok()
.content_type("text/event-stream") .content_type("text/event-stream")
.streaming(new_answer_stream), .streaming(new_answer_stream),
) )
},
Err(err) => Ok(
HttpResponse::Ok()
.content_type("text/event-stream")
.streaming(stream::once(async move {
Err(AppError::AIServiceUnavailable(err.to_string()))
})),
),
}
} }
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]