diff --git a/Cargo.lock b/Cargo.lock index 82c528cd..cf4eff1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -772,6 +772,7 @@ dependencies = [ name = "appflowy-indexer" version = "0.1.0" dependencies = [ + "appflowy-ai-client", "async-stream", "async-trait", "axum 0.7.4", @@ -790,7 +791,6 @@ dependencies = [ "humantime", "lazy_static", "log", - "openai_dive", "pgvector", "rand 0.8.5", "redis 0.25.2", diff --git a/deploy.env b/deploy.env index e3421340..5ce77fd2 100644 --- a/deploy.env +++ b/deploy.env @@ -81,9 +81,6 @@ APPFLOWY_S3_SECRET_KEY=minioadmin APPFLOWY_S3_BUCKET=appflowy #APPFLOWY_S3_REGION=us-east-1 -# Search API -APPFLOWY_OPENAI_API_KEY= - # AppFlowy Cloud Mailer APPFLOWY_MAILER_SMTP_HOST=smtp.gmail.com APPFLOWY_MAILER_SMTP_PORT=465 @@ -122,6 +119,5 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379 APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres # AppFlowy Indexer -APPFLOWY_INDEXER_OPENAI_API_KEY= APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 \ No newline at end of file diff --git a/dev.env b/dev.env index 6baa6ebd..0452f633 100644 --- a/dev.env +++ b/dev.env @@ -77,9 +77,6 @@ APPFLOWY_S3_SECRET_KEY=minioadmin APPFLOWY_S3_BUCKET=appflowy #APPFLOWY_S3_REGION=us-east-1 -# Search API -APPFLOWY_OPENAI_API_KEY= - # AppFlowy Cloud Mailer APPFLOWY_MAILER_SMTP_HOST=smtp.gmail.com APPFLOWY_MAILER_SMTP_USERNAME=notify@appflowy.io @@ -113,6 +110,5 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379 APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres # AppFlowy Indexer -APPFLOWY_INDEXER_OPENAI_API_KEY= APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 5ec8121f..40d8bf6a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -162,7 +162,8 @@ services: - APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 - APPFLOWY_INDEXER_ENVIRONMENT=production - APPFLOWY_INDEXER_DATABASE_URL=${APPFLOWY_INDEXER_DATABASE_URL} - - APPFLOWY_INDEXER_OPENAI_API_KEY=${APPFLOWY_INDEXER_OPENAI_API_KEY} + - APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT} + - APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL} volumes: postgres_data: diff --git a/services/appflowy-indexer/Cargo.toml b/services/appflowy-indexer/Cargo.toml index d854cafd..60ddc3c0 100644 --- a/services/appflowy-indexer/Cargo.toml +++ b/services/appflowy-indexer/Cargo.toml @@ -20,7 +20,7 @@ tracing.workspace = true serde.workspace = true serde_json.workspace = true yrs.workspace = true -openai_dive = { workspace = true, features = ["rustls-tls"] } +appflowy-ai-client = { workspace = true, features = ["client-api"] } sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "sync"] } tokio-util = { version = "0.7" } diff --git a/services/appflowy-indexer/src/collab_handle.rs b/services/appflowy-indexer/src/collab_handle.rs index daa137e5..ce28e5f3 100644 --- a/services/appflowy-indexer/src/collab_handle.rs +++ b/services/appflowy-indexer/src/collab_handle.rs @@ -294,7 +294,7 @@ mod test { use crate::collab_handle::CollabHandle; use crate::indexer::{Indexer, PostgresIndexer}; use crate::test_utils::{ - collab_update_forwarder, db_pool, openai_client, redis_stream, setup_collab, + ai_client, collab_update_forwarder, db_pool, redis_stream, setup_collab, }; #[tokio::test] @@ -325,7 +325,7 @@ mod test { let object_id = object_id.to_string(); - let openai = openai_client(); + let openai = ai_client(); let indexer: Arc = Arc::new(PostgresIndexer::new(openai, db)); let stream_group = redis_stream diff --git a/services/appflowy-indexer/src/consumer.rs b/services/appflowy-indexer/src/consumer.rs index 75bbf00b..d2a59495 100644 --- a/services/appflowy-indexer/src/consumer.rs +++ b/services/appflowy-indexer/src/consumer.rs @@ -286,7 +286,7 @@ mod test { use crate::consumer::OpenCollabConsumer; use crate::indexer::PostgresIndexer; use crate::test_utils::{ - collab_update_forwarder, db_pool, openai_client, redis_stream, setup_collab, + ai_client, collab_update_forwarder, db_pool, redis_stream, setup_collab, }; use collab::core::collab::MutexCollab; use collab::preclude::Collab; @@ -309,7 +309,7 @@ mod test { let object_id = uuid::Uuid::new_v4(); let db = db_pool().await; - let openai = openai_client(); + let openai = ai_client(); let mut collab = Collab::new( uid, @@ -412,7 +412,7 @@ mod test { let object_id = uuid::Uuid::new_v4(); let db = db_pool().await; - let openai = openai_client(); + let openai = ai_client(); let mut collab = Collab::new( uid, diff --git a/services/appflowy-indexer/src/indexer.rs b/services/appflowy-indexer/src/indexer.rs index a5f68c3e..2b2f280d 100644 --- a/services/appflowy-indexer/src/indexer.rs +++ b/services/appflowy-indexer/src/indexer.rs @@ -1,16 +1,15 @@ use std::pin::Pin; +use appflowy_ai_client::client::AppFlowyAIClient; +use appflowy_ai_client::dto::{ + EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel, +}; use async_stream::try_stream; use async_trait::async_trait; use collab::entity::EncodedCollab; use collab::error::CollabError; use collab_entity::CollabType; use futures::Stream; -use openai_dive::v1::api::Client; -use openai_dive::v1::models::EmbeddingsEngine; -use openai_dive::v1::resources::embedding::{ - EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingParameters, -}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; use uuid::Uuid; @@ -112,21 +111,21 @@ impl From for AFCollabEmbeddingParams { } pub struct PostgresIndexer { - openai: Client, + ai_client: AppFlowyAIClient, db: PgPool, } impl PostgresIndexer { #[allow(dead_code)] - pub async fn open(openai_api_key: &str, pg_conn: &str) -> Result { - let openai = Client::new(openai_api_key.to_string()); + pub async fn open(appflowy_ai_url: &str, pg_conn: &str) -> Result { + let ai_client = AppFlowyAIClient::new(appflowy_ai_url); let db = PgPool::connect(pg_conn).await?; - Ok(Self { openai, db }) + Ok(Self { ai_client, db }) } #[allow(dead_code)] - pub fn new(openai: Client, db: PgPool) -> Self { - Self { openai, db } + pub fn new(ai_client: AppFlowyAIClient, db: PgPool) -> Self { + Self { ai_client, db } } async fn get_embeddings(&self, fragments: Vec) -> Result { @@ -135,25 +134,18 @@ impl PostgresIndexer { .map(|fragment| fragment.content.clone()) .collect(); let resp = self - .openai - .embeddings() - .create(EmbeddingParameters { + .ai_client + .embeddings(EmbeddingRequest { input: EmbeddingInput::StringArray(inputs), - model: EmbeddingsEngine::TextEmbedding3Small.to_string(), - encoding_format: Some(EmbeddingEncodingFormat::Float), - dimensions: Some(1536), // text-embedding-3-small default number of dimensions - user: None, + model: EmbeddingsModel::TextEmbedding3Small.to_string(), + chunk_size: 0, + encoding_format: EmbeddingEncodingFormat::Float, + dimensions: 1536, }) .await .map_err(|e| crate::error::Error::OpenAI(e.to_string()))?; tracing::trace!("fetched {} embeddings", resp.data.len()); - let tokens_used = if let Some(usage) = resp.usage { - tracing::info!("OpenAI API index tokens used: {}", usage.total_tokens); - usage.total_tokens - } else { - 0 - }; let mut fragments: Vec<_> = fragments.into_iter().map(EmbedFragment::from).collect(); for e in resp.data.into_iter() { @@ -168,7 +160,7 @@ impl PostgresIndexer { fragments[e.index as usize].embedding = Some(embedding); } Ok(Embeddings { - tokens_used, + tokens_used: resp.total_tokens as u32, fragments, }) } @@ -268,7 +260,7 @@ mod test { use database_entity::dto::EmbeddingContentType; use crate::indexer::{Indexer, PostgresIndexer}; - use crate::test_utils::{db_pool, openai_client, setup_collab}; + use crate::test_utils::{ai_client, db_pool, setup_collab}; #[tokio::test] async fn test_indexing_embeddings() { @@ -289,7 +281,7 @@ mod test { ) .await; - let openai = openai_client(); + let openai = ai_client(); let indexer = PostgresIndexer::new(openai, db); diff --git a/services/appflowy-indexer/src/main.rs b/services/appflowy-indexer/src/main.rs index ae334849..4d8ba883 100644 --- a/services/appflowy-indexer/src/main.rs +++ b/services/appflowy-indexer/src/main.rs @@ -24,8 +24,11 @@ pub struct Config { #[clap(long, env = "APPFLOWY_INDEXER_REDIS_URL")] pub redis_url: String, - #[clap(long, env = "APPFLOWY_INDEXER_OPENAI_API_KEY")] - pub openai_api_key: String, + #[clap(long, env = "APPFLOWY_AI_SERVER_HOST", default_value = "localhost")] + pub appflowy_ai_host: String, + + #[clap(long, env = "APPFLOWY_AI_SERVER_PORT", default_value = "5001")] + pub appflowy_ai_port: u16, #[clap(long, env = "APPFLOWY_INDEXER_DATABASE_URL")] pub database_url: String, @@ -52,6 +55,12 @@ pub struct Config { pub preindex: bool, } +impl Config { + pub fn appflowy_ai_url(&self) -> String { + format!("http://{}:{}", self.appflowy_ai_host, self.appflowy_ai_port) + } +} + #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); @@ -62,9 +71,10 @@ async fn main() -> Result<(), Box> { } async fn run_server(config: Config) -> Result<(), Box> { + let appflowy_ai_url = config.appflowy_ai_url(); let redis_client = redis::Client::open(config.redis_url)?; let collab_stream = CollabRedisStream::new(redis_client).await?; - let indexer = PostgresIndexer::open(&config.openai_api_key, &config.database_url).await?; + let indexer = PostgresIndexer::open(&appflowy_ai_url, &config.database_url).await?; tracing::info!("Starting AppFlowy Indexer..."); let consumer = OpenCollabConsumer::new( collab_stream, diff --git a/services/appflowy-indexer/src/test_utils.rs b/services/appflowy-indexer/src/test_utils.rs index 2a8351af..5e590342 100644 --- a/services/appflowy-indexer/src/test_utils.rs +++ b/services/appflowy-indexer/src/test_utils.rs @@ -3,6 +3,7 @@ use std::env; use std::ops::DerefMut; use std::sync::Arc; +use appflowy_ai_client::client::AppFlowyAIClient; use collab::core::collab::MutexCollab; use collab::entity::EncodedCollab; use collab_entity::CollabType; @@ -20,8 +21,8 @@ use database::user::create_user; use database_entity::dto::CollabParams; lazy_static! { - pub static ref APPFLOWY_INDEXER_OPENAI_API_KEY: Cow<'static, str> = - get_env_var("APPFLOWY_INDEXER_OPENAI_API_KEY", ""); + pub static ref APPFLOWY_INDEXER_AI_URL: Cow<'static, str> = + get_env_var("APPFLOWY_INDEXER_AI_URL", "http://localhost:5001"); pub static ref APPFLOWY_INDEXER_DATABASE_URL: Cow<'static, str> = get_env_var( "APPFLOWY_INDEXER_DATABASE_URL", "postgres://postgres:password@localhost:5432/postgres" @@ -42,8 +43,8 @@ fn get_env_var<'default>(key: &str, default: &'default str) -> Cow<'default, str } } -pub fn openai_client() -> openai_dive::v1::api::Client { - openai_dive::v1::api::Client::new(APPFLOWY_INDEXER_OPENAI_API_KEY.to_string()) +pub fn ai_client() -> AppFlowyAIClient { + AppFlowyAIClient::new(&APPFLOWY_INDEXER_AI_URL) } pub async fn db_pool() -> PgPool {