Merge branch 'main' into stateless

This commit is contained in:
khorshuheng 2024-12-20 13:24:44 +08:00
commit 8674c9f3ad
10 changed files with 133 additions and 89 deletions

View File

@ -0,0 +1,42 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO af_quick_note (workspace_id, uid, data) VALUES ($1, $2, $3)\n RETURNING quick_note_id AS id, data, created_at AS \"created_at!\", updated_at AS \"last_updated_at!\"\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "data",
"type_info": "Jsonb"
},
{
"ordinal": 2,
"name": "created_at!",
"type_info": "Timestamptz"
},
{
"ordinal": 3,
"name": "last_updated_at!",
"type_info": "Timestamptz"
}
],
"parameters": {
"Left": [
"Uuid",
"Int8",
"Jsonb"
]
},
"nullable": [
false,
false,
true,
true
]
},
"hash": "35622d4ebede28dd28b613edcf3970ad258286f176ce86e88bd662a602e4ad58"
}

View File

@ -1,5 +1,5 @@
use client_api_entity::{
CreateQuickNoteParams, ListQuickNotesQueryParams, QuickNotes, UpdateQuickNoteParams,
CreateQuickNoteParams, ListQuickNotesQueryParams, QuickNote, QuickNotes, UpdateQuickNoteParams,
};
use reqwest::Method;
use shared_entity::response::{AppResponse, AppResponseError};
@ -18,15 +18,21 @@ fn quick_note_resource_url(base_url: &str, workspace_id: Uuid, quick_note_id: Uu
// Quick Note API
impl Client {
pub async fn create_quick_note(&self, workspace_id: Uuid) -> Result<(), AppResponseError> {
/// Creates a quick note in `workspace_id` by sending an authenticated POST
/// request to the URL built by `quick_note_resources_url`, with `data`
/// wrapped in `CreateQuickNoteParams` as the JSON body.
///
/// Returns the `QuickNote` extracted from the response, or an
/// `AppResponseError` if preparing/sending the request or reading the
/// response fails.
pub async fn create_quick_note(
&self,
workspace_id: Uuid,
data: Option<serde_json::Value>,
) -> Result<QuickNote, AppResponseError> {
let url = quick_note_resources_url(&self.base_url, workspace_id);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.json(&CreateQuickNoteParams {})
.json(&CreateQuickNoteParams { data })
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
AppResponse::<QuickNote>::from_response(resp)
.await?
.into_data()
}
pub async fn list_quick_notes(

View File

@ -1182,7 +1182,9 @@ pub struct QuickNotes {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct CreateQuickNoteParams {}
/// Request payload for creating a quick note.
pub struct CreateQuickNoteParams {
/// Optional initial content of the note, as arbitrary JSON.
pub data: Option<serde_json::Value>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct UpdateQuickNoteParams {

View File

@ -10,16 +10,20 @@ pub async fn insert_new_quick_note<'a, E: Executor<'a, Database = Postgres>>(
workspace_id: Uuid,
uid: i64,
data: &serde_json::Value,
) -> Result<(), AppError> {
sqlx::query!(
"INSERT INTO af_quick_note (workspace_id, uid, data) VALUES ($1, $2, $3)",
) -> Result<QuickNote, AppError> {
let quick_note = sqlx::query_as!(
QuickNote,
r#"
INSERT INTO af_quick_note (workspace_id, uid, data) VALUES ($1, $2, $3)
RETURNING quick_note_id AS id, data, created_at AS "created_at!", updated_at AS "last_updated_at!"
"#,
workspace_id,
uid,
data
)
.execute(executor)
.fetch_one(executor)
.await?;
Ok(())
Ok(quick_note)
}
pub async fn select_quick_notes_with_one_more_than_limit<
@ -46,7 +50,7 @@ pub async fn select_quick_notes_with_one_more_than_limit<
query_builder.push_bind(workspace_id);
query_builder.push(" AND uid = ");
query_builder.push_bind(uid);
if let Some(search_term) = search_term {
// An empty search term is treated the same as no search term: no filter is added.
// NOTE(review): `search_term` is interpolated into the SQL text with `format!`
// and appended via `push` rather than `push_bind`. A term containing `'` or `"`
// can terminate the quoted jsonpath literal (SQL injection risk), and regex
// metacharacters are passed to `like_regex` unescaped. Consider binding the
// jsonpath as a parameter and escaping the term — confirm whether callers
// sanitise it upstream.
if let Some(search_term) = search_term.filter(|term| !term.is_empty()) {
query_builder.push(" AND data @? ");
let json_path_query = format!("'$.**.insert ? (@ like_regex \".*{}.*\")'", search_term);
query_builder.push(json_path_query);

View File

@ -2,7 +2,7 @@ use crate::config::get_env_var;
use crate::indexer::metrics::EmbeddingMetrics;
use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::IndexerProvider;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
@ -128,6 +128,10 @@ impl IndexerScheduler {
true
}
/// Reports whether indexing is enabled for `collab_type`, delegating the
/// decision to the scheduler's indexer provider.
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}
fn create_embedder(&self) -> Result<Embedder, AppError> {
if self.config.openai_api_key.is_empty() {
return Err(AppError::AIServiceUnavailable(
@ -158,10 +162,16 @@ impl IndexerScheduler {
return Ok(());
}
let embedder = self.create_embedder()?;
let indexed_collab = indexed_collab.into();
let indexer = self
.indexer_provider
.indexer_for(&indexed_collab.collab_type);
if indexer.is_none() {
return Ok(());
}
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
let tx = self.schedule_tx.clone();
let metrics = self.metrics.clone();
@ -177,7 +187,7 @@ impl IndexerScheduler {
return;
}
match process_collab(&embedder, &indexer_provider, &indexed_collab, &metrics) {
match process_collab(&embedder, indexer, &indexed_collab, &metrics) {
Ok(Some((tokens_used, contents))) => {
if let Err(err) = tx.send(EmbeddingRecord {
workspace_id,
@ -205,19 +215,21 @@ impl IndexerScheduler {
Ok(())
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}
pub fn index_encoded_collabs(
&self,
workspace_id: &str,
indexed_collabs: Vec<IndexedCollab>,
mut indexed_collabs: Vec<IndexedCollab>,
) -> Result<(), AppError> {
if !self.index_enabled() {
return Ok(());
}
indexed_collabs.retain(|collab| self.is_indexing_enabled(&collab.collab_type));
if indexed_collabs.is_empty() {
return Ok(());
}
info!("indexing {} collabs", indexed_collabs.len());
let embedder = self.create_embedder()?;
let workspace_id = Uuid::parse_str(workspace_id)?;
let indexer_provider = self.indexer_provider.clone();
@ -229,6 +241,7 @@ impl IndexerScheduler {
let embeddings_list = indexed_collabs
.into_par_iter()
.filter_map(|collab| {
let indexer = indexer_provider.indexer_for(&collab.collab_type)?;
let task = ActiveTask::new(collab.object_id.clone());
let task_created_at = task.created_at;
active_task.insert(collab.object_id.clone(), task);
@ -237,7 +250,7 @@ impl IndexerScheduler {
if !should_embed(&active_task, &collab.object_id, task_created_at) {
return None;
}
process_collab(&embedder, &indexer_provider, &collab, &metrics).ok()
process_collab(&embedder, Some(indexer), &collab, &metrics).ok()
})
.ok()
})
@ -275,6 +288,10 @@ impl IndexerScheduler {
return Ok(());
}
if !self.is_indexing_enabled(collab_type) {
return Ok(());
}
let indexer = self
.indexer_provider
.indexer_for(collab_type)
@ -572,11 +589,11 @@ async fn batch_insert_records(
/// This function must be called within the rayon thread pool.
fn process_collab(
embdder: &Embedder,
indexer_provider: &IndexerProvider,
indexer: Option<Arc<dyn Indexer>>,
indexed_collab: &IndexedCollab,
metrics: &EmbeddingMetrics,
) -> Result<Option<(u32, Vec<AFCollabEmbeddedChunk>)>, AppError> {
if let Some(indexer) = indexer_provider.indexer_for(&indexed_collab.collab_type) {
if let Some(indexer) = indexer {
metrics.record_embed_count(1);
let encode_collab = EncodedCollab::decode_from_bytes(&indexed_collab.encoded_collab)?;
let collab = Collab::new_with_source(

View File

@ -847,12 +847,13 @@ async fn batch_create_collab_handler(
.map(IndexedCollab::from)
.collect();
let len = indexed_collabs.len();
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
tracing::info!("scheduled indexing for {} collabs", len);
if !indexed_collabs.is_empty() {
let len = indexed_collabs.len();
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
tracing::info!("scheduled indexing for {} collabs", len);
}
}
let start = Instant::now();
@ -2453,15 +2454,17 @@ async fn post_quick_note_handler(
user_uuid: UserUuid,
workspace_id: web::Path<Uuid>,
state: Data<AppState>,
) -> Result<JsonAppResponse<()>> {
data: Json<CreateQuickNoteParams>,
) -> Result<JsonAppResponse<QuickNote>> {
let workspace_id = workspace_id.into_inner();
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
state
.workspace_access_control
.enforce_role(&uid, &workspace_id.to_string(), AFRole::Member)
.await?;
create_quick_note(&state.pg_pool, uid, workspace_id).await?;
Ok(Json(AppResponse::Ok()))
let data = data.into_inner();
let quick_note = create_quick_note(&state.pg_pool, uid, workspace_id, data.data.as_ref()).await?;
Ok(Json(AppResponse::Ok().with_data(quick_note)))
}
async fn list_quick_notes_handler(

View File

@ -7,15 +7,25 @@ use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
use database_entity::dto::QuickNotes;
use database_entity::dto::{QuickNote, QuickNotes};
/// Inserts a new quick note owned by `uid` in `workspace_id` and returns the
/// inserted `QuickNote`.
///
/// When `data` is `None`, the note is stored with a default document made of a
/// single paragraph whose delta inserts an empty string.
pub async fn create_quick_note(
pg_pool: &PgPool,
uid: i64,
workspace_id: Uuid,
) -> Result<(), AppError> {
let default_data = json!([]);
insert_new_quick_note(pg_pool, workspace_id, uid, &default_data).await
data: Option<&serde_json::Value>,
) -> Result<QuickNote, AppError> {
// Fallback content, used only when the caller supplies no data.
let default_data = json!([
{
"type": "paragraph",
"delta": {
"insert": "",
},
}
]);
let new_data = data.unwrap_or(&default_data);
let quick_note = insert_new_quick_note(pg_pool, workspace_id, uid, new_data).await?;
Ok(quick_note)
}
pub async fn update_quick_note(

View File

@ -294,54 +294,6 @@ async fn generate_chat_message_answer_test() {
assert!(!answer.is_empty());
}
#[tokio::test]
async fn create_chat_context_test() {
if !ai_test_enabled() {
return;
}
let test_client = TestClient::new_user_without_ws_conn().await;
let workspace_id = test_client.workspace_id().await;
let chat_id = uuid::Uuid::new_v4().to_string();
let params = CreateChatParams {
chat_id: chat_id.clone(),
name: "context chat".to_string(),
rag_ids: vec![],
};
test_client
.api_client
.create_chat(&workspace_id, params)
.await
.unwrap();
let content = "Lacus have lived in the US for five years".to_string();
let metadata = ChatMessageMetadata {
data: ChatRAGData::from_text(content),
id: chat_id.clone(),
name: "".to_string(),
source: "appflowy".to_string(),
extra: None,
};
let params = CreateChatMessageParams::new_user("Where Lacus live?").with_metadata(metadata);
let question = test_client
.api_client
.create_question(&workspace_id, &chat_id, params)
.await
.unwrap();
let answer = test_client
.api_client
.get_answer(&workspace_id, &chat_id, question.message_id)
.await
.unwrap();
println!("answer: {:?}", answer);
if answer.content.contains("United States") {
return;
}
assert!(answer.content.contains("US"));
}
// #[tokio::test]
// async fn update_chat_message_test() {
// if !ai_test_enabled() {

View File

@ -106,10 +106,10 @@ async fn chat_with_multiple_selected_source_test() {
&test_client,
&workspace_id,
&chat_id,
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know",
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know the date for the trip to Japan",
)
.await;
let expected_unknown_japan_answer = r#"I dont know"#;
let expected_unknown_japan_answer = r#"I dont know the date for your trip to Japan"#;
test_client
.assert_similarity(&workspace_id, &answer, expected_unknown_japan_answer, 0.7)
.await;
@ -168,7 +168,7 @@ async fn chat_with_multiple_selected_source_test() {
&test_client,
&workspace_id,
&chat_id,
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know",
"When do we take off to Japan? Just tell me the date, and if you don't know, Just say you dont know the date for the trip to Japan",
)
.await;
test_client

View File

@ -10,12 +10,14 @@ async fn quick_note_crud_test() {
let client = TestClient::new_user_without_ws_conn().await;
let workspace_id = client.workspace_id().await;
let workspace_uuid = Uuid::parse_str(&workspace_id).unwrap();
let mut quick_note_ids: Vec<Uuid> = vec![];
for _ in 0..2 {
client
let quick_note = client
.api_client
.create_quick_note(workspace_uuid)
.create_quick_note(workspace_uuid, None)
.await
.expect("create quick note");
quick_note_ids.push(quick_note.id);
// To ensure that the creation time is different
time::sleep(Duration::from_millis(1)).await;
}
@ -93,6 +95,12 @@ async fn quick_note_crud_test() {
.await
.expect("list quick notes");
assert_eq!(quick_notes.quick_notes.len(), 2);
let quick_notes = client
.api_client
.list_quick_notes(workspace_uuid, Some("".to_string()), None, None)
.await
.expect("list quick notes with empty search term");
assert_eq!(quick_notes.quick_notes.len(), 2);
let quick_notes_with_offset_and_limit = client
.api_client
.list_quick_notes(workspace_uuid, None, Some(1), Some(1))