chore: ai writer (#1153)

* chore: ai writer

* chore: update test

* chore: update test

* chore: set env

* chore: rename

* chore: format nginx conf
This commit is contained in:
Nathan.fooo 2025-01-12 23:07:31 +08:00 committed by GitHub
parent a5d94a09d6
commit b650e9e5fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 141 additions and 129 deletions

View File

@ -102,7 +102,7 @@ jobs:
sed -i 's|AI_OPENAI_API_KEY=.*|AI_OPENAI_API_KEY=${{ secrets.CI_OPENAI_API_KEY }}|' .env
sed -i "s|AI_AWS_ACCESS_KEY_ID=.*|AI_AWS_ACCESS_KEY_ID=${{ secrets.LOCAL_AI_AWS_ACCESS_KEY_ID }}|" .env
sed -i "s|AI_AWS_SECRET_ACCESS_KEY=.*|AI_AWS_SECRET_ACCESS_KEY=${{ secrets.LOCAL_AI_AWS_SECRET_ACCESS_KEY }}|" .env
sed -i 's|AI_APPFLOWY_HOST=.*|AI_APPFLOWY_HOST=http://localhost:8000|' .env
sed -i 's|AI_APPFLOWY_HOST=.*|AI_APPFLOWY_HOST=http://localhost|' .env
sed -i 's|APPFLOWY_WEB_URL=.*|APPFLOWY_WEB_URL=http://localhost:3000|' .env
shell: bash

View File

@ -159,7 +159,7 @@ AI_DATABASE_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POS
AI_REDIS_URL=redis://${REDIS_HOST}:${REDIS_PORT}
LOCAL_AI_TEST_ENABLED=false
AI_APPFLOWY_BUCKET_NAME=appflowy
AI_APPFLOWY_HOST=http://your-host
AI_APPFLOWY_HOST=${FQDN}
AI_AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY}
AI_AWS_SECRET_ACCESS_KEY=${AWS_SECRET}

View File

@ -11,6 +11,7 @@ services:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf
- ./nginx/ssl/certificate.crt:/etc/nginx/ssl/certificate.crt
- ./nginx/ssl/private_key.key:/etc/nginx/ssl/private_key.key
#- ./nginx_logs:/var/log/nginx
# You do not need this if you have configured to use your own s3 file storage
# You can try to access http://localhost/minio/browser/appflowy in your browser
@ -179,6 +180,8 @@ services:
- AI_REDIS_URL=${AI_REDIS_URL}
- AI_APPFLOWY_BUCKET_NAME=${AI_APPFLOWY_BUCKET_NAME}
- AI_APPFLOWY_HOST=${AI_APPFLOWY_HOST}
- AI_USE_MINIO=${APPFLOWY_S3_USE_MINIO}
- SUPPORT_OPENAI_V3_IMAGE_MODEL=false
appflowy_worker:
restart: on-failure

View File

@ -1,8 +1,8 @@
use crate::dto::{
AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompletionType, CreateChatContext,
CustomPrompt, Document, LocalAIConfig, MessageData, QuestionMetadata, RepeatedLocalAIPackage,
RepeatedRelatedQuestion, ResponseFormat, SearchDocumentsRequest, SimilarityResponse,
SummarizeRowResponse, TranslateRowData, TranslateRowResponse,
AIModel, CalculateSimilarityParams, ChatAnswer, ChatQuestion, CompleteTextParams,
CreateChatContext, Document, LocalAIConfig, MessageData, QuestionMetadata,
RepeatedLocalAIPackage, RepeatedRelatedQuestion, ResponseFormat, SearchDocumentsRequest,
SimilarityResponse, SummarizeRowResponse, TranslateRowData, TranslateRowResponse,
};
use crate::error::AIError;
@ -12,7 +12,7 @@ use reqwest;
use reqwest::{Method, RequestBuilder, StatusCode};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
use serde_json::{Map, Value};
use std::borrow::Cow;
use std::time::Duration;
use tracing::{info, trace};
@ -41,24 +41,15 @@ impl AppFlowyAIClient {
Ok(())
}
pub async fn stream_completion_text<T: Into<Option<CompletionType>>>(
pub async fn stream_completion_text(
&self,
text: &str,
completion_type: T,
custom_prompt: Option<CustomPrompt>,
params: CompleteTextParams,
model: AIModel,
) -> Result<impl Stream<Item = Result<Bytes, AIError>>, AIError> {
let completion_type = completion_type.into();
if text.is_empty() {
if params.text.is_empty() {
return Err(AIError::InvalidRequest("Empty text".to_string()));
}
let params = json!({
"text": text,
"type": completion_type.map(|t| t as u8),
"custom_prompt": custom_prompt,
});
let url = format!("{}/completion/stream", self.url);
let resp = self
.async_http_client(Method::POST, &url)?

View File

@ -466,3 +466,34 @@ pub struct CalculateSimilarityParams {
pub struct SimilarityResponse {
pub score: f64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompletionMetadata {
pub object_id: String,
pub rag_ids: Option<Vec<String>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteTextParams {
pub text: String,
pub completion_type: Option<CompletionType>,
pub custom_prompt: Option<CustomPrompt>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<CompletionMetadata>,
}
impl CompleteTextParams {
pub fn new_with_completion_type(
text: String,
completion_type: CompletionType,
metadata: Option<CompletionMetadata>,
) -> Self {
Self {
text,
completion_type: Some(completion_type),
custom_prompt: None,
metadata,
}
}
}

View File

@ -1,16 +1,17 @@
use crate::appflowy_ai_client;
use appflowy_ai_client::client::collect_stream_text;
use appflowy_ai_client::dto::{AIModel, CompletionType};
use appflowy_ai_client::dto::{AIModel, CompleteTextParams, CompletionType};
#[tokio::test]
async fn continue_writing_test() {
let client = appflowy_ai_client();
let params = CompleteTextParams {
text: "I feel hungry".to_string(),
completion_type: Some(CompletionType::ImproveWriting),
custom_prompt: None,
metadata: None,
};
let stream = client
.stream_completion_text(
"I feel hungry",
CompletionType::ContinueWriting,
None,
AIModel::Claude3Sonnet,
)
.stream_completion_text(params, AIModel::GPT4oMini)
.await
.unwrap();
let text = collect_stream_text(stream).await;
@ -21,13 +22,14 @@ async fn continue_writing_test() {
#[tokio::test]
async fn improve_writing_test() {
let client = appflowy_ai_client();
let params = CompleteTextParams {
text: "I fell tired because i sleep not very well last night".to_string(),
completion_type: Some(CompletionType::ImproveWriting),
custom_prompt: None,
metadata: None,
};
let stream = client
.stream_completion_text(
"I fell tired because i sleep not very well last night",
CompletionType::ImproveWriting,
None,
AIModel::GPT4oMini,
)
.stream_completion_text(params, AIModel::GPT4oMini)
.await
.unwrap();
@ -40,13 +42,14 @@ async fn improve_writing_test() {
#[tokio::test]
async fn make_text_shorter_text() {
let client = appflowy_ai_client();
let params = CompleteTextParams {
text: "I have an immense passion and deep-seated affection for Rust, a modern, multi-paradigm, high-performance programming language that I find incredibly satisfying to use due to its focus on safety, speed, and concurrency".to_string(),
completion_type: Some(CompletionType::MakeShorter),
custom_prompt: None,
metadata: None,
};
let stream = client
.stream_completion_text(
"I have an immense passion and deep-seated affection for Rust, a modern, multi-paradigm, high-performance programming language that I find incredibly satisfying to use due to its focus on safety, speed, and concurrency",
CompletionType::MakeShorter,
None,
AIModel::GPT4oMini
)
.stream_completion_text(params, AIModel::GPT4oMini)
.await
.unwrap();

View File

@ -135,9 +135,9 @@ where
pub async fn get_blob_metadata(
&self,
workspace_id: &Uuid,
store_key: &str,
metadata_key: &str,
) -> Result<AFBlobMetadataRow, AppError> {
let metadata = get_blob_metadata(&self.pg_pool, workspace_id, store_key).await?;
let metadata = get_blob_metadata(&self.pg_pool, workspace_id, metadata_key).await?;
Ok(metadata)
}

View File

@ -135,8 +135,14 @@ pub async fn delete_blob_metadata(
pub async fn get_blob_metadata(
pg_pool: &PgPool,
workspace_id: &Uuid,
file_id: &str,
metadata_key: &str,
) -> Result<AFBlobMetadataRow, AppError> {
tracing::trace!(
"get_blob_metadata: workspace_id: {}, metadata_key: {}",
workspace_id,
metadata_key
);
// file_id is the BlobPath's blob_metadata_key
let metadata = sqlx::query_as!(
AFBlobMetadataRow,
r#"
@ -144,7 +150,7 @@ pub async fn get_blob_metadata(
WHERE workspace_id = $1 AND file_id = $2
"#,
workspace_id,
file_id,
metadata_key,
)
.fetch_one(pg_pool)
.await?;

View File

@ -31,23 +31,6 @@ pub struct SummarizeRowResponse {
pub text: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CompleteTextParams {
pub text: String,
pub completion_type: Option<CompletionType>,
pub custom_prompt: Option<CustomPrompt>,
}
impl CompleteTextParams {
pub fn new_with_completion_type(text: String, completion_type: CompletionType) -> Self {
Self {
text,
completion_type: Some(completion_type),
custom_prompt: None,
}
}
}
#[derive(Debug)]
pub enum StringOrMessage {
Left(String),

View File

@ -8,10 +8,11 @@ events {
http {
# docker dns resolver
resolver 127.0.0.11 valid=10s;
#error_log /var/log/nginx/error.log debug;
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
default upgrade;
'' close;
}
map $http_origin $cors_origin {
@ -50,14 +51,14 @@ http {
# GoTrue
location /gotrue/ {
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' $cors_origin always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
add_header 'Access-Control-Allow-Headers' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Max-Age' 3600 always;
add_header 'Content-Type' 'text/plain charset=UTF-8' always;
add_header 'Content-Length' 0 always;
return 204;
add_header 'Access-Control-Allow-Origin' $cors_origin always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
add_header 'Access-Control-Allow-Headers' '*' always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Max-Age' 3600 always;
add_header 'Content-Type' 'text/plain charset=UTF-8' always;
add_header 'Content-Length' 0 always;
return 204;
}
proxy_pass $gotrue_backend;
@ -81,57 +82,12 @@ http {
proxy_read_timeout 86400;
}
# AppFlowy-Cloud
location /api/chat {
proxy_pass $appflowy_cloud_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding on;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 600s;
proxy_connect_timeout 600s;
proxy_send_timeout 600s;
}
location /api/import {
proxy_pass $appflowy_cloud_backend;
# Set headers
proxy_set_header X-Request-Id $request_id;
proxy_set_header Host $http_host;
# Handle CORS
add_header 'Access-Control-Allow-Origin' $cors_origin always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization, Accept' always;
add_header 'Access-Control-Max-Age' 3600 always;
# Timeouts
proxy_read_timeout 600s;
proxy_connect_timeout 600s;
proxy_send_timeout 600s;
# Disable buffering for large file uploads
proxy_request_buffering off;
proxy_buffering off;
proxy_cache off;
client_max_body_size 2G;
}
location /api {
proxy_pass $appflowy_cloud_backend;
proxy_set_header X-Request-Id $request_id;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Set CORS headers for other requests
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' $cors_origin always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
@ -150,8 +106,49 @@ http {
proxy_request_buffering off;
client_max_body_size 256M;
}
# AppFlowy-Cloud
location /api/chat {
proxy_pass $appflowy_cloud_backend;
proxy_http_version 1.1;
proxy_set_header Connection "";
chunked_transfer_encoding on;
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 600s;
proxy_connect_timeout 600s;
proxy_send_timeout 600s;
}
location /api/import {
proxy_pass $appflowy_cloud_backend;
# Set headers
proxy_set_header X-Request-Id $request_id;
proxy_set_header Host $http_host;
# Handle CORS
add_header 'Access-Control-Allow-Origin' $cors_origin always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, PUT, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization, Accept' always;
add_header 'Access-Control-Max-Age' 3600 always;
# Timeouts
proxy_read_timeout 600s;
proxy_connect_timeout 600s;
proxy_send_timeout 600s;
# Disable buffering for large file uploads
proxy_request_buffering off;
proxy_buffering off;
proxy_cache off;
client_max_body_size 2G;
}
}
# AppFlowy AI
location /ai {
proxy_pass $appflowy_ai_backend;

View File

@ -37,14 +37,10 @@ async fn stream_complete_text_handler(
) -> actix_web::Result<HttpResponse> {
let ai_model = ai_model_from_header(&req);
let params = payload.into_inner();
match state
.ai_client
.stream_completion_text(
&params.text,
params.completion_type,
params.custom_prompt,
ai_model,
)
.stream_completion_text(params, ai_model)
.await
{
Ok(stream) => Ok(

View File

@ -383,22 +383,22 @@ async fn get_text_with_image_message_test() {
let answer = collect_answer(answer_stream).await;
println!("answer:\n{}", answer);
let image_url = extract_image_url(&answer).unwrap();
let (workspace_id_url, chat_id_url, file_id_url) = test_client
let (workspace_id_2, chat_id_2, file_id_2) = test_client
.api_client
.parse_blob_url_v1(&image_url)
.unwrap();
assert_eq!(workspace_id, workspace_id_url);
assert_eq!(chat_id, chat_id_url);
assert_eq!(workspace_id, workspace_id_2);
assert_eq!(chat_id, chat_id_2);
let mut retries = 5;
let retry_interval = Duration::from_secs(10);
let mut retries = 6;
let retry_interval = Duration::from_secs(20);
let mut last_error = None;
// The image will be generated in the background, so we need to retry until it's available
while retries > 0 {
match test_client
.api_client
.get_blob_v1(&workspace_id_url, &chat_id_url, &file_id_url)
.get_blob_v1(&workspace_id_2, &chat_id_2, &file_id_2)
.await
{
Ok(_) => {
@ -407,14 +407,16 @@ async fn get_text_with_image_message_test() {
break;
},
Err(err) => {
eprintln!("Failed to get blob: {:?}", err);
// Save the error and retry
last_error = Some(err);
retries -= 1;
if retries > 0 {
tokio::time::sleep(retry_interval).await;
}
},
}
if retries > 0 {
tokio::time::sleep(retry_interval).await;
}
}
if let Some(err) = last_error {