From ab9932a943196a828557eeec44d9865bc6ad241c Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Fri, 17 Jan 2025 13:52:42 +0800 Subject: [PATCH] chore: add ai metrics (#1169) --- libs/appflowy-ai-client/src/dto.rs | 8 ++++- src/api/chat.rs | 58 +++++++++++++++++++----------- src/biz/chat/metrics.rs | 46 ++++++++++++++++++++++++ src/biz/chat/mod.rs | 1 + src/state.rs | 4 +++ 5 files changed, 95 insertions(+), 22 deletions(-) create mode 100644 src/biz/chat/metrics.rs diff --git a/libs/appflowy-ai-client/src/dto.rs b/libs/appflowy-ai-client/src/dto.rs index c38fb025..b602d9dc 100644 --- a/libs/appflowy-ai-client/src/dto.rs +++ b/libs/appflowy-ai-client/src/dto.rs @@ -53,7 +53,7 @@ pub enum OutputLayout { SimpleTable = 3, } -#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr)] +#[derive(Clone, Debug, Default, Serialize_repr, Deserialize_repr, Eq, PartialEq)] #[repr(u8)] pub enum OutputContent { #[default] @@ -62,6 +62,12 @@ pub enum OutputContent { RichTextImage = 2, } +impl OutputContent { + pub fn is_image(&self) -> bool { + *self == OutputContent::IMAGE || *self == OutputContent::RichTextImage + } +} + #[derive(Clone, Default, Debug, Serialize, Deserialize)] pub struct OutputContentMetadata { /// Custom prompt for image generation. diff --git a/src/api/chat.rs b/src/api/chat.rs index fca0dc8c..965e7214 100644 --- a/src/api/chat.rs +++ b/src/api/chat.rs @@ -271,6 +271,7 @@ async fn answer_stream_handler( chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?; let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?; let ai_model = ai_model_from_header(&req); + state.metrics.ai_metrics.record_total_stream_count(1); match state .ai_client .stream_question( @@ -291,13 +292,16 @@ async fn answer_stream_handler( .streaming(new_answer_stream), ) }, - Err(err) => Ok( - HttpResponse::Ok() - .content_type("text/event-stream") - .streaming(stream::once(async move { - Err(AppError::AIServiceUnavailable(err.to_string())) - })), - ), + Err(err) => { + state.metrics.ai_metrics.record_failed_stream_count(1); + Ok( + HttpResponse::Ok() + .content_type("text/event-stream") + .streaming(stream::once(async move { + Err(AppError::AIServiceUnavailable(err.to_string())) + })), + ) + }, } } @@ -313,6 +317,7 @@ async fn answer_stream_v2_handler( let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?; let ai_model = ai_model_from_header(&req); + state.metrics.ai_metrics.record_total_stream_count(1); trace!( "[Chat] stream answer for chat: {}, question: {}, rag_ids: {:?}", chat_id, @@ -340,13 +345,16 @@ async fn answer_stream_v2_handler( .streaming(new_answer_stream), ) }, - Err(err) => Ok( - HttpResponse::ServiceUnavailable() - .content_type("text/event-stream") - .streaming(stream::once(async move { - Err(AppError::AIServiceUnavailable(err.to_string())) - })), - ), + Err(err) => { + state.metrics.ai_metrics.record_failed_stream_count(1); + Ok( + HttpResponse::ServiceUnavailable() + .content_type("text/event-stream") + .streaming(stream::once(async move { + Err(AppError::AIServiceUnavailable(err.to_string())) + })), + ) + }, } } @@ -363,6 +371,10 @@ async fn answer_stream_v3_handler( chat::chat_ops::select_chat_message_content(&state.pg_pool, payload.question_id).await?; let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &payload.chat_id).await?; let ai_model = ai_model_from_header(&req); + state.metrics.ai_metrics.record_total_stream_count(1); + if payload.format.output_content.is_image() { + state.metrics.ai_metrics.record_stream_image_count(1); + } let question = ChatQuestion { chat_id: payload.chat_id, @@ -377,6 +389,7 @@ async fn answer_stream_v3_handler( rag_ids, }, }; + trace!("[Chat] stream v3 {:?}", question); match state .ai_client @@ -391,13 +404,16 @@ async fn answer_stream_v3_handler( .streaming(new_answer_stream), ) }, - Err(err) => Ok( - HttpResponse::ServiceUnavailable() - .content_type("text/event-stream") - .streaming(stream::once(async move { - Err(AppError::AIServiceUnavailable(err.to_string())) - })), - ), + Err(err) => { + state.metrics.ai_metrics.record_failed_stream_count(1); + Ok( + HttpResponse::ServiceUnavailable() + .content_type("text/event-stream") + .streaming(stream::once(async move { + Err(AppError::AIServiceUnavailable(err.to_string())) + })), + ) + }, } } diff --git a/src/biz/chat/metrics.rs b/src/biz/chat/metrics.rs new file mode 100644 index 00000000..3d6fdcf1 --- /dev/null +++ b/src/biz/chat/metrics.rs @@ -0,0 +1,46 @@ +use prometheus_client::metrics::counter::Counter; + +#[derive(Default, Clone)] +pub struct AIMetrics { + total_stream_count: Counter, + failed_stream_count: Counter, + stream_image_count: Counter, +} + +impl AIMetrics { + pub fn register(registry: &mut prometheus_client::registry::Registry) -> Self { + let metrics = Self::default(); + let realtime_registry = registry.sub_registry_with_prefix("ai"); + + // Register each metric with the Prometheus registry + realtime_registry.register( + "total_stream_count", + "Total count of streams processed", + metrics.total_stream_count.clone(), + ); + realtime_registry.register( + "failed_stream_count", + "Total count of failed streams", + metrics.failed_stream_count.clone(), + ); + realtime_registry.register( + "image_stream_count", + "Total count of image streams processed", + metrics.stream_image_count.clone(), + ); + + metrics + } + + pub fn record_total_stream_count(&self, count: u64) { + self.total_stream_count.inc_by(count); + } + + pub fn record_failed_stream_count(&self, count: u64) { + self.failed_stream_count.inc_by(count); + } + + pub fn record_stream_image_count(&self, count: u64) { + self.stream_image_count.inc_by(count); + } +} diff --git a/src/biz/chat/mod.rs b/src/biz/chat/mod.rs index 01eafd2e..88c903a4 100644 --- a/src/biz/chat/mod.rs +++ b/src/biz/chat/mod.rs @@ -1 +1,2 @@ +pub mod metrics; pub mod ops; diff --git a/src/state.rs b/src/state.rs index 661caa91..2c7f0cf2 100644 --- a/src/state.rs +++ b/src/state.rs @@ -28,6 +28,7 @@ use snowflake::Snowflake; use tonic_proto::history::history_client::HistoryClient; use crate::api::metrics::{AppFlowyWebMetrics, PublishedCollabMetrics, RequestMetrics}; +use crate::biz::chat::metrics::AIMetrics; use crate::biz::pg_listener::PgListeners; use crate::biz::workspace::publish::PublishedCollabStore; use crate::config::config::Config; @@ -130,6 +131,7 @@ pub struct AppMetrics { pub appflowy_web_metrics: Arc, pub embedding_metrics: Arc, pub collab_stream_metrics: Arc, + pub ai_metrics: Arc, } impl Default for AppMetrics { @@ -149,6 +151,7 @@ impl AppMetrics { let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry)); let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry)); let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry)); + let ai_metrics = Arc::new(AIMetrics::register(&mut registry)); Self { registry: Arc::new(registry), request_metrics, @@ -159,6 +162,7 @@ impl AppMetrics { appflowy_web_metrics, embedding_metrics, collab_stream_metrics, + ai_metrics, } } }