diff --git a/.sqlx/query-1f3943b7aa640f6a3eeaf1301fd51137e21580ef13982a95d634c40070094779.json b/.sqlx/query-1f3943b7aa640f6a3eeaf1301fd51137e21580ef13982a95d634c40070094779.json deleted file mode 100644 index a0eda720..00000000 --- a/.sqlx/query-1f3943b7aa640f6a3eeaf1301fd51137e21580ef13982a95d634c40070094779.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO af_collab_member (uid, oid, permission_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (uid, oid)\n DO UPDATE\n SET permission_id = excluded.permission_id;\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Text", - "Int4" - ] - }, - "nullable": [] - }, - "hash": "1f3943b7aa640f6a3eeaf1301fd51137e21580ef13982a95d634c40070094779" -} diff --git a/.sqlx/query-7e413ca3430f3270e9f903c8bc2151a5f354cd951c20dbd6261d65dbdb503cbc.json b/.sqlx/query-7e413ca3430f3270e9f903c8bc2151a5f354cd951c20dbd6261d65dbdb503cbc.json new file mode 100644 index 00000000..486ce7a1 --- /dev/null +++ b/.sqlx/query-7e413ca3430f3270e9f903c8bc2151a5f354cd951c20dbd6261d65dbdb503cbc.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "CALL af_collab_upsert($1, $2, $3, $4, $5, $6)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Int4", + "Int8", + "Int4", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "7e413ca3430f3270e9f903c8bc2151a5f354cd951c20dbd6261d65dbdb503cbc" +} diff --git a/.sqlx/query-9dd4d1e9fc5684558f765200c88eaf0dc5a4bd4d8bfa621ad1dc85010c965728.json b/.sqlx/query-9dd4d1e9fc5684558f765200c88eaf0dc5a4bd4d8bfa621ad1dc85010c965728.json deleted file mode 100644 index 9f635974..00000000 --- a/.sqlx/query-9dd4d1e9fc5684558f765200c88eaf0dc5a4bd4d8bfa621ad1dc85010c965728.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT workspace_id FROM af_collab WHERE oid = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "workspace_id", - "type_info": "Uuid" - } - ], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [ - false - ] - }, - "hash": "9dd4d1e9fc5684558f765200c88eaf0dc5a4bd4d8bfa621ad1dc85010c965728" -} diff --git a/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json b/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json deleted file mode 100644 index 2a50aa45..00000000 --- a/.sqlx/query-b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM af_collab_embeddings WHERE oid = $1", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text" - ] - }, - "nullable": [] - }, - "hash": "b7d2c2d32d4b221ed8c74d6549a9aa0e03922fcdddddb1c473eb9496b7bb9721" -} diff --git a/.sqlx/query-d4ce35ee25927744b46eb4c4a4b6b710669f8059b77e107b60993c939112e3a0.json b/.sqlx/query-d4ce35ee25927744b46eb4c4a4b6b710669f8059b77e107b60993c939112e3a0.json deleted file mode 100644 index 8b911533..00000000 --- a/.sqlx/query-d4ce35ee25927744b46eb4c4a4b6b710669f8059b77e107b60993c939112e3a0.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE af_collab SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text", - "Int4", - "Bytea", - "Int4", - "Int4", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "d4ce35ee25927744b46eb4c4a4b6b710669f8059b77e107b60993c939112e3a0" -} diff --git a/.sqlx/query-d6708c221831093b03524bd6b73974f6f41db00b5f58f47ad7180d068e273406.json b/.sqlx/query-d6708c221831093b03524bd6b73974f6f41db00b5f58f47ad7180d068e273406.json deleted file mode 100644 index 574d0fe4..00000000 --- a/.sqlx/query-d6708c221831093b03524bd6b73974f6f41db00b5f58f47ad7180d068e273406.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT rp.permission_id\n FROM af_role_permissions rp\n JOIN af_roles ON rp.role_id = af_roles.id\n WHERE af_roles.name = 'Owner';\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "permission_id", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "d6708c221831093b03524bd6b73974f6f41db00b5f58f47ad7180d068e273406" -} diff --git a/.sqlx/query-e226a33d5e5db0b155b5245e05293072f0e16cb4e25840b064bf00469f8adcf4.json b/.sqlx/query-e226a33d5e5db0b155b5245e05293072f0e16cb4e25840b064bf00469f8adcf4.json deleted file mode 100644 index 641bd9ab..00000000 --- a/.sqlx/query-e226a33d5e5db0b155b5245e05293072f0e16cb4e25840b064bf00469f8adcf4.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)VALUES ($1, $2, $3, $4, $5, $6, $7)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text", - "Bytea", - "Int4", - "Int4", - "Int4", - "Int8", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "e226a33d5e5db0b155b5245e05293072f0e16cb4e25840b064bf00469f8adcf4" -} diff --git a/libs/client-api/src/http_view.rs b/libs/client-api/src/http_view.rs index 61177a8d..1bf56e7b 100644 --- a/libs/client-api/src/http_view.rs +++ b/libs/client-api/src/http_view.rs @@ -27,7 +27,7 @@ impl Client { pub async fn move_workspace_page_view_to_trash( &self, workspace_id: Uuid, - view_id: String, + view_id: &str, ) -> Result<(), AppResponseError> { let url = format!( "{}/api/workspace/{}/page-view/{}/move-to-trash", diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index 2301c603..1eae19ca 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -18,7 +18,7 @@ use sqlx::{Error, Executor, PgPool, Postgres, Row, Transaction}; use std::collections::HashMap; use std::fmt::Debug; use std::{ops::DerefMut, str::FromStr}; -use tracing::{error, event, instrument}; +use tracing::{error, instrument}; use uuid::Uuid; /// Inserts a new row into the `af_collab` table or updates an existing row if it matches the @@ -55,109 +55,33 @@ pub async fn insert_into_af_collab( let encrypt = 0; let partition_key = crate::collab::partition_key_from_collab_type(¶ms.collab_type); let workspace_id = Uuid::from_str(workspace_id)?; - let existing_workspace_id: Option = sqlx::query_scalar!( - "SELECT workspace_id FROM af_collab WHERE oid = $1", - ¶ms.object_id - ) - .fetch_optional(tx.deref_mut()) - .await?; - event!( - tracing::Level::TRACE, + tracing::trace!( "upsert collab:{}, len:{}", params.object_id, params.encoded_collab_v1.len(), ); - // If the collab already exists, update the row with the new data. - // In most cases, the workspace_id should be the same as the existing one. Comparing the workspace_id - // is a safety check to prevent a user from inserting a row with an existing object_id but a different - // workspace_id. - match existing_workspace_id { - Some(existing_workspace_id) => { - if existing_workspace_id == workspace_id { - sqlx::query!( - "UPDATE af_collab \ - SET blob = $3, len = $4, encrypt = $5, owner_uid = $6 WHERE oid = $1 AND partition_key = $2;", - params.object_id, - partition_key, - params.encoded_collab_v1.as_ref(), - params.encoded_collab_v1.len() as i32, - encrypt, - uid, - ) - .execute(tx.deref_mut()) - .await.map_err(|err| { - AppError::Internal(anyhow!( - "Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}", - workspace_id, uid, params.object_id, params.collab_type, err, - )) - })?; - } else { - return Err(AppError::Internal(anyhow!( - "workspace_id is not match. expect workspace_id:{}, but receive:{}", - existing_workspace_id, - workspace_id - ))); - } - }, - None => { - // If the collab doesn't exist, insert a new row into the `af_collab` table and add a corresponding - // entry to the `af_collab_member` table. - let permission_id: i32 = sqlx::query_scalar!( - r#" - SELECT rp.permission_id - FROM af_role_permissions rp - JOIN af_roles ON rp.role_id = af_roles.id - WHERE af_roles.name = 'Owner'; - "# - ) - .fetch_one(tx.deref_mut()) - .await?; - - sqlx::query!( - r#" - INSERT INTO af_collab_member (uid, oid, permission_id) - VALUES ($1, $2, $3) - ON CONFLICT (uid, oid) - DO UPDATE - SET permission_id = excluded.permission_id; - "#, - uid, - params.object_id, - permission_id - ) - .execute(tx.deref_mut()) - .await - .map_err(|err| { - AppError::Internal(anyhow!( - "Insert af_collab_member failed: {}:{}:{}. error details:{:?}", - uid, - params.object_id, - permission_id, - err - )) - })?; - - sqlx::query!( - "INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id)\ - VALUES ($1, $2, $3, $4, $5, $6, $7)", - params.object_id, - params.encoded_collab_v1.as_ref(), - params.encoded_collab_v1.len() as i32, - partition_key, - encrypt, - uid, - workspace_id, - ) - .execute(tx.deref_mut()) - .await.map_err(|err| { - AppError::Internal(anyhow!( - "Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. payload len:{} error: {:?}", - workspace_id, uid, params.object_id, params.collab_type, params.encoded_collab_v1.len(), err, - )) - })?; - }, - } + sqlx::query!( + r#"CALL af_collab_upsert($1, $2, $3, $4, $5, $6)"#, + workspace_id, + params.object_id, + partition_key, + *uid, + encrypt, + params.encoded_collab_v1.as_ref(), + ) + .execute(tx.deref_mut()) + .await + .map_err(|err| { + AppError::Internal(anyhow!( + "Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}", + workspace_id, + uid, + params.object_id, + params.collab_type, + err, + )) + })?; Ok(()) } diff --git a/libs/database/src/collab/disk_cache.rs b/libs/database/src/collab/disk_cache.rs index 296ee786..3c43310d 100644 --- a/libs/database/src/collab/disk_cache.rs +++ b/libs/database/src/collab/disk_cache.rs @@ -63,7 +63,13 @@ impl CollabDiskCache { em.tokens_consumed ); let workspace_id = Uuid::parse_str(workspace_id)?; - upsert_collab_embeddings(transaction, &workspace_id, em.tokens_consumed, &em.params).await?; + upsert_collab_embeddings( + transaction, + &workspace_id, + em.tokens_consumed, + em.params.clone(), + ) + .await?; } else if params.collab_type == CollabType::Document { tracing::info!("no embeddings to save for collab {}", params.object_id); } diff --git a/libs/database/src/index/collab_embeddings_ops.rs b/libs/database/src/index/collab_embeddings_ops.rs index 1722b54d..a97ff718 100644 --- a/libs/database/src/index/collab_embeddings_ops.rs +++ b/libs/database/src/index/collab_embeddings_ops.rs @@ -2,6 +2,7 @@ use std::ops::DerefMut; use collab_entity::CollabType; use pgvector::Vector; +use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use sqlx::{Error, Executor, Postgres, Transaction}; use uuid::Uuid; @@ -58,52 +59,52 @@ WHERE w.workspace_id = $1"#, } } +#[derive(sqlx::Type)] +#[sqlx(type_name = "af_fragment", no_pg_array)] +struct Fragment { + fragment_id: String, + content_type: i32, + contents: String, + embedding: Option, +} + +impl From for Fragment { + fn from(value: AFCollabEmbeddingParams) -> Self { + Fragment { + fragment_id: value.fragment_id, + content_type: value.content_type as i32, + contents: value.content, + embedding: value.embedding.map(Vector::from), + } + } +} + +impl PgHasArrayType for Fragment { + fn array_type_info() -> PgTypeInfo { + PgTypeInfo::with_name("af_fragment[]") + } +} + pub async fn upsert_collab_embeddings( tx: &mut Transaction<'_, sqlx::Postgres>, workspace_id: &Uuid, tokens_used: u32, - records: &[AFCollabEmbeddingParams], + records: Vec, ) -> Result<(), sqlx::Error> { - if tokens_used > 0 { - sqlx::query(r#" - INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed) - VALUES (now()::date, $1, 0, 0, $2) - ON CONFLICT (created_at, workspace_id) DO UPDATE - SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + $2"#, - ) - .bind(workspace_id) - .bind(tokens_used as i64) - .execute(tx.deref_mut()) - .await?; + if records.is_empty() { + return Ok(()); } + let object_id = records[0].object_id.clone(); + let collab_type = records[0].collab_type.clone(); - if !records.is_empty() { - // replace existing collab embeddings - remove_collab_embeddings(tx, &records[0].object_id).await?; - for r in records { - sqlx::query( - r#"INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at) - VALUES ($1, $2, $3, $4, $5, $6, NOW()) - ON CONFLICT (fragment_id) DO UPDATE SET content_type = $4, content = $5, embedding = $6, indexed_at = NOW()"#, - ) - .bind(&r.fragment_id) - .bind(&r.object_id) - .bind(crate::collab::partition_key_from_collab_type(&r.collab_type)) - .bind(r.content_type as i32) - .bind(&r.content) - .bind(r.embedding.clone().map(Vector::from)) - .execute(tx.deref_mut()) - .await?; - } - } - Ok(()) -} + let fragments = records.into_iter().map(Fragment::from).collect::>(); -pub async fn remove_collab_embeddings( - tx: &mut Transaction<'_, sqlx::Postgres>, - object_id: &str, -) -> Result<(), sqlx::Error> { - sqlx::query!("DELETE FROM af_collab_embeddings WHERE oid = $1", object_id) + sqlx::query(r#"CALL af_collab_embeddings_upsert($1, $2, $3, $4, $5::af_fragment[])"#) + .bind(*workspace_id) + .bind(object_id) + .bind(crate::collab::partition_key_from_collab_type(&collab_type)) + .bind(tokens_used as i32) + .bind(fragments) .execute(tx.deref_mut()) .await?; Ok(()) diff --git a/migrations/20241126175909_af_collab_stored_procedures.sql b/migrations/20241126175909_af_collab_stored_procedures.sql new file mode 100644 index 00000000..2e123b25 --- /dev/null +++ b/migrations/20241126175909_af_collab_stored_procedures.sql @@ -0,0 +1,50 @@ +CREATE OR REPLACE PROCEDURE af_collab_upsert( + IN p_workspace_id UUID, + IN p_oid TEXT, + IN p_partition_key INT, + IN p_uid BIGINT, + IN p_encrypt INT, + IN p_blob BYTEA +) +LANGUAGE plpgsql +AS $$ +BEGIN + INSERT INTO af_collab (oid, blob, len, partition_key, encrypt, owner_uid, workspace_id) + VALUES (p_oid, p_blob, LENGTH(p_blob), p_partition_key, p_encrypt, p_uid, p_workspace_id) + ON CONFLICT (oid, partition_key) + DO UPDATE SET blob = p_blob, len = LENGTH(p_blob), encrypt = p_encrypt, owner_uid = p_uid WHERE excluded.workspace_id = af_collab.workspace_id; + + INSERT INTO af_collab_member (uid, oid, permission_id) + SELECT p_uid, p_oid, rp.permission_id + FROM af_role_permissions rp + JOIN af_roles ON rp.role_id = af_roles.id + WHERE af_roles.name = 'Owner' + ON CONFLICT (uid, oid) + DO UPDATE SET permission_id = excluded.permission_id; +END +$$; + +CREATE TYPE af_fragment AS (fragment_id TEXT, content_type INT, contents TEXT, embedding VECTOR(1536)); + +CREATE OR REPLACE PROCEDURE af_collab_embeddings_upsert( + IN p_workspace_id UUID, + IN p_oid TEXT, + IN p_partition_key INT, + IN p_tokens_used INT, + IN p_fragments af_fragment[] +) +LANGUAGE plpgsql +AS $$ +BEGIN + DELETE FROM af_collab_embeddings WHERE oid = p_oid; + + INSERT INTO af_collab_embeddings (fragment_id, oid, partition_key, content_type, content, embedding, indexed_at) + SELECT f.fragment_id, p_oid, p_partition_key, f.content_type, f.contents, f.embedding, NOW() + FROM UNNEST(p_fragments) as f; + + INSERT INTO af_workspace_ai_usage(created_at, workspace_id, search_requests, search_tokens_consumed, index_tokens_consumed) + VALUES (now()::date, p_workspace_id, 0, 0, p_tokens_used) + ON CONFLICT (created_at, workspace_id) + DO UPDATE SET index_tokens_consumed = af_workspace_ai_usage.index_tokens_consumed + p_tokens_used; +END +$$; \ No newline at end of file diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index f56b6a07..e7e8eab9 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -163,7 +163,7 @@ impl IndexerProvider { &mut tx, &workspace_id, embeddings.tokens_consumed, - &embeddings.params, + embeddings.params, ) .await?; tx.commit().await?; diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index 3a41b80d..26b1d74e 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -820,7 +820,7 @@ fn type_options_serde( let mut result = HashMap::with_capacity(type_option.len()); for (key, value) in type_option { match field_type { - FieldType::SingleSelect | FieldType::MultiSelect => { + FieldType::SingleSelect | FieldType::MultiSelect | FieldType::Media => { if let yrs::Any::String(arc_str) = value { if let Ok(serde_value) = serde_json::from_str::(&arc_str) { result.insert(key.clone(), serde_value); diff --git a/src/biz/workspace/page_view.rs b/src/biz/workspace/page_view.rs index 833fe951..32f22dc4 100644 --- a/src/biz/workspace/page_view.rs +++ b/src/biz/workspace/page_view.rs @@ -553,9 +553,13 @@ async fn move_view_to_trash(view_id: &str, folder: &mut Folder) -> Result>(); + assert!(views_in_trash_for_app.contains(&general_space.view_id)); + for view in general_space.children.iter() { + assert!(!views_in_trash_for_app.contains(&view.view_id)); + } + let views_in_trash_for_web = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .map(|v| v.view.view_id.clone()) + .collect::>(); + assert!(views_in_trash_for_web.contains(&general_space.view_id)); + + web_client + .api_client + .restore_workspace_page_view_from_trash( + Uuid::parse_str(&workspace_id).unwrap(), + &general_space.view_id, + ) + .await + .unwrap(); + let folder = get_latest_folder(&app_client, &workspace_id).await; + assert!(!folder + .get_my_trash_sections() + .iter() + .any(|v| v.id == general_space.view_id)); + let view_found = web_client + .api_client + .get_workspace_trash(&workspace_id) + .await + .unwrap() + .views + .iter() + .any(|v| v.view.view_id == general_space.view_id); + assert!(!view_found); +} + #[tokio::test] async fn update_page() { let registered_user = generate_unique_registered_user().await;