chore: appflowy indexer uses appflowy ai as open ai proxy

This commit is contained in:
Bartosz Sypytkowski 2024-06-18 08:33:24 +02:00
parent 9ea4de06fd
commit ae2a2a4fa0
10 changed files with 46 additions and 50 deletions

2
Cargo.lock generated
View File

@ -772,6 +772,7 @@ dependencies = [
name = "appflowy-indexer"
version = "0.1.0"
dependencies = [
"appflowy-ai-client",
"async-stream",
"async-trait",
"axum 0.7.4",
@ -790,7 +791,6 @@ dependencies = [
"humantime",
"lazy_static",
"log",
"openai_dive",
"pgvector",
"rand 0.8.5",
"redis 0.25.2",

View File

@ -81,9 +81,6 @@ APPFLOWY_S3_SECRET_KEY=minioadmin
APPFLOWY_S3_BUCKET=appflowy
#APPFLOWY_S3_REGION=us-east-1
# Search API
APPFLOWY_OPENAI_API_KEY=
# AppFlowy Cloud Mailer
APPFLOWY_MAILER_SMTP_HOST=smtp.gmail.com
APPFLOWY_MAILER_SMTP_PORT=465
@ -122,6 +119,5 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
# AppFlowy Indexer
APPFLOWY_INDEXER_OPENAI_API_KEY=
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

View File

@ -77,9 +77,6 @@ APPFLOWY_S3_SECRET_KEY=minioadmin
APPFLOWY_S3_BUCKET=appflowy
#APPFLOWY_S3_REGION=us-east-1
# Search API
APPFLOWY_OPENAI_API_KEY=
# AppFlowy Cloud Mailer
APPFLOWY_MAILER_SMTP_HOST=smtp.gmail.com
APPFLOWY_MAILER_SMTP_USERNAME=notify@appflowy.io
@ -113,6 +110,5 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
# AppFlowy Indexer
APPFLOWY_INDEXER_OPENAI_API_KEY=
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

View File

@ -162,7 +162,8 @@ services:
- APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
- APPFLOWY_INDEXER_ENVIRONMENT=production
- APPFLOWY_INDEXER_DATABASE_URL=${APPFLOWY_INDEXER_DATABASE_URL}
- APPFLOWY_INDEXER_OPENAI_API_KEY=${APPFLOWY_INDEXER_OPENAI_API_KEY}
- APPFLOWY_AI_SERVER_PORT=${APPFLOWY_AI_SERVER_PORT}
- APPFLOWY_AI_DATABASE_URL=${APPFLOWY_AI_DATABASE_URL}
volumes:
postgres_data:

View File

@ -20,7 +20,7 @@ tracing.workspace = true
serde.workspace = true
serde_json.workspace = true
yrs.workspace = true
openai_dive = { workspace = true, features = ["rustls-tls"] }
appflowy-ai-client = { workspace = true, features = ["client-api"] }
sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "net", "sync"] }
tokio-util = { version = "0.7" }

View File

@ -294,7 +294,7 @@ mod test {
use crate::collab_handle::CollabHandle;
use crate::indexer::{Indexer, PostgresIndexer};
use crate::test_utils::{
collab_update_forwarder, db_pool, openai_client, redis_stream, setup_collab,
ai_client, collab_update_forwarder, db_pool, redis_stream, setup_collab,
};
#[tokio::test]
@ -325,7 +325,7 @@ mod test {
let object_id = object_id.to_string();
let openai = openai_client();
let openai = ai_client();
let indexer: Arc<dyn Indexer> = Arc::new(PostgresIndexer::new(openai, db));
let stream_group = redis_stream

View File

@ -286,7 +286,7 @@ mod test {
use crate::consumer::OpenCollabConsumer;
use crate::indexer::PostgresIndexer;
use crate::test_utils::{
collab_update_forwarder, db_pool, openai_client, redis_stream, setup_collab,
ai_client, collab_update_forwarder, db_pool, redis_stream, setup_collab,
};
use collab::core::collab::MutexCollab;
use collab::preclude::Collab;
@ -309,7 +309,7 @@ mod test {
let object_id = uuid::Uuid::new_v4();
let db = db_pool().await;
let openai = openai_client();
let openai = ai_client();
let mut collab = Collab::new(
uid,
@ -412,7 +412,7 @@ mod test {
let object_id = uuid::Uuid::new_v4();
let db = db_pool().await;
let openai = openai_client();
let openai = ai_client();
let mut collab = Collab::new(
uid,

View File

@ -1,16 +1,15 @@
use std::pin::Pin;
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_ai_client::dto::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingRequest, EmbeddingsModel,
};
use async_stream::try_stream;
use async_trait::async_trait;
use collab::entity::EncodedCollab;
use collab::error::CollabError;
use collab_entity::CollabType;
use futures::Stream;
use openai_dive::v1::api::Client;
use openai_dive::v1::models::EmbeddingsEngine;
use openai_dive::v1::resources::embedding::{
EmbeddingEncodingFormat, EmbeddingInput, EmbeddingOutput, EmbeddingParameters,
};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
@ -112,21 +111,21 @@ impl From<EmbedFragment> for AFCollabEmbeddingParams {
}
pub struct PostgresIndexer {
openai: Client,
ai_client: AppFlowyAIClient,
db: PgPool,
}
impl PostgresIndexer {
#[allow(dead_code)]
pub async fn open(openai_api_key: &str, pg_conn: &str) -> Result<Self> {
let openai = Client::new(openai_api_key.to_string());
pub async fn open(appflowy_ai_url: &str, pg_conn: &str) -> Result<Self> {
let ai_client = AppFlowyAIClient::new(appflowy_ai_url);
let db = PgPool::connect(pg_conn).await?;
Ok(Self { openai, db })
Ok(Self { ai_client, db })
}
#[allow(dead_code)]
pub fn new(openai: Client, db: PgPool) -> Self {
Self { openai, db }
pub fn new(ai_client: AppFlowyAIClient, db: PgPool) -> Self {
Self { ai_client, db }
}
async fn get_embeddings(&self, fragments: Vec<Fragment>) -> Result<Embeddings> {
@ -135,25 +134,18 @@ impl PostgresIndexer {
.map(|fragment| fragment.content.clone())
.collect();
let resp = self
.openai
.embeddings()
.create(EmbeddingParameters {
.ai_client
.embeddings(EmbeddingRequest {
input: EmbeddingInput::StringArray(inputs),
model: EmbeddingsEngine::TextEmbedding3Small.to_string(),
encoding_format: Some(EmbeddingEncodingFormat::Float),
dimensions: Some(1536), // text-embedding-3-small default number of dimensions
user: None,
model: EmbeddingsModel::TextEmbedding3Small.to_string(),
chunk_size: 0,
encoding_format: EmbeddingEncodingFormat::Float,
dimensions: 1536,
})
.await
.map_err(|e| crate::error::Error::OpenAI(e.to_string()))?;
tracing::trace!("fetched {} embeddings", resp.data.len());
let tokens_used = if let Some(usage) = resp.usage {
tracing::info!("OpenAI API index tokens used: {}", usage.total_tokens);
usage.total_tokens
} else {
0
};
let mut fragments: Vec<_> = fragments.into_iter().map(EmbedFragment::from).collect();
for e in resp.data.into_iter() {
@ -168,7 +160,7 @@ impl PostgresIndexer {
fragments[e.index as usize].embedding = Some(embedding);
}
Ok(Embeddings {
tokens_used,
tokens_used: resp.total_tokens as u32,
fragments,
})
}
@ -268,7 +260,7 @@ mod test {
use database_entity::dto::EmbeddingContentType;
use crate::indexer::{Indexer, PostgresIndexer};
use crate::test_utils::{db_pool, openai_client, setup_collab};
use crate::test_utils::{ai_client, db_pool, setup_collab};
#[tokio::test]
async fn test_indexing_embeddings() {
@ -289,7 +281,7 @@ mod test {
)
.await;
let openai = openai_client();
let openai = ai_client();
let indexer = PostgresIndexer::new(openai, db);

View File

@ -24,8 +24,11 @@ pub struct Config {
#[clap(long, env = "APPFLOWY_INDEXER_REDIS_URL")]
pub redis_url: String,
#[clap(long, env = "APPFLOWY_INDEXER_OPENAI_API_KEY")]
pub openai_api_key: String,
#[clap(long, env = "APPFLOWY_AI_SERVER_HOST", default_value = "localhost")]
pub appflowy_ai_host: String,
#[clap(long, env = "APPFLOWY_AI_SERVER_PORT", default_value = "5001")]
pub appflowy_ai_port: u16,
#[clap(long, env = "APPFLOWY_INDEXER_DATABASE_URL")]
pub database_url: String,
@ -52,6 +55,12 @@ pub struct Config {
pub preindex: bool,
}
impl Config {
pub fn appflowy_ai_url(&self) -> String {
format!("http://{}:{}", self.appflowy_ai_host, self.appflowy_ai_port)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv().ok();
@ -62,9 +71,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
async fn run_server(config: Config) -> Result<(), Box<dyn std::error::Error>> {
let appflowy_ai_url = config.appflowy_ai_url();
let redis_client = redis::Client::open(config.redis_url)?;
let collab_stream = CollabRedisStream::new(redis_client).await?;
let indexer = PostgresIndexer::open(&config.openai_api_key, &config.database_url).await?;
let indexer = PostgresIndexer::open(&appflowy_ai_url, &config.database_url).await?;
tracing::info!("Starting AppFlowy Indexer...");
let consumer = OpenCollabConsumer::new(
collab_stream,

View File

@ -3,6 +3,7 @@ use std::env;
use std::ops::DerefMut;
use std::sync::Arc;
use appflowy_ai_client::client::AppFlowyAIClient;
use collab::core::collab::MutexCollab;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
@ -20,8 +21,8 @@ use database::user::create_user;
use database_entity::dto::CollabParams;
lazy_static! {
pub static ref APPFLOWY_INDEXER_OPENAI_API_KEY: Cow<'static, str> =
get_env_var("APPFLOWY_INDEXER_OPENAI_API_KEY", "");
pub static ref APPFLOWY_INDEXER_AI_URL: Cow<'static, str> =
get_env_var("APPFLOWY_INDEXER_AI_URL", "http://localhost:5001");
pub static ref APPFLOWY_INDEXER_DATABASE_URL: Cow<'static, str> = get_env_var(
"APPFLOWY_INDEXER_DATABASE_URL",
"postgres://postgres:password@localhost:5432/postgres"
@ -42,8 +43,8 @@ fn get_env_var<'default>(key: &str, default: &'default str) -> Cow<'default, str
}
}
pub fn openai_client() -> openai_dive::v1::api::Client {
openai_dive::v1::api::Client::new(APPFLOWY_INDEXER_OPENAI_API_KEY.to_string())
pub fn ai_client() -> AppFlowyAIClient {
AppFlowyAIClient::new(&APPFLOWY_INDEXER_AI_URL)
}
pub async fn db_pool() -> PgPool {