feat: api for delete items from trash permanently (#1068)

This commit is contained in:
Khor Shu Heng 2024-12-17 13:51:25 +08:00 committed by GitHub
parent 91c2a925bc
commit 68881a92d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 428 additions and 27 deletions

View File

@ -97,6 +97,40 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub async fn delete_workspace_page_view_from_trash(
&self,
workspace_id: Uuid,
view_id: &str,
) -> Result<(), AppResponseError> {
let url = format!(
"{}/api/workspace/{}/trash/{}",
self.base_url, workspace_id, view_id
);
let resp = self
.http_client_with_auth(Method::DELETE, &url)
.await?
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub async fn delete_all_workspace_page_views_from_trash(
&self,
workspace_id: Uuid,
) -> Result<(), AppResponseError> {
let url = format!(
"{}/api/workspace/{}/delete-all-pages-from-trash",
self.base_url, workspace_id
);
let resp = self
.http_client_with_auth(Method::POST, &url)
.await?
.json(&json!({}))
.send()
.await?;
AppResponse::<()>::from_response(resp).await?.into_error()
}
pub async fn update_workspace_page_view(
&self,
workspace_id: Uuid,

View File

@ -233,6 +233,7 @@ pub struct AppFlowyWebMetrics {
pub update_size_bytes: Histogram,
pub decoding_failure_count: Gauge,
pub apply_update_failure_count: Gauge,
pub apply_update_timeout_count: Gauge,
}
impl AppFlowyWebMetrics {
@ -243,6 +244,7 @@ impl AppFlowyWebMetrics {
update_size_bytes: Histogram::new(update_size_buckets),
decoding_failure_count: Default::default(),
apply_update_failure_count: Default::default(),
apply_update_timeout_count: Default::default(),
}
}
@ -264,6 +266,11 @@ impl AppFlowyWebMetrics {
"Number of updates that failed to apply",
metrics.apply_update_failure_count.clone(),
);
web_update_registry.register(
"apply_update_timeout_count",
"Number of updates that failed to apply within timeout",
metrics.apply_update_timeout_count.clone(),
);
metrics
}
@ -278,4 +285,8 @@ impl AppFlowyWebMetrics {
pub fn incr_apply_update_failure_count(&self, count: i64) {
self.apply_update_failure_count.inc_by(count);
}
pub fn incr_apply_update_timeout_count(&self, count: i64) {
self.apply_update_timeout_count.inc_by(count);
}
}

View File

@ -7,10 +7,13 @@ use actix_web::HttpRequest;
use appflowy_ai_client::dto::AIModel;
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian};
use chrono::Utc;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_protocol::spawn_blocking_validate_encode_collab;
use database_entity::dto::CollabParams;
use std::str::FromStr;
use tokio_stream::StreamExt;
use uuid::Uuid;
#[inline]
pub fn compress_type_from_header_value(headers: &HeaderMap) -> Result<CompressionType, AppError> {
@ -86,6 +89,28 @@ pub fn device_id_from_headers(headers: &HeaderMap) -> Result<&str, AppError> {
)
}
/// Create new realtime user for requests from appflowy web
pub fn realtime_user_for_web_request(
headers: &HeaderMap,
uid: i64,
) -> Result<RealtimeUser, AppError> {
let app_version = client_version_from_headers(headers)
.map(|s| s.to_string())
.unwrap_or_else(|_| "web".to_string());
let device_id = device_id_from_headers(headers)
.map(|s| s.to_string())
.unwrap_or_else(|_| Uuid::new_v4().to_string());
let session_id = device_id.clone();
let user = RealtimeUser {
uid,
device_id,
connect_at: Utc::now().timestamp(),
session_id,
app_version,
};
Ok(user)
}
#[async_trait]
pub trait CollabValidator {
async fn check_encode_collab(&self) -> Result<(), AppError>;

View File

@ -1,4 +1,4 @@
use crate::api::util::{client_version_from_headers, PayloadReader};
use crate::api::util::{client_version_from_headers, realtime_user_for_web_request, PayloadReader};
use crate::api::util::{compress_type_from_header_value, device_id_from_headers, CollabValidator};
use crate::api::ws::RealtimeServerAddr;
use crate::biz;
@ -12,9 +12,9 @@ use crate::biz::workspace::ops::{
get_reactions_on_published_view, remove_comment_on_published_view, remove_reaction_on_comment,
};
use crate::biz::workspace::page_view::{
create_page, create_space, get_page_view_collab, move_page, move_page_to_trash,
restore_all_pages_from_trash, restore_page_from_trash, update_page, update_page_collab_data,
update_space,
create_page, create_space, delete_all_pages_from_trash, delete_trash, get_page_view_collab,
move_page, move_page_to_trash, restore_all_pages_from_trash, restore_page_from_trash,
update_page, update_page_collab_data, update_space,
};
use crate::biz::workspace::publish::get_workspace_default_publish_view_info_meta;
use crate::domain::compression::{
@ -172,6 +172,10 @@ pub fn workspace_scope() -> Scope {
web::resource("/{workspace_id}/restore-all-pages-from-trash")
.route(web::post().to(restore_all_pages_from_trash_handler)),
)
.service(
web::resource("/{workspace_id}/delete-all-pages-from-trash")
.route(web::post().to(delete_all_pages_from_trash_handler)),
)
.service(
web::resource("/{workspace_id}/batch/collab")
.route(web::post().to(batch_create_collab_handler)),
@ -254,6 +258,10 @@ pub fn workspace_scope() -> Scope {
web::resource("/{workspace_id}/favorite").route(web::get().to(get_favorite_views_handler)),
)
.service(web::resource("/{workspace_id}/trash").route(web::get().to(get_trash_views_handler)))
.service(
web::resource("/{workspace_id}/trash/{view_id}")
.route(web::delete().to(delete_page_from_trash_handler)),
)
.service(
web::resource("/published-outline/{publish_namespace}")
.route(web::get().to(get_workspace_publish_outline_handler)),
@ -910,32 +918,27 @@ async fn post_web_update_handler(
server: Data<RealtimeServerAddr>,
req: HttpRequest,
) -> Result<Json<AppResponse<()>>> {
let payload = payload.into_inner();
let app_version = client_version_from_headers(req.headers())
.map(|s| s.to_string())
.unwrap_or_else(|_| "web".to_string());
let device_id = device_id_from_headers(req.headers())
.map(|s| s.to_string())
.unwrap_or_else(|_| Uuid::new_v4().to_string());
let session_id = device_id.clone();
let (workspace_id, object_id) = path.into_inner();
let collab_type = payload.collab_type.clone();
let uid = state
.user_cache
.get_user_uid(&user_uuid)
.await
.map_err(AppResponseError::from)?;
let user = RealtimeUser {
uid,
device_id,
connect_at: timestamp(),
session_id,
app_version,
};
let (workspace_id, object_id) = path.into_inner();
state
.collab_access_control
.enforce_action(
&workspace_id.to_string(),
&uid,
&object_id.to_string(),
Action::Write,
)
.await?;
let user = realtime_user_for_web_request(req.headers(), uid)?;
trace!("create onetime web realtime user: {}", user);
let payload = payload.into_inner();
let collab_type = payload.collab_type.clone();
update_page_collab_data(
&state.metrics.appflowy_web_metrics,
server,
@ -1089,6 +1092,65 @@ async fn restore_all_pages_from_trash_handler(
Ok(Json(AppResponse::Ok()))
}
async fn delete_page_from_trash_handler(
user_uuid: UserUuid,
path: web::Path<(Uuid, String)>,
state: Data<AppState>,
server: Data<RealtimeServerAddr>,
req: HttpRequest,
) -> Result<Json<AppResponse<()>>> {
let uid = state
.user_cache
.get_user_uid(&user_uuid)
.await
.map_err(AppResponseError::from)?;
let (workspace_id, view_id) = path.into_inner();
state
.workspace_access_control
.enforce_action(&uid, &workspace_id.to_string(), Action::Write)
.await?;
let user = realtime_user_for_web_request(req.headers(), uid)?;
delete_trash(
&state.metrics.appflowy_web_metrics,
server,
user,
&state.collab_access_control_storage,
workspace_id,
&view_id,
)
.await?;
Ok(Json(AppResponse::Ok()))
}
async fn delete_all_pages_from_trash_handler(
user_uuid: UserUuid,
path: web::Path<Uuid>,
state: Data<AppState>,
server: Data<RealtimeServerAddr>,
req: HttpRequest,
) -> Result<Json<AppResponse<()>>> {
let uid = state
.user_cache
.get_user_uid(&user_uuid)
.await
.map_err(AppResponseError::from)?;
let workspace_id = path.into_inner();
state
.workspace_access_control
.enforce_action(&uid, &workspace_id.to_string(), Action::Write)
.await?;
let user = realtime_user_for_web_request(req.headers(), uid)?;
delete_all_pages_from_trash(
&state.metrics.appflowy_web_metrics,
server,
user,
&state.collab_access_control_storage,
workspace_id,
)
.await?;
Ok(Json(AppResponse::Ok()))
}
async fn update_page_view_handler(
user_uuid: UserUuid,
path: web::Path<(Uuid, String)>,

View File

@ -50,7 +50,8 @@ use shared_entity::dto::workspace_dto::{
use sqlx::{PgPool, Transaction};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::time::timeout_at;
use tracing::instrument;
use uuid::Uuid;
@ -627,6 +628,43 @@ async fn move_all_views_out_from_trash(folder: &mut Folder) -> Result<FolderUpda
})
}
async fn delete_view_from_trash(view_id: &str, folder: &mut Folder) -> Result<Vec<u8>, AppError> {
let encoded_update = {
let mut txn = folder.collab.transact_mut();
folder
.body
.views
.update_view(&mut txn, view_id, |update| update.set_trash(false).done());
folder.body.views.delete_views(&mut txn, vec![view_id]);
txn.encode_update_v1()
};
Ok(encoded_update)
}
async fn delete_all_views_from_trash(folder: &mut Folder) -> Result<Vec<u8>, AppError> {
let all_trash_ids: Vec<String> = folder
.get_all_trash_sections()
.iter()
.map(|s| s.id.clone())
.collect();
let encoded_update = {
let mut txn = folder.collab.transact_mut();
if let Some(op) = folder
.body
.section
.section_op(&txn, collab_folder::Section::Trash)
{
op.clear(&mut txn);
};
folder.body.views.delete_views(&mut txn, all_trash_ids);
txn.encode_update_v1()
};
Ok(encoded_update)
}
fn folder_to_encoded_collab(folder: &Folder) -> Result<Vec<u8>, AppError> {
let collab_type = CollabType::Folder;
let encoded_folder_collab = folder
@ -1031,6 +1069,39 @@ pub async fn restore_all_pages_from_trash(
Ok(())
}
pub async fn delete_trash(
appflowy_web_metrics: &AppFlowyWebMetrics,
server: Data<RealtimeServerAddr>,
user: RealtimeUser,
collab_storage: &CollabAccessControlStorage,
workspace_id: Uuid,
view_id: &str,
) -> Result<(), AppError> {
let uid = user.uid;
let collab_origin = GetCollabOrigin::User { uid };
let mut folder =
get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?;
let update = delete_view_from_trash(view_id, &mut folder).await?;
update_workspace_folder_data(appflowy_web_metrics, server, user, workspace_id, update).await?;
Ok(())
}
pub async fn delete_all_pages_from_trash(
appflowy_web_metrics: &AppFlowyWebMetrics,
server: Data<RealtimeServerAddr>,
user: RealtimeUser,
collab_storage: &CollabAccessControlStorage,
workspace_id: Uuid,
) -> Result<(), AppError> {
let uid = user.uid;
let collab_origin = GetCollabOrigin::User { uid };
let mut folder =
get_latest_collab_folder(collab_storage, collab_origin, &workspace_id.to_string()).await?;
let update = delete_all_views_from_trash(&mut folder).await?;
update_workspace_folder_data(appflowy_web_metrics, server, user, workspace_id, update).await?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn update_page(
pg_pool: &PgPool,
@ -1273,7 +1344,7 @@ async fn get_page_collab_data_for_document(
#[instrument(level = "debug", skip_all)]
pub async fn update_page_collab_data(
appflowy_web_metrics: &Arc<AppFlowyWebMetrics>,
appflowy_web_metrics: &AppFlowyWebMetrics,
server: Data<RealtimeServerAddr>,
user: RealtimeUser,
workspace_id: Uuid,
@ -1300,3 +1371,54 @@ pub async fn update_page_collab_data(
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub async fn update_workspace_folder_data(
appflowy_web_metrics: &AppFlowyWebMetrics,
server: Data<RealtimeServerAddr>,
user: RealtimeUser,
workspace_id: Uuid,
update: Vec<u8>,
) -> Result<(), AppError> {
appflowy_web_metrics.record_update_size_bytes(update.len());
let (tx, rx) = tokio::sync::oneshot::channel();
let message = ClientHttpUpdateMessage {
user,
workspace_id: workspace_id.to_string(),
object_id: workspace_id.to_string(),
collab_type: CollabType::Folder,
update: Bytes::from(update),
state_vector: None,
return_tx: Some(tx),
};
server
.try_send(message)
.map_err(|err| AppError::Internal(anyhow!("Failed to send message to server: {}", err)))?;
let resp = timeout_at(
tokio::time::Instant::now() + Duration::from_millis(2000),
rx,
)
.await
.map_err(|err| {
appflowy_web_metrics.incr_apply_update_timeout_count(1);
AppError::Internal(anyhow!(
"Failed to receive apply update within timeout: {}",
err
))
})?
.map_err(|err| AppError::Internal(anyhow!("Unable to receive folder update reply: {}", err)))?;
match resp {
Ok(_) => Ok(()),
Err(err) => {
appflowy_web_metrics.incr_apply_update_failure_count(1);
Err(AppError::Internal(anyhow!(
"Failed to apply folder update: {}",
err
)))
},
}
}

View File

@ -262,7 +262,7 @@ async fn move_page_to_another_space() {
}
#[tokio::test]
async fn move_page_to_trash() {
async fn move_page_to_trash_then_restore() {
let registered_user = generate_unique_registered_user().await;
let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await;
let web_client = TestClient::user_with_new_device(registered_user.clone()).await;
@ -359,7 +359,7 @@ async fn move_page_to_trash() {
}
#[tokio::test]
async fn move_page_with_child_to_trash() {
async fn move_page_with_child_to_trash_then_restore() {
let registered_user = generate_unique_registered_user().await;
let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await;
let web_client = TestClient::user_with_new_device(registered_user.clone()).await;
@ -432,6 +432,153 @@ async fn move_page_with_child_to_trash() {
assert!(!view_found);
}
#[tokio::test]
async fn move_page_with_child_to_trash_then_delete_permanently() {
let registered_user = generate_unique_registered_user().await;
let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await;
let web_client = TestClient::user_with_new_device(registered_user.clone()).await;
let workspace_id = app_client.workspace_id().await;
let folder_view = web_client
.api_client
.get_workspace_folder(&workspace_id, Some(2), None)
.await
.unwrap();
let general_space = &folder_view
.children
.into_iter()
.find(|v| v.name == "General")
.unwrap();
app_client.open_workspace_collab(&workspace_id).await;
app_client
.wait_object_sync_complete(&workspace_id)
.await
.unwrap();
app_client
.api_client
.move_workspace_page_view_to_trash(
Uuid::parse_str(&workspace_id).unwrap(),
&general_space.view_id,
)
.await
.unwrap();
let folder = get_latest_folder(&app_client, &workspace_id).await;
let views_in_trash_for_app = folder
.get_my_trash_sections()
.iter()
.map(|v| v.id.clone())
.collect::<HashSet<String>>();
assert!(views_in_trash_for_app.contains(&general_space.view_id));
for view in general_space.children.iter() {
assert!(!views_in_trash_for_app.contains(&view.view_id));
}
let views_in_trash_for_web = web_client
.api_client
.get_workspace_trash(&workspace_id)
.await
.unwrap()
.views
.iter()
.map(|v| v.view.view_id.clone())
.collect::<HashSet<String>>();
assert!(views_in_trash_for_web.contains(&general_space.view_id));
web_client
.api_client
.delete_workspace_page_view_from_trash(
Uuid::parse_str(&workspace_id).unwrap(),
&general_space.view_id,
)
.await
.unwrap();
let folder = get_latest_folder(&app_client, &workspace_id).await;
assert!(folder.get_view(&general_space.view_id).is_none());
assert!(!folder
.get_my_trash_sections()
.iter()
.any(|v| v.id == general_space.view_id));
let view_found = web_client
.api_client
.get_workspace_trash(&workspace_id)
.await
.unwrap()
.views
.iter()
.any(|v| v.view.view_id == general_space.view_id);
assert!(!view_found);
}
#[tokio::test]
async fn move_page_with_child_to_trash_then_delete_all_permanently() {
let registered_user = generate_unique_registered_user().await;
let mut app_client = TestClient::user_with_new_device(registered_user.clone()).await;
let web_client = TestClient::user_with_new_device(registered_user.clone()).await;
let workspace_id = app_client.workspace_id().await;
let folder_view = web_client
.api_client
.get_workspace_folder(&workspace_id, Some(2), None)
.await
.unwrap();
let general_space = &folder_view
.children
.into_iter()
.find(|v| v.name == "General")
.unwrap();
app_client.open_workspace_collab(&workspace_id).await;
app_client
.wait_object_sync_complete(&workspace_id)
.await
.unwrap();
app_client
.api_client
.move_workspace_page_view_to_trash(
Uuid::parse_str(&workspace_id).unwrap(),
&general_space.view_id,
)
.await
.unwrap();
let folder = get_latest_folder(&app_client, &workspace_id).await;
let views_in_trash_for_app = folder
.get_my_trash_sections()
.iter()
.map(|v| v.id.clone())
.collect::<HashSet<String>>();
assert!(views_in_trash_for_app.contains(&general_space.view_id));
for view in general_space.children.iter() {
assert!(!views_in_trash_for_app.contains(&view.view_id));
}
let views_in_trash_for_web = web_client
.api_client
.get_workspace_trash(&workspace_id)
.await
.unwrap()
.views
.iter()
.map(|v| v.view.view_id.clone())
.collect::<HashSet<String>>();
assert!(views_in_trash_for_web.contains(&general_space.view_id));
web_client
.api_client
.delete_all_workspace_page_views_from_trash(Uuid::parse_str(&workspace_id).unwrap())
.await
.unwrap();
let folder = get_latest_folder(&app_client, &workspace_id).await;
assert!(folder.get_view(&general_space.view_id).is_none());
assert!(!folder
.get_my_trash_sections()
.iter()
.any(|v| v.id == general_space.view_id));
let view_found = web_client
.api_client
.get_workspace_trash(&workspace_id)
.await
.unwrap()
.views
.iter()
.any(|v| v.view.view_id == general_space.view_id);
assert!(!view_found);
}
#[tokio::test]
async fn update_page() {
let registered_user = generate_unique_registered_user().await;