Merge pull request #1017 from AppFlowy-IO/feat/database-row-update

feat: add a polling api for getting database row id updates
This commit is contained in:
Zack 2024-11-25 02:48:32 -08:00 committed by GitHub
commit 9e067e618b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 190 additions and 10 deletions

View File

@ -0,0 +1,30 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n updated_at as updated_at,\n oid as row_id\n FROM af_collab_database_row\n WHERE workspace_id = $1\n AND oid = ANY($2)\n AND updated_at > $3\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "updated_at",
"type_info": "Timestamptz"
},
{
"ordinal": 1,
"name": "row_id",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid",
"TextArray",
"Timestamptz"
]
},
"nullable": [
false,
false
]
},
"hash": "1331f64dbbf63fc694e3358aefd2bdc4b3bcff64eda36420acde1a948884239d"
}

View File

@ -2,8 +2,10 @@ use crate::http::log_request_id;
use crate::{blocking_brotli_compress, brotli_compress, Client}; use crate::{blocking_brotli_compress, brotli_compress, Client};
use app_error::AppError; use app_error::AppError;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Utc};
use client_api_entity::workspace_dto::{ use client_api_entity::workspace_dto::{
AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, ListDatabaseRowDetailParam, AFDatabase, AFDatabaseRow, AFDatabaseRowDetail, DatabaseRowUpdatedItem,
ListDatabaseRowDetailParam, ListDatabaseRowUpdatedParam,
}; };
use client_api_entity::{ use client_api_entity::{
BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams, BatchQueryCollabParams, BatchQueryCollabResult, CollabParams, CreateCollabParams,
@ -190,6 +192,26 @@ impl Client {
AppResponse::from_response(resp).await?.into_data() AppResponse::from_response(resp).await?.into_data()
} }
pub async fn list_database_row_ids_updated(
&self,
workspace_id: &str,
database_id: &str,
after: Option<DateTime<Utc>>,
) -> Result<Vec<DatabaseRowUpdatedItem>, AppResponseError> {
let url = format!(
"{}/api/workspace/{}/database/{}/row/updated",
self.base_url, workspace_id, database_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.query(&ListDatabaseRowUpdatedParam { after })
.send()
.await?;
log_request_id(&resp);
AppResponse::from_response(resp).await?.into_data()
}
pub async fn list_database_row_details( pub async fn list_database_row_details(
&self, &self,
workspace_id: &str, workspace_id: &str,

View File

@ -4,12 +4,13 @@ use database_entity::dto::{
AFAccessLevel, AFCollabMember, AFPermission, AFSnapshotMeta, AFSnapshotMetas, CollabParams, AFAccessLevel, AFCollabMember, AFPermission, AFSnapshotMeta, AFSnapshotMetas, CollabParams,
QueryCollab, QueryCollabResult, RawData, QueryCollab, QueryCollabResult, RawData,
}; };
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
use crate::collab::{partition_key_from_collab_type, SNAPSHOT_PER_HOUR}; use crate::collab::{partition_key_from_collab_type, SNAPSHOT_PER_HOUR};
use crate::pg_row::AFSnapshotRow; use crate::pg_row::AFSnapshotRow;
use crate::pg_row::{AFCollabMemberAccessLevelRow, AFCollabRowMeta}; use crate::pg_row::{AFCollabMemberAccessLevelRow, AFCollabRowMeta};
use app_error::AppError; use app_error::AppError;
use chrono::{Duration, Utc}; use chrono::{DateTime, Duration, Utc};
use futures_util::stream::BoxStream; use futures_util::stream::BoxStream;
use sqlx::postgres::PgRow; use sqlx::postgres::PgRow;
@ -792,3 +793,29 @@ pub async fn select_workspace_database_oid<'a, E: Executor<'a, Database = Postgr
.fetch_one(executor) .fetch_one(executor)
.await .await
} }
pub async fn select_last_updated_database_row_ids(
pg_pool: &PgPool,
workspace_id: &Uuid,
row_ids: &[String],
after: &DateTime<Utc>,
) -> Result<Vec<DatabaseRowUpdatedItem>, sqlx::Error> {
let updated_row_items = sqlx::query_as!(
DatabaseRowUpdatedItem,
r#"
SELECT
updated_at as updated_at,
oid as row_id
FROM af_collab_database_row
WHERE workspace_id = $1
AND oid = ANY($2)
AND updated_at > $3
"#,
workspace_id,
row_ids,
after,
)
.fetch_all(pg_pool)
.await?;
Ok(updated_row_items)
}

View File

@ -300,6 +300,17 @@ pub struct ListDatabaseRowDetailParam {
pub ids: String, pub ids: String,
} }
#[derive(Default, Debug, Deserialize, Serialize)]
pub struct ListDatabaseRowUpdatedParam {
pub after: Option<DateTime<Utc>>,
}
#[derive(Default, Debug, Deserialize, Serialize)]
pub struct DatabaseRowUpdatedItem {
pub updated_at: DateTime<Utc>,
pub row_id: String,
}
impl ListDatabaseRowDetailParam { impl ListDatabaseRowDetailParam {
pub fn from(ids: &[&str]) -> Self { pub fn from(ids: &[&str]) -> Self {
Self { ids: ids.join(",") } Self { ids: ids.join(",") }

View File

@ -0,0 +1,22 @@
-- Add `updated_at` column to `af_collab` table
ALTER TABLE public.af_collab
ADD COLUMN updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
-- Create or replace function to update `updated_at` column
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Create trigger to update `updated_at` column
CREATE TRIGGER set_updated_at
BEFORE INSERT OR UPDATE ON public.af_collab
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
-- Create index on `updated_at` column
CREATE INDEX idx_af_collab_updated_at
ON public.af_collab (updated_at);

View File

@ -5,6 +5,7 @@ use actix_web::{web, Scope};
use actix_web::{HttpRequest, Result}; use actix_web::{HttpRequest, Result};
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use bytes::BytesMut; use bytes::BytesMut;
use chrono::{DateTime, Duration, Utc};
use collab::entity::EncodedCollab; use collab::entity::EncodedCollab;
use collab_entity::CollabType; use collab_entity::CollabType;
use futures_util::future::try_join_all; use futures_util::future::try_join_all;
@ -260,6 +261,10 @@ pub fn workspace_scope() -> Scope {
web::resource("/{workspace_id}/database/{database_id}/row") web::resource("/{workspace_id}/database/{database_id}/row")
.route(web::get().to(list_database_row_id_handler)), .route(web::get().to(list_database_row_id_handler)),
) )
.service(
web::resource("/{workspace_id}/database/{database_id}/row/updated")
.route(web::get().to(list_database_row_id_updated_handler)),
)
.service( .service(
web::resource("/{workspace_id}/database/{database_id}/row/detail") web::resource("/{workspace_id}/database/{database_id}/row/detail")
.route(web::get().to(list_database_row_details_handler)), .route(web::get().to(list_database_row_details_handler)),
@ -1892,9 +1897,42 @@ async fn list_database_row_id_handler(
.enforce_action(&uid, &workspace_id, Action::Read) .enforce_action(&uid, &workspace_id, Action::Read)
.await?; .await?;
let db_rows = let db_rows = biz::collab::ops::list_database_row_ids(
biz::collab::ops::list_database_row(&state.collab_access_control_storage, workspace_id, db_id) &state.collab_access_control_storage,
.await?; &workspace_id,
&db_id,
)
.await?;
Ok(Json(AppResponse::Ok().with_data(db_rows)))
}
async fn list_database_row_id_updated_handler(
user_uuid: UserUuid,
path_param: web::Path<(String, String)>,
state: Data<AppState>,
param: web::Query<ListDatabaseRowUpdatedParam>,
) -> Result<Json<AppResponse<Vec<DatabaseRowUpdatedItem>>>> {
let (workspace_id, db_id) = path_param.into_inner();
let uid = state.user_cache.get_user_uid(&user_uuid).await?;
state
.workspace_access_control
.enforce_action(&uid, &workspace_id, Action::Read)
.await?;
// Default to 1 hour ago
let after: DateTime<Utc> = param
.after
.unwrap_or_else(|| Utc::now() - Duration::hours(1));
let db_rows = biz::collab::ops::list_database_row_ids_updated(
&state.collab_access_control_storage,
&state.pg_pool,
&workspace_id,
&db_id,
&after,
)
.await?;
Ok(Json(AppResponse::Ok().with_data(db_rows))) Ok(Json(AppResponse::Ok().with_data(db_rows)))
} }

View File

@ -3,6 +3,8 @@ use std::sync::Arc;
use app_error::AppError; use app_error::AppError;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use chrono::DateTime;
use chrono::Utc;
use collab::preclude::Collab; use collab::preclude::Collab;
use collab_database::database::DatabaseBody; use collab_database::database::DatabaseBody;
use collab_database::entity::FieldType; use collab_database::entity::FieldType;
@ -15,6 +17,7 @@ use collab_entity::CollabType;
use collab_entity::EncodedCollab; use collab_entity::EncodedCollab;
use collab_folder::SectionItem; use collab_folder::SectionItem;
use collab_folder::{CollabOrigin, Folder}; use collab_folder::{CollabOrigin, Folder};
use database::collab::select_last_updated_database_row_ids;
use database::collab::select_workspace_database_oid; use database::collab::select_workspace_database_oid;
use database::collab::{CollabStorage, GetCollabOrigin}; use database::collab::{CollabStorage, GetCollabOrigin};
use database::publish::select_published_view_ids_for_workspace; use database::publish::select_published_view_ids_for_workspace;
@ -24,6 +27,7 @@ use database_entity::dto::{QueryCollab, QueryCollabParams};
use shared_entity::dto::workspace_dto::AFDatabase; use shared_entity::dto::workspace_dto::AFDatabase;
use shared_entity::dto::workspace_dto::AFDatabaseRow; use shared_entity::dto::workspace_dto::AFDatabaseRow;
use shared_entity::dto::workspace_dto::AFDatabaseRowDetail; use shared_entity::dto::workspace_dto::AFDatabaseRowDetail;
use shared_entity::dto::workspace_dto::DatabaseRowUpdatedItem;
use shared_entity::dto::workspace_dto::FavoriteFolderView; use shared_entity::dto::workspace_dto::FavoriteFolderView;
use shared_entity::dto::workspace_dto::FolderViewMinimal; use shared_entity::dto::workspace_dto::FolderViewMinimal;
use shared_entity::dto::workspace_dto::RecentFolderView; use shared_entity::dto::workspace_dto::RecentFolderView;
@ -435,16 +439,16 @@ pub async fn list_database(
Ok(af_databases) Ok(af_databases)
} }
pub async fn list_database_row( pub async fn list_database_row_ids(
collab_storage: &CollabAccessControlStorage, collab_storage: &CollabAccessControlStorage,
workspace_uuid_str: String, workspace_uuid_str: &str,
database_uuid_str: String, database_uuid_str: &str,
) -> Result<Vec<AFDatabaseRow>, AppError> { ) -> Result<Vec<AFDatabaseRow>, AppError> {
let db_collab = get_latest_collab( let db_collab = get_latest_collab(
collab_storage, collab_storage,
GetCollabOrigin::Server, GetCollabOrigin::Server,
&workspace_uuid_str, workspace_uuid_str,
&database_uuid_str, database_uuid_str,
CollabType::Database, CollabType::Database,
) )
.await?; .await?;
@ -479,6 +483,25 @@ pub async fn list_database_row(
Ok(db_rows) Ok(db_rows)
} }
pub async fn list_database_row_ids_updated(
collab_storage: &CollabAccessControlStorage,
pg_pool: &PgPool,
workspace_uuid_str: &str,
database_uuid_str: &str,
after: &DateTime<Utc>,
) -> Result<Vec<DatabaseRowUpdatedItem>, AppError> {
let row_ids = list_database_row_ids(collab_storage, workspace_uuid_str, database_uuid_str)
.await?
.into_iter()
.map(|row| row.id)
.collect::<Vec<String>>();
let workspace_uuid: Uuid = workspace_uuid_str.parse()?;
let updated_row_ids =
select_last_updated_database_row_ids(pg_pool, &workspace_uuid, &row_ids, after).await?;
Ok(updated_row_ids)
}
pub async fn list_database_row_details( pub async fn list_database_row_details(
collab_storage: &CollabAccessControlStorage, collab_storage: &CollabAccessControlStorage,
uid: i64, uid: i64,

View File

@ -24,6 +24,13 @@ async fn workspace_list_database() {
.unwrap(); .unwrap();
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids); assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
} }
{
let db_row_ids = c
.list_database_row_ids_updated(&workspace_id, &todos_db.id, None)
.await
.unwrap();
assert_eq!(db_row_ids.len(), 5, "{:?}", db_row_ids);
}
{ {
let db_row_ids = c let db_row_ids = c