diff --git a/Cargo.lock b/Cargo.lock index 34c565f1..ca27195c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -647,6 +647,7 @@ dependencies = [ "futures", "futures-util", "indexmap 2.2.5", + "itertools 0.12.0", "lazy_static", "md5", "parking_lot 0.12.1", @@ -656,6 +657,7 @@ dependencies = [ "semver", "serde", "serde_json", + "sqlx", "thiserror", "tokio", "tokio-stream", diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index ce7d0b9a..7470b63c 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -12,7 +12,7 @@ path = "src/main.rs" path = "src/lib.rs" [dependencies] -app-error = { workspace = true } +app-error = { workspace = true, features = ["sqlx_error", "tokio_error"] } dashmap.workspace = true async-stream.workspace = true futures.workspace = true @@ -24,6 +24,7 @@ tokio = { workspace = true, features = ["net", "sync", "macros", "rt-multi-threa async-trait = "0.1.77" serde.workspace = true serde_json.workspace = true +sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] } thiserror = "1.0.56" anyhow = "1" bytes.workspace = true @@ -47,6 +48,7 @@ semver = "1.0.22" redis = "0.25.2" parking_lot = "0.12.1" lazy_static = "1.4.0" +itertools = "0.12.0" [dev-dependencies] rand = "0.8.5" diff --git a/src/biz/collab/cache.rs b/services/appflowy-collaborate/src/collab/cache.rs similarity index 98% rename from src/biz/collab/cache.rs rename to services/appflowy-collaborate/src/collab/cache.rs index 13eeb99b..5759e7ef 100644 --- a/src/biz/collab/cache.rs +++ b/services/appflowy-collaborate/src/collab/cache.rs @@ -1,19 +1,22 @@ -use crate::biz::collab::disk_cache::CollabDiskCache; -use crate::biz::collab::mem_cache::CollabMemCache; -use crate::state::RedisConnectionManager; -use app_error::AppError; -use collab::entity::EncodedCollab; -use collab_entity::CollabType; -use database::collab::CollabMetadata; -use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; -use futures_util::{stream, StreamExt}; -use itertools::{Either, Itertools}; -use sqlx::{PgPool, Transaction}; use std::collections::HashMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; + +use collab::entity::EncodedCollab; +use collab_entity::CollabType; +use futures_util::{stream, StreamExt}; +use itertools::{Either, Itertools}; +use sqlx::{PgPool, Transaction}; use tracing::{error, event, Level}; +use app_error::AppError; +use database::collab::CollabMetadata; +use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; + +use crate::collab::disk_cache::CollabDiskCache; +use crate::collab::mem_cache::CollabMemCache; +use crate::state::RedisConnectionManager; + #[derive(Clone)] pub struct CollabCache { disk_cache: CollabDiskCache, diff --git a/src/biz/collab/disk_cache.rs b/services/appflowy-collaborate/src/collab/disk_cache.rs similarity index 99% rename from src/biz/collab/disk_cache.rs rename to services/appflowy-collaborate/src/collab/disk_cache.rs index 2e11e679..1810a577 100644 --- a/src/biz/collab/disk_cache.rs +++ b/services/appflowy-collaborate/src/collab/disk_cache.rs @@ -1,18 +1,20 @@ +use std::collections::HashMap; +use std::time::Duration; + use anyhow::anyhow; -use app_error::AppError; use collab::entity::EncodedCollab; use collab_entity::CollabType; +use sqlx::{PgPool, Transaction}; +use tokio::time::sleep; +use tracing::{event, instrument, Level}; + +use app_error::AppError; use database::collab::{ batch_select_collab_blob, insert_into_af_collab, is_collab_exists, select_blob_from_af_collab, select_collab_meta_from_af_collab, AppResult, }; use database::pg_row::AFCollabRowMeta; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; -use sqlx::{PgPool, Transaction}; -use std::collections::HashMap; -use std::time::Duration; -use tokio::time::sleep; -use tracing::{event, instrument, Level}; #[derive(Clone)] pub struct CollabDiskCache { diff --git a/src/biz/collab/mem_cache.rs b/services/appflowy-collaborate/src/collab/mem_cache.rs similarity index 100% rename from src/biz/collab/mem_cache.rs rename to services/appflowy-collaborate/src/collab/mem_cache.rs index 74acaadd..33c2eda1 100644 --- a/src/biz/collab/mem_cache.rs +++ b/services/appflowy-collaborate/src/collab/mem_cache.rs @@ -1,13 +1,13 @@ -use crate::state::RedisConnectionManager; +use anyhow::anyhow; use collab::entity::EncodedCollab; use redis::{pipe, AsyncCommands}; - -use anyhow::anyhow; -use app_error::AppError; - -use database::collab::CollabMetadata; use tracing::{error, instrument, trace}; +use app_error::AppError; +use database::collab::CollabMetadata; + +use crate::state::RedisConnectionManager; + const SEVEN_DAYS: i64 = 604800; const ONE_MONTH: u64 = 2592000; #[derive(Clone)] diff --git a/services/appflowy-collaborate/src/collab/mod.rs b/services/appflowy-collaborate/src/collab/mod.rs new file mode 100644 index 00000000..befdd3d5 --- /dev/null +++ b/services/appflowy-collaborate/src/collab/mod.rs @@ -0,0 +1,3 @@ +pub mod cache; +pub mod disk_cache; +pub mod mem_cache; diff --git a/services/appflowy-collaborate/src/lib.rs b/services/appflowy-collaborate/src/lib.rs index 34a0ae9e..5b77b46a 100644 --- a/services/appflowy-collaborate/src/lib.rs +++ b/services/appflowy-collaborate/src/lib.rs @@ -1,4 +1,5 @@ mod client; +pub mod collab; pub mod command; pub mod connect_state; pub mod error; @@ -7,6 +8,7 @@ mod metrics; mod permission; mod rt_server; pub mod shared_state; +mod state; mod util; pub use metrics::*; diff --git a/services/appflowy-collaborate/src/state.rs b/services/appflowy-collaborate/src/state.rs new file mode 100644 index 00000000..704b004e --- /dev/null +++ b/services/appflowy-collaborate/src/state.rs @@ -0,0 +1 @@ +pub type RedisConnectionManager = redis::aio::ConnectionManager; diff --git a/src/application.rs b/src/application.rs index 9d254907..a0a95b1b 100644 --- a/src/application.rs +++ b/src/application.rs @@ -14,7 +14,6 @@ use crate::biz::casbin::{ use crate::biz::collab::access_control::{ CollabMiddlewareAccessControl, CollabStorageAccessControlImpl, }; -use crate::biz::collab::cache::CollabCache; use crate::biz::collab::storage::CollabStorageImpl; use crate::biz::pg_listener::PgListeners; use crate::biz::snapshot::SnapshotControl; @@ -33,6 +32,7 @@ use actix_web::cookie::Key; use actix_web::{dev::Server, web, web::Data, App, HttpServer}; use anyhow::{Context, Error}; use appflowy_ai_client::client::AppFlowyAIClient; +use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollaborationServer; diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index dbfeb6e2..60fd52a7 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -1,19 +1,20 @@ -use crate::api::workspace::{COLLAB_PATTERN, V1_COLLAB_PATTERN}; -use crate::biz::workspace::access_control::WorkspaceAccessControl; -use crate::middleware::access_control_mw::{AccessResource, MiddlewareAccessControl}; +use std::collections::HashMap; +use std::sync::Arc; + use actix_router::{Path, ResourceDef, Url}; use actix_web::http::Method; -use app_error::AppError; use async_trait::async_trait; +use tracing::{instrument, trace}; + +use access_control::act::Action; +use app_error::AppError; +use appflowy_collaborate::collab::cache::CollabCache; use database::collab::CollabStorageAccessControl; use database_entity::dto::AFAccessLevel; -use crate::biz::collab::cache::CollabCache; - -use access_control::act::Action; -use std::collections::HashMap; -use std::sync::Arc; -use tracing::{instrument, trace}; +use crate::api::workspace::{COLLAB_PATTERN, V1_COLLAB_PATTERN}; +use crate::biz::workspace::access_control::WorkspaceAccessControl; +use crate::middleware::access_control_mw::{AccessResource, MiddlewareAccessControl}; #[async_trait] pub trait CollabAccessControl: Sync + Send + 'static { diff --git a/src/biz/collab/mod.rs b/src/biz/collab/mod.rs index 8ae33bf2..0f29cd70 100644 --- a/src/biz/collab/mod.rs +++ b/src/biz/collab/mod.rs @@ -1,7 +1,4 @@ pub mod access_control; -pub mod cache; -pub mod disk_cache; -pub mod mem_cache; pub mod metrics; pub mod ops; pub mod queue; diff --git a/src/biz/collab/queue.rs b/src/biz/collab/queue.rs index 1d1165b3..77aaf283 100644 --- a/src/biz/collab/queue.rs +++ b/src/biz/collab/queue.rs @@ -1,27 +1,29 @@ -use crate::biz::collab::cache::CollabCache; -use crate::biz::collab::queue_redis_ops::{ - get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, - WritePriority, PENDING_WRITE_META_EXPIRE_SECS, -}; - -use crate::biz::collab::metrics::CollabMetrics; -use crate::biz::collab::RedisSortedSet; -use crate::state::RedisConnectionManager; -use anyhow::{anyhow, Context}; -use app_error::AppError; -use collab_entity::CollabType; -use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; -use serde::{Deserialize, Serialize}; -use sqlx::PgPool; use std::collections::HashMap; use std::ops::DerefMut; use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::Duration; + +use anyhow::{anyhow, Context}; +use collab_entity::CollabType; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; use tokio::sync::Mutex; use tokio::time::{interval, sleep, sleep_until, Instant}; use tracing::{error, instrument, trace, warn}; +use app_error::AppError; +use appflowy_collaborate::collab::cache::CollabCache; +use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; + +use crate::biz::collab::metrics::CollabMetrics; +use crate::biz::collab::queue_redis_ops::{ + get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, + WritePriority, PENDING_WRITE_META_EXPIRE_SECS, +}; +use crate::biz::collab::RedisSortedSet; +use crate::state::RedisConnectionManager; + type PendingWriteSet = Arc; #[derive(Clone)] pub struct StorageQueue { diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index aa5e40e6..58244531 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -1,6 +1,5 @@ use crate::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl}; use crate::biz::collab::access_control::CollabStorageAccessControlImpl; -use crate::biz::collab::cache::CollabCache; use crate::biz::snapshot::SnapshotControl; use crate::api::util::CollabValidator; @@ -9,6 +8,7 @@ use crate::biz::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE}; use crate::biz::collab::queue_redis_ops::WritePriority; use crate::state::RedisConnectionManager; use app_error::AppError; +use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::command::{CLCommandSender, CollaborationCommand}; use appflowy_collaborate::shared_state::RealtimeSharedState; use async_trait::async_trait; diff --git a/src/state.rs b/src/state.rs index c16383e5..0eed990a 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,5 @@ use crate::api::metrics::RequestMetrics; use crate::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl}; -use crate::biz::collab::cache::CollabCache; use crate::biz::collab::metrics::CollabMetrics; use crate::biz::collab::storage::CollabAccessControlStorage; @@ -11,6 +10,7 @@ use access_control::access::AccessControl; use access_control::metrics::AccessControlMetrics; use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; +use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollabRealtimeMetrics; use dashmap::DashMap; diff --git a/tests/collab/pending_write_test.rs b/tests/collab/pending_write_test.rs index efc40ff2..7c8cd754 100644 --- a/tests/collab/pending_write_test.rs +++ b/tests/collab/pending_write_test.rs @@ -1,8 +1,8 @@ use crate::collab::util::{generate_random_bytes, redis_connection_manager}; use crate::sql_test::util::{setup_db, test_create_user}; -use appflowy_cloud::biz::collab::cache::CollabCache; use appflowy_cloud::biz::collab::queue::StorageQueue; use appflowy_cloud::biz::collab::WritePriority; +use appflowy_collaborate::collab::cache::CollabCache; use client_api_test_util::setup_log; use collab::entity::EncodedCollab; use collab_entity::CollabType; diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index 58a872d8..2b32aa19 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -1,20 +1,18 @@ -use crate::collab::util::{redis_connection_manager, test_encode_collab_v1}; +use std::collections::HashMap; -use app_error::ErrorCode; - -use appflowy_cloud::biz::collab::mem_cache::CollabMemCache; - -use client_api_test_util::*; use collab::entity::EncodedCollab; use collab_entity::CollabType; +use sqlx::types::Uuid; + +use app_error::ErrorCode; +use appflowy_collaborate::collab::mem_cache::CollabMemCache; +use client_api_test_util::*; +use database::collab::CollabMetadata; use database_entity::dto::{ CreateCollabParams, DeleteCollabParams, QueryCollab, QueryCollabParams, QueryCollabResult, }; -use sqlx::types::Uuid; -use database::collab::CollabMetadata; - -use std::collections::HashMap; +use crate::collab::util::{redis_connection_manager, test_encode_collab_v1}; #[tokio::test] async fn success_insert_collab_test() {