From b650e9e5fbaa0fc8758b9d5af3f7388bb74f9560 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Sun, 12 Jan 2025 23:07:31 +0800 Subject: [PATCH] chore: ai writer (#1153) * chore: ai writer * chore: update test * chore: update test * chore: set env * chore: rename * chore: format nginx conf --- .github/workflows/integration_test.yml | 2 +- deploy.env | 2 +- docker-compose-ci.yml | 3 + libs/appflowy-ai-client/src/client.rs | 25 ++-- libs/appflowy-ai-client/src/dto.rs | 31 +++++ .../tests/chat_test/completion_test.rs | 41 +++---- libs/database/src/file/file_storage.rs | 4 +- libs/database/src/resource_usage.rs | 10 +- libs/shared-entity/src/dto/ai_dto.rs | 17 --- nginx/nginx.conf | 107 +++++++++--------- src/api/ai.rs | 8 +- tests/ai_test/chat_test.rs | 20 ++-- 12 files changed, 141 insertions(+), 129 deletions(-) diff --git a/.github/workflows/integration_test.yml b/.github/workflows/integration_test.yml index 834044d7..b0cd0836 100644 --- a/.github/workflows/integration_test.yml +++ b/.github/workflows/integration_test.yml @@ -102,7 +102,7 @@ jobs: sed -i 's|AI_OPENAI_API_KEY=.*|AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}|' .env sed -i "s|AI_AWS_ACCESS_KEY_ID=.*|AI_AWS_ACCESS_KEY_ID=${{ secrets.LOCAL_AI_AWS_ACCESS_KEY_ID }}|" .env sed -i "s|AI_AWS_SECRET_ACCESS_KEY=.*|AI_AWS_SECRET_ACCESS_KEY=${{ secrets.LOCAL_AI_AWS_SECRET_ACCESS_KEY }}|" .env - sed -i 's|AI_APPFLOWY_HOST=.*|AI_APPFLOWY_HOST=http://localhost:8000|' .env + sed -i 's|AI_APPFLOWY_HOST=.*|AI_APPFLOWY_HOST=http://localhost|' .env sed -i 's|APPFLOWY_WEB_URL=.*|APPFLOWY_WEB_URL=http://localhost:3000|' .env shell: bash diff --git a/deploy.env b/deploy.env index 873c1efe..74df73e7 100644 --- a/deploy.env +++ b/deploy.env @@ -159,7 +159,7 @@ AI_DATABASE_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POS AI_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT} LOCAL_AI_TEST_ENABLED=false AI_APPFLOWY_BUCKET_NAME=appflowy 
-AI_APPFLOWY_HOST=http://your-host +AI_APPFLOWY_HOST=${FQDN} AI_AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY} AI_AWS_SECRET_ACCESS_KEY=${AWS_SECRET} diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index ddb3f5cc..a21747e9 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -11,6 +11,7 @@ services: - ./nginx/nginx.conf:/etc/nginx/nginx.conf - ./nginx/ssl/certificate.crt:/etc/nginx/ssl/certificate.crt - ./nginx/ssl/private_key.key:/etc/nginx/ssl/private_key.key + #- ./nginx_logs:/var/log/nginx # You do not need this if you have configured to use your own s3 file storage # You can try to access http://localhost/minio/browser/appflowy in your browser @@ -179,6 +180,8 @@ services: - AI_REDIS_URL=${AI_REDIS_URL} - AI_APPFLOWY_BUCKET_NAME=${AI_APPFLOWY_BUCKET_NAME} - AI_APPFLOWY_HOST=${AI_APPFLOWY_HOST} + - AI_USE_MINIO=${APPFLOWY_S3_USE_MINIO} + - SUPPORT_OPENAI_V3_IMAGE_MODEL=false appflowy_worker: restart: on-failure diff --git a/libs/appflowy-ai-client/src/client.rs b/libs/appflowy-ai-client/src/client.rs index 78df90ef..e94608bc 100644 --- a/libs/appflowy-ai-client/src/client.rs +++ b/libs/appflowy-ai-client/src/client.rs @@ -1,8 +1,8 @@ use crate::dto::{ - AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompletionType, CreateChatContext, - CustomPrompt, Document, LocalAIConfig, MessageData, QuestionMetadata, RepeatedLocalAIPackage, - RepeatedRelatedQuestion, ResponseFormat, SearchDocumentsRequest, SimilarityResponse, - SummarizeRowResponse, TranslateRowData, TranslateRowResponse, + AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompleteTextParams, + CreateChatContext, Document, LocalAIConfig, MessageData, QuestionMetadata, + RepeatedLocalAIPackage, RepeatedRelatedQuestion, ResponseFormat, SearchDocumentsRequest, + SimilarityResponse, SummarizeRowResponse, TranslateRowData, TranslateRowResponse, }; use crate::error::AIError; @@ -12,7 +12,7 @@ use reqwest; use reqwest::{Method, RequestBuilder, StatusCode}; use 
serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; -use serde_json::{json, Map, Value}; +use serde_json::{Map, Value}; use std::borrow::Cow; use std::time::Duration; use tracing::{info, trace}; @@ -41,24 +41,15 @@ impl AppFlowyAIClient { Ok(()) } - pub async fn stream_completion_text>>( + pub async fn stream_completion_text( &self, - text: &str, - completion_type: T, - custom_prompt: Option, + params: CompleteTextParams, model: AIModel, ) -> Result>, AIError> { - let completion_type = completion_type.into(); - if text.is_empty() { + if params.text.is_empty() { return Err(AIError::InvalidRequest("Empty text".to_string())); } - let params = json!({ - "text": text, - "type": completion_type.map(|t| t as u8), - "custom_prompt": custom_prompt, - }); - let url = format!("{}/completion/stream", self.url); let resp = self .async_http_client(Method::POST, &url)? diff --git a/libs/appflowy-ai-client/src/dto.rs b/libs/appflowy-ai-client/src/dto.rs index 4eb5a1d3..c38fb025 100644 --- a/libs/appflowy-ai-client/src/dto.rs +++ b/libs/appflowy-ai-client/src/dto.rs @@ -466,3 +466,34 @@ pub struct CalculateSimilarityParams { pub struct SimilarityResponse { pub score: f64, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompletionMetadata { + pub object_id: String, + pub rag_ids: Option>, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CompleteTextParams { + pub text: String, + pub completion_type: Option, + pub custom_prompt: Option, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + +impl CompleteTextParams { + pub fn new_with_completion_type( + text: String, + completion_type: CompletionType, + metadata: Option, + ) -> Self { + Self { + text, + completion_type: Some(completion_type), + custom_prompt: None, + metadata, + } + } +} diff --git a/libs/appflowy-ai-client/tests/chat_test/completion_test.rs b/libs/appflowy-ai-client/tests/chat_test/completion_test.rs index 
38532a12..e3f72686 100644 --- a/libs/appflowy-ai-client/tests/chat_test/completion_test.rs +++ b/libs/appflowy-ai-client/tests/chat_test/completion_test.rs @@ -1,16 +1,17 @@ use crate::appflowy_ai_client; use appflowy_ai_client::client::collect_stream_text; -use appflowy_ai_client::dto::{AIModel, CompletionType}; +use appflowy_ai_client::dto::{AIModel, CompleteTextParams, CompletionType}; #[tokio::test] async fn continue_writing_test() { let client = appflowy_ai_client(); + let params = CompleteTextParams { + text: "I feel hungry".to_string(), + completion_type: Some(CompletionType::ImproveWriting), + custom_prompt: None, + metadata: None, + }; let stream = client - .stream_completion_text( - "I feel hungry", - CompletionType::ContinueWriting, - None, - AIModel::Claude3Sonnet, - ) + .stream_completion_text(params, AIModel::GPT4oMini) .await .unwrap(); let text = collect_stream_text(stream).await; @@ -21,13 +22,14 @@ async fn continue_writing_test() { #[tokio::test] async fn improve_writing_test() { let client = appflowy_ai_client(); + let params = CompleteTextParams { + text: "I fell tired because i sleep not very well last night".to_string(), + completion_type: Some(CompletionType::ImproveWriting), + custom_prompt: None, + metadata: None, + }; let stream = client - .stream_completion_text( - "I fell tired because i sleep not very well last night", - CompletionType::ImproveWriting, - None, - AIModel::GPT4oMini, - ) + .stream_completion_text(params, AIModel::GPT4oMini) .await .unwrap(); @@ -40,13 +42,14 @@ async fn improve_writing_test() { #[tokio::test] async fn make_text_shorter_text() { let client = appflowy_ai_client(); + let params = CompleteTextParams { + text: "I have an immense passion and deep-seated affection for Rust, a modern, multi-paradigm, high-performance programming language that I find incredibly satisfying to use due to its focus on safety, speed, and concurrency".to_string(), + completion_type: Some(CompletionType::MakeShorter), + custom_prompt: 
None, + metadata: None, + }; let stream = client - .stream_completion_text( - "I have an immense passion and deep-seated affection for Rust, a modern, multi-paradigm, high-performance programming language that I find incredibly satisfying to use due to its focus on safety, speed, and concurrency", - CompletionType::MakeShorter, - None, - AIModel::GPT4oMini - ) + .stream_completion_text(params, AIModel::GPT4oMini) .await .unwrap(); diff --git a/libs/database/src/file/file_storage.rs b/libs/database/src/file/file_storage.rs index ac5a3689..211f6cef 100644 --- a/libs/database/src/file/file_storage.rs +++ b/libs/database/src/file/file_storage.rs @@ -135,9 +135,9 @@ where pub async fn get_blob_metadata( &self, workspace_id: &Uuid, - store_key: &str, + metadata_key: &str, ) -> Result { - let metadata = get_blob_metadata(&self.pg_pool, workspace_id, store_key).await?; + let metadata = get_blob_metadata(&self.pg_pool, workspace_id, metadata_key).await?; Ok(metadata) } diff --git a/libs/database/src/resource_usage.rs b/libs/database/src/resource_usage.rs index d7cf1a88..357849a5 100644 --- a/libs/database/src/resource_usage.rs +++ b/libs/database/src/resource_usage.rs @@ -135,8 +135,14 @@ pub async fn delete_blob_metadata( pub async fn get_blob_metadata( pg_pool: &PgPool, workspace_id: &Uuid, - file_id: &str, + metadata_key: &str, ) -> Result { + tracing::trace!( + "get_blob_metadata: workspace_id: {}, metadata_key: {}", + workspace_id, + metadata_key + ); + // file_id is the BlobPath's blob_metadata_key let metadata = sqlx::query_as!( AFBlobMetadataRow, r#" @@ -144,7 +150,7 @@ pub async fn get_blob_metadata( WHERE workspace_id = $1 AND file_id = $2 "#, workspace_id, - file_id, + metadata_key, ) .fetch_one(pg_pool) .await?; diff --git a/libs/shared-entity/src/dto/ai_dto.rs b/libs/shared-entity/src/dto/ai_dto.rs index f198894e..d6be8d0f 100644 --- a/libs/shared-entity/src/dto/ai_dto.rs +++ b/libs/shared-entity/src/dto/ai_dto.rs @@ -31,23 +31,6 @@ pub struct 
SummarizeRowResponse { pub text: String, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CompleteTextParams { - pub text: String, - pub completion_type: Option, - pub custom_prompt: Option, -} - -impl CompleteTextParams { - pub fn new_with_completion_type(text: String, completion_type: CompletionType) -> Self { - Self { - text, - completion_type: Some(completion_type), - custom_prompt: None, - } - } -} - #[derive(Debug)] pub enum StringOrMessage { Left(String), diff --git a/nginx/nginx.conf b/nginx/nginx.conf index 6712da4e..201edf88 100644 --- a/nginx/nginx.conf +++ b/nginx/nginx.conf @@ -8,10 +8,11 @@ events { http { # docker dns resolver resolver 127.0.0.11 valid=10s; + #error_log /var/log/nginx/error.log debug; map $http_upgrade $connection_upgrade { - default upgrade; - '' close; + default upgrade; + '' close; } map $http_origin $cors_origin { @@ -50,14 +51,14 @@ http { # GoTrue location /gotrue/ { if ($request_method = 'OPTIONS') { - add_header 'Access-Control-Allow-Origin' $cors_origin always; - add_header 'Access-Control-Allow-Credentials' 'true' always; - add_header 'Access-Control-Allow-Headers' '*' always; - add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always; - add_header 'Access-Control-Max-Age' 3600 always; - add_header 'Content-Type' 'text/plain charset=UTF-8' always; - add_header 'Content-Length' 0 always; - return 204; + add_header 'Access-Control-Allow-Origin' $cors_origin always; + add_header 'Access-Control-Allow-Credentials' 'true' always; + add_header 'Access-Control-Allow-Headers' '*' always; + add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always; + add_header 'Access-Control-Max-Age' 3600 always; + add_header 'Content-Type' 'text/plain charset=UTF-8' always; + add_header 'Content-Length' 0 always; + return 204; } proxy_pass $gotrue_backend; @@ -81,57 +82,12 @@ http { proxy_read_timeout 86400; } - # AppFlowy-Cloud - location /api/chat { - proxy_pass 
$appflowy_cloud_backend; - - proxy_http_version 1.1; - proxy_set_header Connection ""; - chunked_transfer_encoding on; - proxy_buffering off; - proxy_cache off; - - proxy_read_timeout 600s; - proxy_connect_timeout 600s; - proxy_send_timeout 600s; - } - - location /api/import { - proxy_pass $appflowy_cloud_backend; - - # Set headers - proxy_set_header X-Request-Id $request_id; - proxy_set_header Host $http_host; - - # Handle CORS - add_header 'Access-Control-Allow-Origin' $cors_origin always; - add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always; - add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization, Accept' always; - add_header 'Access-Control-Max-Age' 3600 always; - - # Timeouts - proxy_read_timeout 600s; - proxy_connect_timeout 600s; - proxy_send_timeout 600s; - - # Disable buffering for large file uploads - proxy_request_buffering off; - proxy_buffering off; - proxy_cache off; - client_max_body_size 2G; - } - location /api { proxy_pass $appflowy_cloud_backend; - proxy_set_header X-Request-Id $request_id; proxy_set_header Host $http_host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; # Set CORS headers for other requests - if ($request_method = 'OPTIONS') { add_header 'Access-Control-Allow-Origin' $cors_origin always; add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always; @@ -150,8 +106,49 @@ http { proxy_request_buffering off; client_max_body_size 256M; } + + # AppFlowy-Cloud + location /api/chat { + proxy_pass $appflowy_cloud_backend; + + proxy_http_version 1.1; + proxy_set_header Connection ""; + chunked_transfer_encoding on; + proxy_buffering off; + proxy_cache off; + + proxy_read_timeout 600s; + proxy_connect_timeout 600s; + proxy_send_timeout 600s; + } + + location /api/import { + proxy_pass $appflowy_cloud_backend; + + # Set headers + proxy_set_header 
X-Request-Id $request_id; + proxy_set_header Host $http_host; + + # Handle CORS + add_header 'Access-Control-Allow-Origin' $cors_origin always; + add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always; + add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization, Accept' always; + add_header 'Access-Control-Max-Age' 3600 always; + + # Timeouts + proxy_read_timeout 600s; + proxy_connect_timeout 600s; + proxy_send_timeout 600s; + + # Disable buffering for large file uploads + proxy_request_buffering off; + proxy_buffering off; + proxy_cache off; + client_max_body_size 2G; + } } + # AppFlowy AI location /ai { proxy_pass $appflowy_ai_backend; diff --git a/src/api/ai.rs b/src/api/ai.rs index 949eb3da..8d7c7a34 100644 --- a/src/api/ai.rs +++ b/src/api/ai.rs @@ -37,14 +37,10 @@ async fn stream_complete_text_handler( ) -> actix_web::Result { let ai_model = ai_model_from_header(&req); let params = payload.into_inner(); + match state .ai_client - .stream_completion_text( - &params.text, - params.completion_type, - params.custom_prompt, - ai_model, - ) + .stream_completion_text(params, ai_model) .await { Ok(stream) => Ok( diff --git a/tests/ai_test/chat_test.rs b/tests/ai_test/chat_test.rs index 9c92bea8..45ec97dc 100644 --- a/tests/ai_test/chat_test.rs +++ b/tests/ai_test/chat_test.rs @@ -383,22 +383,22 @@ async fn get_text_with_image_message_test() { let answer = collect_answer(answer_stream).await; println!("answer:\n{}", answer); let image_url = extract_image_url(&answer).unwrap(); - let (workspace_id_url, chat_id_url, file_id_url) = test_client + let (workspace_id_2, chat_id_2, file_id_2) = test_client .api_client .parse_blob_url_v1(&image_url) .unwrap(); - assert_eq!(workspace_id, workspace_id_url); - assert_eq!(chat_id, chat_id_url); + assert_eq!(workspace_id, workspace_id_2); + assert_eq!(chat_id, chat_id_2); - let mut retries = 5; - let retry_interval = Duration::from_secs(10); + let mut retries = 6; + let retry_interval = 
Duration::from_secs(20); let mut last_error = None; // The image will be generated in the background, so we need to retry until it's available while retries > 0 { match test_client .api_client - .get_blob_v1(&workspace_id_url, &chat_id_url, &file_id_url) + .get_blob_v1(&workspace_id_2, &chat_id_2, &file_id_2) .await { Ok(_) => { @@ -407,14 +407,16 @@ async fn get_text_with_image_message_test() { break; }, Err(err) => { + eprintln!("Failed to get blob: {:?}", err); // Save the error and retry last_error = Some(err); retries -= 1; - if retries > 0 { - tokio::time::sleep(retry_interval).await; - } }, } + + if retries > 0 { + tokio::time::sleep(retry_interval).await; + } } if let Some(err) = last_error {