From e738508d798964e64cac307cf0fc3fd17cb51f60 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 24 Oct 2024 19:04:06 +0800 Subject: [PATCH] fix: Support notion database row document (#929) * chore: fix import notion database --- Cargo.lock | 15 +++---- Cargo.toml | 14 +++---- .../src/import_worker/worker.rs | 42 ++++++++++++------- services/appflowy-worker/src/s3_client.rs | 7 ++-- 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c040f5c..d3718eb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2239,7 +2239,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "arc-swap", @@ -2264,7 +2264,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "async-trait", @@ -2303,7 +2303,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "arc-swap", @@ -2324,7 +2324,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "bytes", @@ -2344,7 +2344,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "arc-swap", @@ -2366,7 +2366,7 @@ dependencies = [ [[package]] name = "collab-importer" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "async-recursion", @@ -2379,6 +2379,7 @@ dependencies = [ "collab-document", "collab-entity", "collab-folder", + "csv", "fancy-regex 0.13.0", "futures", "futures-lite", @@ -2468,7 +2469,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e77fd84e0e32b4dc1dcfa271547517e5b0d8e987#e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3493d8ca30850d1ee84f5ba1f6d7b673614b172e#3493d8ca30850d1ee84f5ba1f6d7b673614b172e" dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index afaa123d..c1d7f157 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -304,13 +304,13 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } -collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e77fd84e0e32b4dc1dcfa271547517e5b0d8e987" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } +collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3493d8ca30850d1ee84f5ba1f6d7b673614b172e" } [features] history = [] diff --git a/services/appflowy-worker/src/import_worker/worker.rs b/services/appflowy-worker/src/import_worker/worker.rs index e9eaeb33..95d2b296 100644 --- a/services/appflowy-worker/src/import_worker/worker.rs +++ b/services/appflowy-worker/src/import_worker/worker.rs @@ -12,7 +12,7 @@ use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use collab_database::workspace_database::WorkspaceDatabase; use collab_entity::CollabType; -use collab_folder::Folder; +use collab_folder::{Folder, View, ViewLayout}; use collab_importer::imported_collab::ImportType; use collab_importer::notion::page::CollabResource; use collab_importer::notion::NotionImporter; @@ -44,7 +44,7 @@ use serde_json::from_str; use sqlx::types::chrono; use sqlx::types::chrono::{DateTime, Utc}; use sqlx::{PgPool, Pool, Postgres}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env::temp_dir; use std::fmt::Display; use std::fs::Permissions; @@ -435,7 +435,7 @@ async fn download_and_unzip_file( &import_task.md5_base64, ) .await?; - info!( + trace!( "[Import] {} start unzip file: {:?}", import_task.workspace_id, file.path_buf() @@ -579,21 +579,22 @@ async fn process_unzip_file( let mut resources = vec![]; let mut collab_params_list = vec![]; let mut database_view_ids_by_database_id: HashMap> = HashMap::new(); + let mut orphan_view_ids = HashSet::new(); let mem_cache = CollabMemCache::new(redis_client.clone()); let timestamp = chrono::Utc::now().timestamp(); // 3. Collect all collabs and resources let mut stream = imported.into_collab_stream().await; - while let Some(imported_collab) = stream.next().await { + while let Some(imported_collab_info) = stream.next().await { trace!( "[Import]: {} imported collab: {}", import_task.workspace_id, - imported_collab + imported_collab_info ); - resources.push(imported_collab.resource); + resources.extend(imported_collab_info.resources); collab_params_list.extend( - imported_collab - .collabs + imported_collab_info + .imported_collabs .into_iter() .map(|imported_collab| CollabParams { object_id: imported_collab.object_id, @@ -604,12 +605,14 @@ async fn process_unzip_file( .collect::>(), ); - match imported_collab.import_type { + match imported_collab_info.import_type { ImportType::Database { database_id, view_ids, + row_document_ids, } => { database_view_ids_by_database_id.insert(database_id, view_ids); + orphan_view_ids.extend(row_document_ids); }, ImportType::Document => { // do nothing @@ -668,7 +671,18 @@ async fn process_unzip_file( collab_params_list.push(w_database_collab_params); } - // 5. Encode Folder + // 5. Insert orphan view to folder + let orphan_views = orphan_view_ids + .into_iter() + .map(|orphan_view_id| { + View::orphan_view(&orphan_view_id, ViewLayout::Document, Some(import_task.uid)) + }) + .collect::>(); + if !orphan_views.is_empty() { + folder.insert_views(orphan_views); + } + + // 6. Encode Folder let folder_collab = folder .encode_collab_v1(|collab| CollabType::Folder.validate_require_data(collab)) .map_err(|err| ImportError::Internal(err.into()))?; @@ -698,7 +712,7 @@ async fn process_unzip_file( let upload_resources = process_resources(resources).await; - // 6. Start a transaction to insert all collabs + // 7. Start a transaction to insert all collabs let mut transaction = pg_pool.begin().await.map_err(|err| { ImportError::Internal(anyhow!( "Failed to start transaction when importing data: {:?}", @@ -711,7 +725,7 @@ async fn process_unzip_file( import_task.workspace_id ); - // 7. write all collab to disk + // 8. write all collab to disk insert_into_af_collab_bulk_for_user( &mut transaction, &import_task.uid, @@ -810,13 +824,13 @@ async fn process_unzip_file( return result; } - // 7. after inserting all collabs, upload all files to S3 + // 9. after inserting all collabs, upload all files to S3 trace!("[Import]: {} upload files to s3", import_task.workspace_id,); batch_upload_files_to_s3(&import_task.workspace_id, s3_client, upload_resources) .await .map_err(|err| ImportError::Internal(anyhow!("Failed to upload files to S3: {:?}", err)))?; - // 8. delete zip file regardless of success or failure + // 10. delete zip file regardless of success or failure match fs::remove_dir_all(unzip_dir_path).await { Ok(_) => trace!( "[Import]: {} deleted unzip file: {:?}", diff --git a/services/appflowy-worker/src/s3_client.rs b/services/appflowy-worker/src/s3_client.rs index 9a157262..676d7611 100644 --- a/services/appflowy-worker/src/s3_client.rs +++ b/services/appflowy-worker/src/s3_client.rs @@ -17,7 +17,7 @@ use tokio::fs; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; use tokio_util::compat::TokioAsyncReadCompatExt; -use tracing::{error, info, trace}; +use tracing::{error, trace}; use uuid::Uuid; #[async_trait] @@ -203,9 +203,10 @@ pub async fn download_file( zip_file_path ); write_stream_to_file(&zip_file_path, expected_md5_base64, stream).await?; - info!( + trace!( "[Import] {} finish writing stream to file: {:?}", - workspace_id, zip_file_path + workspace_id, + zip_file_path ); Ok(AutoRemoveDownloadedFile { zip_file_path,