From 5195dae3a5489742228875bcb3adc48f8f5f4b7d Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Tue, 7 Jan 2025 12:46:55 +0800 Subject: [PATCH] feat: Chat file metadata (#1133) * chore: chat question metadata * chore: update * chore: update env * chore: update env --- .github/workflows/integration_test.yml | 7 ++-- deploy.env | 23 +++++++---- dev.env | 24 +++++++---- docker-compose-ci.yml | 20 +++++----- docker-compose.yml | 15 ++++--- libs/appflowy-ai-client/src/client.rs | 24 ++++++++--- libs/appflowy-ai-client/src/dto.rs | 13 ++++-- .../tests/chat_test/context_test.rs | 9 ++++- .../tests/chat_test/qa_test.rs | 14 ++++--- libs/client-api-test/src/log.rs | 2 +- libs/database/src/file/file_storage.rs | 18 ++++----- .../appflowy-collaborate/src/application.rs | 2 +- services/appflowy-collaborate/src/config.rs | 2 +- services/appflowy-worker/src/application.rs | 2 +- src/api/chat.rs | 40 +++++++++++++++---- src/api/file_storage.rs | 10 ++--- src/application.rs | 2 +- src/biz/chat/ops.rs | 7 +++- src/config/config.rs | 4 +- 19 files changed, 157 insertions(+), 81 deletions(-) diff --git a/.github/workflows/integration_test.yml b/.github/workflows/integration_test.yml index 4103be3d..9bf12bff 100644 --- a/.github/workflows/integration_test.yml +++ b/.github/workflows/integration_test.yml @@ -98,9 +98,10 @@ jobs: sed -i 's|GOTRUE_RATE_LIMIT_EMAIL_SENT=100|GOTRUE_RATE_LIMIT_EMAIL_SENT=1000|' .env sed -i 's|APPFLOWY_MAILER_SMTP_USERNAME=.*|APPFLOWY_MAILER_SMTP_USERNAME=${{ secrets.CI_GOTRUE_SMTP_USER }}|' .env sed -i 's|APPFLOWY_MAILER_SMTP_PASSWORD=.*|APPFLOWY_MAILER_SMTP_PASSWORD=${{ secrets.CI_GOTRUE_SMTP_PASS }}|' .env - sed -i 's|APPFLOWY_AI_OPENAI_API_KEY=.*|APPFLOWY_AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}|' .env - sed -i "s|LOCAL_AI_AWS_ACCESS_KEY_ID=.*|LOCAL_AI_AWS_ACCESS_KEY_ID=${{ secrets.LOCAL_AI_AWS_ACCESS_KEY_ID }}|" .env - sed -i "s|LOCAL_AI_AWS_SECRET_ACCESS_KEY=.*|LOCAL_AI_AWS_SECRET_ACCESS_KEY=${{ secrets.LOCAL_AI_AWS_SECRET_ACCESS_KEY }}|" .env + sed -i 's|AI_OPENAI_API_KEY=.*|AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}|' .env + sed -i "s|AI_AWS_ACCESS_KEY_ID=.*|AI_AWS_ACCESS_KEY_ID=${{ secrets.LOCAL_AI_AWS_ACCESS_KEY_ID }}|" .env + sed -i "s|AI_AWS_SECRET_ACCESS_KEY=.*|AI_AWS_SECRET_ACCESS_KEY=${{ secrets.LOCAL_AI_AWS_SECRET_ACCESS_KEY }}|" .env + sed -i 's|AI_APPFLOWY_HOST=.*|AI_APPFLOWY_HOST=http://localhost:8000|' .env sed -i 's|APPFLOWY_WEB_URL=.*|APPFLOWY_WEB_URL=http://localhost:3000|' .env shell: bash diff --git a/deploy.env b/deploy.env index 2e05ef47..9ac7af53 100644 --- a/deploy.env +++ b/deploy.env @@ -19,6 +19,9 @@ REDIS_PORT=6379 MINIO_HOST=minio MINIO_PORT=9000 +AWS_ACCESS_KEY=minioadmin +AWS_SECRET=minioadmin + # AppFlowy Cloud ## URL that connects to the gotrue docker container APPFLOWY_GOTRUE_BASE_URL=http://gotrue:9999 @@ -110,8 +113,8 @@ APPFLOWY_S3_CREATE_BUCKET=true # Keep this as true if you are using other S3 compatible storage provider other than AWS. APPFLOWY_S3_USE_MINIO=true APPFLOWY_S3_MINIO_URL=http://${MINIO_HOST}:${MINIO_PORT} # change this if you are using a different address for minio -APPFLOWY_S3_ACCESS_KEY=minioadmin -APPFLOWY_S3_SECRET_KEY=minioadmin +APPFLOWY_S3_ACCESS_KEY=${AWS_ACCESS_KEY} +APPFLOWY_S3_SECRET_KEY=${AWS_SECRET} APPFLOWY_S3_BUCKET=appflowy #APPFLOWY_S3_REGION=us-east-1 @@ -146,12 +149,16 @@ NGINX_PORT=80 NGINX_TLS_PORT=443 # AppFlowy AI -APPFLOWY_AI_OPENAI_API_KEY= -APPFLOWY_AI_SERVER_PORT=5001 -APPFLOWY_AI_SERVER_HOST=ai -APPFLOWY_AI_DATABASE_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB} -APPFLOWY_AI_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT} -APPFLOWY_LOCAL_AI_TEST_ENABLED=false +AI_OPENAI_API_KEY= +AI_SERVER_PORT=5001 +AI_SERVER_HOST=ai +AI_DATABASE_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB} +AI_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT} +LOCAL_AI_TEST_ENABLED=false +AI_APPFLOWY_BUCKET_NAME=appflowy +AI_APPFLOWY_HOST=http://your-host +AI_AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY} +AI_AWS_SECRET_ACCESS_KEY=${AWS_SECRET} # AppFlowy Indexer APPFLOWY_INDEXER_ENABLED=true diff --git a/dev.env b/dev.env index 9c9bd76c..d38ebb1b 100644 --- a/dev.env +++ b/dev.env @@ -6,6 +6,10 @@ APPFLOWY_WEBSOCKET_MAILBOX_SIZE=6000 APPFLOWY_DATABASE_MAX_CONNECTIONS=40 APPFLOWY_DOCUMENT_CONTENT_SPLIT_LEN=8000 +# AWS +AWS_ACCESS_KEY=minioadmin +AWS_SECRET=minioadmin + # This file is used to set the environment variables for local development # Copy this file to .env and change the values as needed @@ -83,8 +87,8 @@ GOTRUE_EXTERNAL_APPLE_REDIRECT_URI=http://localhost:9999/callback APPFLOWY_S3_CREATE_BUCKET=true APPFLOWY_S3_USE_MINIO=true APPFLOWY_S3_MINIO_URL=http://localhost:9000 # change this if you are using a different address for minio -APPFLOWY_S3_ACCESS_KEY=minioadmin -APPFLOWY_S3_SECRET_KEY=minioadmin +APPFLOWY_S3_ACCESS_KEY=${AWS_ACCESS_KEY} +APPFLOWY_S3_SECRET_KEY=${AWS_SECRET} APPFLOWY_S3_BUCKET=appflowy #APPFLOWY_S3_REGION=us-east-1 @@ -113,12 +117,16 @@ GF_SECURITY_ADMIN_PASSWORD=password CLOUDFLARE_TUNNEL_TOKEN= # AppFlowy AI -APPFLOWY_AI_OPENAI_API_KEY= -APPFLOWY_AI_SERVER_PORT=5001 -APPFLOWY_AI_SERVER_HOST=localhost -APPFLOWY_AI_DATABASE_URL=postgresql+psycopg://postgres:password@localhost:5432/postgres -APPFLOWY_AI_REDIS_URL=redis://redis:6379 -APPFLOWY_LOCAL_AI_TEST_ENABLED=false +AI_OPENAI_API_KEY= +AI_SERVER_PORT=5001 +AI_SERVER_HOST=localhost +AI_DATABASE_URL=postgresql+psycopg://postgres:password@localhost:5432/postgres +AI_REDIS_URL=redis://redis:6379 +LOCAL_AI_TEST_ENABLED=false +AI_APPFLOWY_BUCKET_NAME=appflowy +AI_APPFLOWY_HOST=http://localhost:8000 +AI_AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY} +AI_AWS_SECRET_ACCESS_KEY=${AWS_SECRET} # AppFlowy Indexer APPFLOWY_INDEXER_ENABLED=true diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index a036c09f..ddb3f5cc 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -131,15 +131,15 @@ services: - APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL} # For the CI testing, we set the database connection to 20. The default value is 40. - APPFLOWY_DATABASE_MAX_CONNECTIONS=20 - - APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST} - - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} + - AI_SERVER_HOST=${AI_SERVER_HOST} + - AI_SERVER_PORT=${AI_SERVER_PORT} - APPFLOWY_WEB_URL=${APPFLOWY_WEB_URL} - APPFLOWY_MAILER_SMTP_HOST=${APPFLOWY_MAILER_SMTP_HOST} - APPFLOWY_MAILER_SMTP_PORT=${APPFLOWY_MAILER_SMTP_PORT} - APPFLOWY_MAILER_SMTP_USERNAME=${APPFLOWY_MAILER_SMTP_USERNAME} - APPFLOWY_MAILER_SMTP_EMAIL=${APPFLOWY_MAILER_SMTP_EMAIL} - APPFLOWY_MAILER_SMTP_PASSWORD=${APPFLOWY_MAILER_SMTP_PASSWORD} - - APPFLOWY_AI_OPENAI_API_KEY=${APPFLOWY_AI_OPENAI_API_KEY} + - AI_OPENAI_API_KEY=${AI_OPENAI_API_KEY} build: context: . dockerfile: Dockerfile @@ -171,12 +171,14 @@ services: ports: - "5001:5001" environment: - - OPENAI_API_KEY=${APPFLOWY_AI_OPENAI_API_KEY} - - LOCAL_AI_AWS_ACCESS_KEY_ID=${LOCAL_AI_AWS_ACCESS_KEY_ID} - - LOCAL_AI_AWS_SECRET_ACCESS_KEY=${LOCAL_AI_AWS_SECRET_ACCESS_KEY} - - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} - - APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL} - - APPFLOWY_AI_REDIS_URL=${APPFLOWY_AI_REDIS_URL} + - OPENAI_API_KEY=${AI_OPENAI_API_KEY} + - AI_AWS_ACCESS_KEY_ID=${AI_AWS_ACCESS_KEY_ID} + - AI_AWS_SECRET_ACCESS_KEY=${AI_AWS_SECRET_ACCESS_KEY} + - AI_SERVER_PORT=${AI_SERVER_PORT} + - AI_DATABASE_URL=${AI_DATABASE_URL} + - AI_REDIS_URL=${AI_REDIS_URL} + - AI_APPFLOWY_BUCKET_NAME=${AI_APPFLOWY_BUCKET_NAME} + - AI_APPFLOWY_HOST=${AI_APPFLOWY_HOST} appflowy_worker: restart: on-failure diff --git a/docker-compose.yml b/docker-compose.yml index 5a294f21..4bbf3f72 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -133,11 +133,10 @@ services: - APPFLOWY_MAILER_SMTP_TLS_KIND=${APPFLOWY_MAILER_SMTP_TLS_KIND} - APPFLOWY_ACCESS_CONTROL=${APPFLOWY_ACCESS_CONTROL} - APPFLOWY_DATABASE_MAX_CONNECTIONS=${APPFLOWY_DATABASE_MAX_CONNECTIONS} - - APPFLOWY_AI_SERVER_HOST=${APPFLOWY_AI_SERVER_HOST} - - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} - - APPFLOWY_AI_OPENAI_API_KEY=${APPFLOWY_AI_OPENAI_API_KEY} + - AI_SERVER_HOST=${AI_SERVER_HOST} + - AI_SERVER_PORT=${AI_SERVER_PORT} + - AI_OPENAI_API_KEY=${AI_OPENAI_API_KEY} # Uncomment this line if AppFlowy Web has been deployed - # - APPFLOWY_WEB_URL=${APPFLOWY_WEB_URL} build: context: . dockerfile: Dockerfile @@ -161,10 +160,10 @@ services: restart: on-failure image: appflowyinc/appflowy_ai:${APPFLOWY_AI_VERSION:-latest} environment: - - OPENAI_API_KEY=${APPFLOWY_AI_OPENAI_API_KEY} - - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} - - APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL} - - APPFLOWY_AI_REDIS_URL=${APPFLOWY_AI_REDIS_URL} + - OPENAI_API_KEY=${AI_OPENAI_API_KEY} + - APPFLOWY_AI_SERVER_PORT=${AI_SERVER_PORT} + - APPFLOWY_AI_DATABASE_URL=${AI_DATABASE_URL} + - APPFLOWY_AI_REDIS_URL=${AI_REDIS_URL} appflowy_worker: restart: on-failure diff --git a/libs/appflowy-ai-client/src/client.rs b/libs/appflowy-ai-client/src/client.rs index a88b8296..edf9dfb6 100644 --- a/libs/appflowy-ai-client/src/client.rs +++ b/libs/appflowy-ai-client/src/client.rs @@ -1,8 +1,9 @@ use crate::dto::{ AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompleteTextResponse, CompletionType, CreateChatContext, CustomPrompt, Document, LocalAIConfig, MessageData, - RepeatedLocalAIPackage, RepeatedRelatedQuestion, ResponseFormat, SearchDocumentsRequest, - SimilarityResponse, SummarizeRowResponse, TranslateRowData, TranslateRowResponse, + QuestionMetadata, RepeatedLocalAIPackage, RepeatedRelatedQuestion, ResponseFormat, + SearchDocumentsRequest, SimilarityResponse, SummarizeRowResponse, TranslateRowData, + TranslateRowResponse, }; use crate::error::AIError; @@ -173,6 +174,7 @@ impl AppFlowyAIClient { pub async fn send_question( &self, + workspace_id: &str, chat_id: &str, question_id: i64, content: &str, @@ -184,10 +186,13 @@ impl AppFlowyAIClient { data: MessageData { content: content.to_string(), metadata, - rag_ids: vec![], message_id: Some(question_id.to_string()), }, format: Default::default(), + metadata: QuestionMetadata { + workspace_id: workspace_id.to_string(), + rag_ids: vec![], + }, }; let url = format!("{}/chat/message", self.url); let resp = self @@ -203,6 +208,7 @@ impl AppFlowyAIClient { pub async fn stream_question( &self, + workspace_id: String, chat_id: &str, content: &str, metadata: Option, @@ -214,10 +220,13 @@ impl AppFlowyAIClient { data: MessageData { content: content.to_string(), metadata, - rag_ids, message_id: None, }, format: Default::default(), + metadata: QuestionMetadata { + workspace_id, + rag_ids, + }, }; let url = format!("{}/chat/message/stream", self.url); let resp = self @@ -230,8 +239,10 @@ impl AppFlowyAIClient { AIResponse::<()>::stream_response(resp).await } + #[allow(clippy::too_many_arguments)] pub async fn stream_question_v2( &self, + workspace_id: String, chat_id: &str, question_id: i64, content: &str, @@ -244,10 +255,13 @@ impl AppFlowyAIClient { data: MessageData { content: content.to_string(), metadata, - rag_ids, message_id: Some(question_id.to_string()), }, format: ResponseFormat::default(), + metadata: QuestionMetadata { + workspace_id, + rag_ids, + }, }; self.stream_question_v3(model, json).await } diff --git a/libs/appflowy-ai-client/src/dto.rs b/libs/appflowy-ai-client/src/dto.rs index dad3d9a1..e5fb7a1e 100644 --- a/libs/appflowy-ai-client/src/dto.rs +++ b/libs/appflowy-ai-client/src/dto.rs @@ -26,6 +26,13 @@ pub struct ChatQuestion { pub data: MessageData, #[serde(default)] pub format: ResponseFormat, + pub metadata: QuestionMetadata, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct QuestionMetadata { + pub workspace_id: String, + pub rag_ids: Vec, } #[derive(Clone, Default, Debug, Serialize, Deserialize)] @@ -60,7 +67,7 @@ pub struct OutputContentMetadata { #[serde(default, skip_serializing_if = "Option::is_none")] pub custom_image_prompt: Option, - /// The image model to use for generation (default: "dall-e-2"). + /// The image model to use for generation (default: "dall-e-3"). #[serde(default = "default_image_model")] pub image_model: String, @@ -81,7 +88,7 @@ pub struct OutputContentMetadata { // Default values for the fields fn default_image_model() -> String { - "dall-e-2".to_string() + "dall-e-3".to_string() } fn default_image_size() -> Option { @@ -98,8 +105,6 @@ pub struct MessageData { #[serde(skip_serializing_if = "Option::is_none")] pub metadata: Option, #[serde(default)] - pub rag_ids: Vec, - #[serde(default)] pub message_id: Option, } diff --git a/libs/appflowy-ai-client/tests/chat_test/context_test.rs b/libs/appflowy-ai-client/tests/chat_test/context_test.rs index 79cfceb0..9938a71a 100644 --- a/libs/appflowy-ai-client/tests/chat_test/context_test.rs +++ b/libs/appflowy-ai-client/tests/chat_test/context_test.rs @@ -14,7 +14,14 @@ async fn create_chat_context_test() { }; client.create_chat_text_context(context).await.unwrap(); let resp = client - .send_question(&chat_id, 1, "Where I live?", &AIModel::GPT4oMini, None) + .send_question( + &uuid::Uuid::new_v4().to_string(), + &chat_id, + 1, + "Where I live?", + &AIModel::GPT4oMini, + None, + ) .await .unwrap(); // response will be something like: diff --git a/libs/appflowy-ai-client/tests/chat_test/qa_test.rs b/libs/appflowy-ai-client/tests/chat_test/qa_test.rs index 31ddc5f1..38804ae3 100644 --- a/libs/appflowy-ai-client/tests/chat_test/qa_test.rs +++ b/libs/appflowy-ai-client/tests/chat_test/qa_test.rs @@ -1,9 +1,6 @@ use crate::appflowy_ai_client; -use appflowy_ai_client::dto::{AIModel, STREAM_ANSWER_KEY}; -use appflowy_ai_client::error::AIError; -use futures::stream::StreamExt; -use infra::reqwest::JsonStream; +use appflowy_ai_client::dto::AIModel; #[tokio::test] async fn qa_test() { @@ -11,7 +8,14 @@ async fn qa_test() { client.health_check().await.unwrap(); let chat_id = uuid::Uuid::new_v4().to_string(); let resp = client - .send_question(&chat_id, 1, "I feel hungry", &AIModel::GPT4o, None) + .send_question( + &uuid::Uuid::new_v4().to_string(), + &chat_id, + 1, + "I feel hungry", + &AIModel::GPT4o, + None, + ) .await .unwrap(); assert!(!resp.content.is_empty()); diff --git a/libs/client-api-test/src/log.rs b/libs/client-api-test/src/log.rs index cfe5252d..339d1301 100644 --- a/libs/client-api-test/src/log.rs +++ b/libs/client-api-test/src/log.rs @@ -35,7 +35,7 @@ pub fn ai_test_enabled() -> bool { load_env(); // local ai test is disable by default - let enabled = get_bool_from_env_var("APPFLOWY_LOCAL_AI_TEST_ENABLED"); + let enabled = get_bool_from_env_var("LOCAL_AI_TEST_ENABLED"); if enabled { trace!("Local AI test is enabled"); } diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index 1abb8789..ac5a3689 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -67,7 +67,7 @@ pub trait BucketClient { pub trait BlobKey: Send + Sync { fn workspace_id(&self) -> &Uuid; fn object_key(&self) -> String; - fn meta_key(&self) -> String; + fn blob_metadata_key(&self) -> String; fn e_tag(&self) -> &str; } @@ -99,11 +99,11 @@ where file_type: String, file_size: usize, ) -> Result<(), AppError> { - if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), &key.meta_key()).await? { + if is_blob_metadata_exists(&self.pg_pool, key.workspace_id(), &key.blob_metadata_key()).await? { warn!( - "file already exists, workspace_id: {}, meta_key: {}", + "file already exists, workspace_id: {}, blob_metadata_key: {}", key.workspace_id(), - key.meta_key() + key.blob_metadata_key() ); return Ok(()); } @@ -114,7 +114,7 @@ where .await?; insert_blob_metadata( &self.pg_pool, - &key.meta_key(), + &key.blob_metadata_key(), key.workspace_id(), &file_type, file_size, @@ -127,7 +127,7 @@ where self.client.delete_blob(&key.object_key()).await?; let mut tx = self.pg_pool.begin().await?; - delete_blob_metadata(&mut tx, key.workspace_id(), &key.meta_key()).await?; + delete_blob_metadata(&mut tx, key.workspace_id(), &key.blob_metadata_key()).await?; tx.commit().await?; Ok(()) } @@ -135,9 +135,9 @@ where pub async fn get_blob_metadata( &self, workspace_id: &Uuid, - meta_key: &str, + store_key: &str, ) -> Result { - let metadata = get_blob_metadata(&self.pg_pool, workspace_id, meta_key).await?; + let metadata = get_blob_metadata(&self.pg_pool, workspace_id, store_key).await?; Ok(metadata) } @@ -180,7 +180,7 @@ where self.client.complete_upload(&key.object_key(), req).await?; insert_blob_metadata( &self.pg_pool, - &key.meta_key(), + &key.blob_metadata_key(), key.workspace_id(), &content_type, content_length, diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 0d9cc9bd..eb4cfb8a 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -163,7 +163,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result() .unwrap_or(true), - openai_api_key: Secret::new(get_env_var("APPFLOWY_AI_OPENAI_API_KEY", "")), + openai_api_key: Secret::new(get_env_var("AI_OPENAI_API_KEY", "")), embedding_buffer_size: get_env_var("APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE", "2000") .parse::() .unwrap_or(2000), diff --git a/services/appflowy-collaborate/src/config.rs b/services/appflowy-collaborate/src/config.rs index cd02fbe5..690aee4a 100644 --- a/services/appflowy-collaborate/src/config.rs +++ b/services/appflowy-collaborate/src/config.rs @@ -210,7 +210,7 @@ pub fn get_configuration() -> Result { redis_worker_count: get_env_var("APPFLOWY_REDIS_WORKERS", "60").parse()?, ai: AISettings { port: get_env_var("APPFLOWY_AI_SERVER_PORT", "5001").parse()?, - host: get_env_var("APPFLOWY_AI_SERVER_HOST", "localhost"), + host: get_env_var("AI_SERVER_HOST", "localhost"), }, }; Ok(config) diff --git a/services/appflowy-worker/src/application.rs b/services/appflowy-worker/src/application.rs index 187b053b..91e941da 100644 --- a/services/appflowy-worker/src/application.rs +++ b/services/appflowy-worker/src/application.rs @@ -145,7 +145,7 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err .parse::() .unwrap_or(true), open_api_key: Secret::new(appflowy_collaborate::config::get_env_var( - "APPFLOWY_AI_OPENAI_API_KEY", + "AI_OPENAI_API_KEY", "", )), tick_interval_secs: 10, diff --git a/src/api/chat.rs b/src/api/chat.rs index f48e0b07..ebc10853 100644 --- a/src/api/chat.rs +++ b/src/api/chat.rs @@ -10,7 +10,8 @@ use serde::Deserialize; use crate::api::util::ai_model_from_header; use app_error::AppError; use appflowy_ai_client::dto::{ - ChatQuestion, ChatQuestionQuery, CreateChatContext, MessageData, RepeatedRelatedQuestion, + ChatQuestion, ChatQuestionQuery, CreateChatContext, MessageData, QuestionMetadata, + RepeatedRelatedQuestion, }; use authentication::jwt::UserUuid; use bytes::Bytes; @@ -88,7 +89,7 @@ pub fn chat_scope() -> Scope { ) .service( web::resource("/{chat_id}/{message_id}/v2/answer/stream") - .route(web::get().to(answer_stream_v2_handler)) + .route(web::get().to(answer_stream_v2_handler)) // Deprecated since 0.9.2 ) .service( web::resource("/{chat_id}/answer/stream") @@ -140,13 +141,22 @@ async fn create_chat_context_handler( } async fn update_question_handler( + path: web::Path<(String, String)>, state: Data, payload: Json, req: HttpRequest, ) -> actix_web::Result> { + let (workspace_id, _chat_id) = path.into_inner(); let params = payload.into_inner(); let ai_model = ai_model_from_header(&req); - update_chat_message(&state.pg_pool, params, state.ai_client.clone(), ai_model).await?; + update_chat_message( + workspace_id, + &state.pg_pool, + params, + state.ai_client.clone(), + ai_model, + ) + .await?; Ok(AppResponse::Ok().into()) } @@ -236,9 +246,10 @@ async fn answer_handler( state: Data, req: HttpRequest, ) -> actix_web::Result> { - let (_workspace_id, chat_id, message_id) = path.into_inner(); + let (workspace_id, chat_id, message_id) = path.into_inner(); let ai_model = ai_model_from_header(&req); let message = generate_chat_message_answer( + workspace_id, &state.pg_pool, state.ai_client.clone(), message_id, @@ -255,14 +266,21 @@ async fn answer_stream_handler( state: Data, req: HttpRequest, ) -> actix_web::Result { - let (_workspace_id, chat_id, question_id) = path.into_inner(); + let (workspace_id, chat_id, question_id) = path.into_inner(); let (content, metadata) = chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?; let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?; let ai_model = ai_model_from_header(&req); match state .ai_client - .stream_question(&chat_id, &content, Some(metadata), rag_ids, &ai_model) + .stream_question( + workspace_id, + &chat_id, + &content, + Some(metadata), + rag_ids, + &ai_model, + ) .await { Ok(answer_stream) => { @@ -289,7 +307,7 @@ async fn answer_stream_v2_handler( state: Data, req: HttpRequest, ) -> actix_web::Result { - let (_workspace_id, chat_id, question_id) = path.into_inner(); + let (workspace_id, chat_id, question_id) = path.into_inner(); let (content, metadata) = chat::chat_ops::select_chat_message_content(&state.pg_pool, question_id).await?; let rag_ids = chat::chat_ops::select_chat_rag_ids(&state.pg_pool, &chat_id).await?; @@ -304,6 +322,7 @@ async fn answer_stream_v2_handler( match state .ai_client .stream_question_v2( + workspace_id, &chat_id, question_id, &content, @@ -333,10 +352,12 @@ async fn answer_stream_v2_handler( #[instrument(level = "debug", skip_all, err)] async fn answer_stream_v3_handler( + path: web::Path<(String, String)>, payload: Json, state: Data, req: HttpRequest, ) -> actix_web::Result { + let (workspace_id, _) = path.into_inner(); let payload = payload.into_inner(); let (content, metadata) = chat::chat_ops::select_chat_message_content(&state.pg_pool, payload.question_id).await?; @@ -348,10 +369,13 @@ async fn answer_stream_v3_handler( data: MessageData { content: content.to_string(), metadata: Some(metadata), - rag_ids, message_id: Some(payload.question_id.to_string()), }, format: payload.format, + metadata: QuestionMetadata { + workspace_id, + rag_ids, + }, }; trace!("[Chat] stream v3 {:?}", question); match state diff --git a/src/api/file_storage.rs b/src/api/file_storage.rs index 0edce6cd..62ea16e4 100644 --- a/src/api/file_storage.rs +++ b/src/api/file_storage.rs @@ -357,7 +357,7 @@ async fn get_blob_by_object_key( // Get the metadata let result = state .bucket_storage - .get_blob_metadata(key.workspace_id(), &key.meta_key()) + .get_blob_metadata(key.workspace_id(), &key.blob_metadata_key()) .await; if let Err(err) = result.as_ref() { @@ -424,7 +424,7 @@ async fn get_blob_metadata_handler( // Get the metadata let metadata = state .bucket_storage - .get_blob_metadata(&path.workspace_id, &path.meta_key()) + .get_blob_metadata(&path.workspace_id, &path.blob_metadata_key()) .await .map(|meta| BlobMetadata { workspace_id: meta.workspace_id, @@ -448,7 +448,7 @@ async fn get_blob_metadata_v1_handler( // Get the metadata let metadata = state .bucket_storage - .get_blob_metadata(&path.workspace_id, &path.meta_key()) + .get_blob_metadata(&path.workspace_id, &path.blob_metadata_key()) .await .map(|meta| BlobMetadata { workspace_id: meta.workspace_id, @@ -589,7 +589,7 @@ impl BlobKey for BlobPathV0 { format!("{}/{}", self.workspace_id, self.file_id) } - fn meta_key(&self) -> String { + fn blob_metadata_key(&self) -> String { self.file_id.clone() } @@ -615,7 +615,7 @@ impl BlobKey for BlobPathV1 { format!("{}/{}/{}", self.workspace_id, self.parent_dir, self.file_id) } - fn meta_key(&self) -> String { + fn blob_metadata_key(&self) -> String { format!("{}_{}", self.parent_dir, self.file_id) } diff --git a/src/application.rs b/src/application.rs index 2196f367..d27874a6 100644 --- a/src/application.rs +++ b/src/application.rs @@ -332,7 +332,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result() .unwrap_or(true), - openai_api_key: Secret::new(get_env_var("APPFLOWY_AI_OPENAI_API_KEY", "")), + openai_api_key: Secret::new(get_env_var("AI_OPENAI_API_KEY", "")), embedding_buffer_size: appflowy_collaborate::config::get_env_var( "APPFLOWY_INDEXER_EMBEDDING_BUFFER_SIZE", "5000", diff --git a/src/biz/chat/ops.rs b/src/biz/chat/ops.rs index d542e42f..a56ca6a9 100644 --- a/src/biz/chat/ops.rs +++ b/src/biz/chat/ops.rs @@ -42,6 +42,7 @@ pub(crate) async fn delete_chat(pg_pool: &PgPool, chat_id: &str) -> Result<(), A } pub async fn update_chat_message( + workspace_id: String, pg_pool: &PgPool, params: UpdateChatMessageContentParams, ai_client: AppFlowyAIClient, @@ -60,6 +61,7 @@ pub async fn update_chat_message( // TODO(nathan): query the metadata from the database let new_answer = ai_client .send_question( + &workspace_id, ¶ms.chat_id, params.message_id, ¶ms.content, @@ -81,6 +83,7 @@ pub async fn update_chat_message( } pub async fn generate_chat_message_answer( + workspace_id: String, pg_pool: &PgPool, ai_client: AppFlowyAIClient, question_message_id: i64, @@ -91,6 +94,7 @@ pub async fn generate_chat_message_answer( chat::chat_ops::select_chat_message_content(pg_pool, question_message_id).await?; let new_answer = ai_client .send_question( + &workspace_id, chat_id, question_message_id, &content, @@ -145,6 +149,7 @@ pub async fn create_chat_message( pub async fn create_chat_message_stream( pg_pool: &PgPool, uid: i64, + workspace_id: String, chat_id: String, params: CreateChatMessageParams, ai_client: AppFlowyAIClient, @@ -186,7 +191,7 @@ pub async fn create_chat_message_stream( match params.message_type { ChatMessageType::System => {} ChatMessageType::User => { - let answer = match ai_client.send_question(&chat_id,question_id, ¶ms.content, &ai_model, Some(json!(params.metadata))).await { + let answer = match ai_client.send_question(&workspace_id, &chat_id,question_id, ¶ms.content, &ai_model, Some(json!(params.metadata))).await { Ok(response) => response, Err(err) => { error!("Failed to send question to AI: {}", err); diff --git a/src/config/config.rs b/src/config/config.rs index 13f74c56..73c621a3 100644 --- a/src/config/config.rs +++ b/src/config/config.rs @@ -241,8 +241,8 @@ pub fn get_configuration() -> Result { region: get_env_var("APPFLOWY_S3_REGION", ""), }, appflowy_ai: AppFlowyAISetting { - port: get_env_var("APPFLOWY_AI_SERVER_PORT", "5001").into(), - host: get_env_var("APPFLOWY_AI_SERVER_HOST", "localhost").into(), + port: get_env_var("AI_SERVER_PORT", "5001").into(), + host: get_env_var("AI_SERVER_HOST", "localhost").into(), }, grpc_history: GrpcHistorySetting { addrs: get_env_var("APPFLOWY_GRPC_HISTORY_ADDRS", "http://localhost:50051"),