From 67d9fad7d7d7a653c8cdfe8bc616740b30615c1f Mon Sep 17 00:00:00 2001 From: Khor Shu Heng <32997938+khorshuheng@users.noreply.github.com> Date: Tue, 21 May 2024 10:06:26 +0800 Subject: [PATCH] chore: migrate collab storage to appflowy-collaborate (#566) * chore: migrate collab storage to appflowy-collaborate * fix: clippy error * chore: remove handler --------- Co-authored-by: nathan --- Cargo.lock | 2 + libs/collab-stream/src/model.rs | 13 +-- services/appflowy-collaborate/Cargo.toml | 2 + .../src/collab/access_control.rs | 86 +++++++++++++++++++ .../appflowy-collaborate/src/collab/mod.rs | 5 ++ .../appflowy-collaborate/src}/collab/queue.rs | 8 +- .../src}/collab/queue_redis_ops.rs | 4 +- .../src}/collab/storage.rs | 20 ++--- .../src/collab/validator.rs | 18 ++++ .../src/group/group_init.rs | 26 +++--- services/appflowy-collaborate/src/lib.rs | 3 +- services/appflowy-collaborate/src/metrics.rs | 75 ++++++++++++++++ .../src}/snapshot/cache.rs | 0 .../appflowy-collaborate/src}/snapshot/mod.rs | 0 .../src}/snapshot/queue.rs | 0 .../src}/snapshot/snapshot_control.rs | 6 +- src/api/ws.rs | 2 +- src/application.rs | 10 +-- src/biz/collab/access_control.rs | 84 ------------------ src/biz/collab/metrics.rs | 77 ----------------- src/biz/collab/mod.rs | 5 -- src/biz/mod.rs | 1 - src/biz/user/user_init.rs | 2 +- src/biz/workspace/ops.rs | 2 +- src/state.rs | 4 +- tests/collab/pending_write_test.rs | 4 +- 26 files changed, 237 insertions(+), 222 deletions(-) rename {src/biz => services/appflowy-collaborate/src}/collab/queue.rs (98%) rename {src/biz => services/appflowy-collaborate/src}/collab/queue_redis_ops.rs (98%) rename {src/biz => services/appflowy-collaborate/src}/collab/storage.rs (94%) create mode 100644 services/appflowy-collaborate/src/collab/validator.rs rename {src/biz => services/appflowy-collaborate/src}/snapshot/cache.rs (100%) rename {src/biz => services/appflowy-collaborate/src}/snapshot/mod.rs (100%) rename {src/biz => services/appflowy-collaborate/src}/snapshot/queue.rs (100%) rename {src/biz => services/appflowy-collaborate/src}/snapshot/snapshot_control.rs (98%) delete mode 100644 src/biz/collab/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index c55c8a0c..9c185d4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -671,6 +671,8 @@ dependencies = [ "tokio-util", "tracing", "uuid", + "validator", + "workspace-access", "yrs", ] diff --git a/libs/collab-stream/src/model.rs b/libs/collab-stream/src/model.rs index f2f981cb..98792ac9 100644 --- a/libs/collab-stream/src/model.rs +++ b/libs/collab-stream/src/model.rs @@ -1,5 +1,6 @@ use collab_entity::CollabType; use std::collections::BTreeMap; +use std::fmt::Display; use std::ops::Deref; use std::str::FromStr; @@ -23,9 +24,9 @@ pub struct MessageId { pub sequence_number: u16, } -impl ToString for MessageId { - fn to_string(&self) -> String { - format!("{}-{}", self.timestamp_ms, self.sequence_number) +impl Display for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}-{}", self.timestamp_ms, self.sequence_number) } } @@ -233,9 +234,9 @@ impl FromRedisValue for RedisString { } } -impl ToString for RedisString { - fn to_string(&self) -> String { - self.0.clone() +impl Display for RedisString { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.clone()) } } diff --git a/services/appflowy-collaborate/Cargo.toml b/services/appflowy-collaborate/Cargo.toml index c074251f..63d0e90c 100644 --- a/services/appflowy-collaborate/Cargo.toml +++ b/services/appflowy-collaborate/Cargo.toml @@ -54,6 +54,8 @@ redis = "0.25.2" parking_lot = "0.12.1" lazy_static = "1.4.0" itertools = "0.12.0" +validator = "0.16.1" +workspace-access.workspace = true [dev-dependencies] rand = "0.8.5" diff --git a/services/appflowy-collaborate/src/collab/access_control.rs b/services/appflowy-collaborate/src/collab/access_control.rs index d5407b35..0a0d353f 100644 --- a/services/appflowy-collaborate/src/collab/access_control.rs +++ b/services/appflowy-collaborate/src/collab/access_control.rs @@ -1,11 +1,15 @@ use async_trait::async_trait; +use std::sync::Arc; use tracing::instrument; +use crate::collab::cache::CollabCache; use access_control::access::ObjectType; use access_control::access::{enable_access_control, AccessControl}; use access_control::act::{Action, ActionVariant}; use access_control::collab::{CollabAccessControl, RealtimeAccessControl}; +use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; +use database::collab::CollabStorageAccessControl; use database_entity::dto::AFAccessLevel; #[derive(Clone)] @@ -159,3 +163,85 @@ impl RealtimeAccessControl for RealtimeCollabAccessControlImpl { .await } } + +#[derive(Clone)] +pub struct CollabStorageAccessControlImpl { + pub collab_access_control: Arc, + pub workspace_access_control: Arc, + pub cache: CollabCache, +} + +#[async_trait] +impl CollabStorageAccessControl + for CollabStorageAccessControlImpl +where + CollabAC: CollabAccessControl, + WorkspaceAC: WorkspaceAccessControl, +{ + async fn update_policy( + &self, + uid: &i64, + oid: &str, + level: AFAccessLevel, + ) -> Result<(), AppError> { + self + .collab_access_control + .update_access_level_policy(uid, oid, level) + .await + } + + async fn enforce_read_collab( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + ) -> Result { + let collab_exists = self.cache.is_exist(oid).await?; + if !collab_exists { + // If the collab does not exist, we should not enforce the access control. We consider the user + // has the permission to read the collab + return Ok(true); + } + self + .collab_access_control + .enforce_action(workspace_id, uid, oid, Action::Read) + .await + } + + async fn enforce_write_collab( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + ) -> Result { + let collab_exists = self.cache.is_exist(oid).await?; + if !collab_exists { + // If the collab does not exist, we should not enforce the access control. we consider the user + // has the permission to write the collab + return Ok(true); + } + self + .collab_access_control + .enforce_action(workspace_id, uid, oid, Action::Write) + .await + } + + async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result { + self + .workspace_access_control + .enforce_action(uid, workspace_id, Action::Write) + .await + } + + async fn enforce_delete( + &self, + workspace_id: &str, + uid: &i64, + oid: &str, + ) -> Result { + self + .collab_access_control + .enforce_access_level(workspace_id, uid, oid, AFAccessLevel::FullAccess) + .await + } +} diff --git a/services/appflowy-collaborate/src/collab/mod.rs b/services/appflowy-collaborate/src/collab/mod.rs index 3eed3b9e..3480b52b 100644 --- a/services/appflowy-collaborate/src/collab/mod.rs +++ b/services/appflowy-collaborate/src/collab/mod.rs @@ -3,3 +3,8 @@ pub mod cache; pub mod disk_cache; pub mod mem_cache; pub mod notification; +pub mod queue; +mod queue_redis_ops; +pub mod storage; +pub mod validator; +pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority}; diff --git a/src/biz/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs similarity index 98% rename from src/biz/collab/queue.rs rename to services/appflowy-collaborate/src/collab/queue.rs index 77aaf283..3f0c2e6d 100644 --- a/src/biz/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -12,16 +12,16 @@ use tokio::sync::Mutex; use tokio::time::{interval, sleep, sleep_until, Instant}; use tracing::{error, instrument, trace, warn}; +use crate::collab::cache::CollabCache; use app_error::AppError; -use appflowy_collaborate::collab::cache::CollabCache; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; -use crate::biz::collab::metrics::CollabMetrics; -use crate::biz::collab::queue_redis_ops::{ +use crate::collab::queue_redis_ops::{ get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority, PENDING_WRITE_META_EXPIRE_SECS, }; -use crate::biz::collab::RedisSortedSet; +use crate::collab::RedisSortedSet; +use crate::metrics::CollabMetrics; use crate::state::RedisConnectionManager; type PendingWriteSet = Arc; diff --git a/src/biz/collab/queue_redis_ops.rs b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs similarity index 98% rename from src/biz/collab/queue_redis_ops.rs rename to services/appflowy-collaborate/src/collab/queue_redis_ops.rs index 53ca04b5..9477e371 100644 --- a/src/biz/collab/queue_redis_ops.rs +++ b/services/appflowy-collaborate/src/collab/queue_redis_ops.rs @@ -1,4 +1,4 @@ -use crate::biz::collab::queue::PendingWriteMeta; +use crate::collab::queue::PendingWriteMeta; use crate::state::RedisConnectionManager; use app_error::AppError; use futures_util::StreamExt; @@ -235,7 +235,7 @@ impl PartialOrd for PendingWrite { #[cfg(test)] mod tests { - use crate::biz::collab::{PendingWrite, RedisSortedSet, WritePriority}; + use crate::collab::{PendingWrite, RedisSortedSet, WritePriority}; use anyhow::Context; use std::time::Duration; diff --git a/src/biz/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs similarity index 94% rename from src/biz/collab/storage.rs rename to services/appflowy-collaborate/src/collab/storage.rs index 047a4dc5..5e436072 100644 --- a/src/biz/collab/storage.rs +++ b/services/appflowy-collaborate/src/collab/storage.rs @@ -12,11 +12,11 @@ use tokio::time::timeout; use tracing::{error, instrument, trace}; use validator::Validate; +use crate::collab::access_control::CollabAccessControlImpl; +use crate::collab::cache::CollabCache; +use crate::command::{CLCommandSender, CollaborationCommand}; +use crate::shared_state::RealtimeSharedState; use app_error::AppError; -use appflowy_collaborate::collab::access_control::CollabAccessControlImpl; -use appflowy_collaborate::collab::cache::CollabCache; -use appflowy_collaborate::command::{CLCommandSender, CollaborationCommand}; -use appflowy_collaborate::shared_state::RealtimeSharedState; use database::collab::{AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl}; use database_entity::dto::{ AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab, @@ -24,12 +24,12 @@ use database_entity::dto::{ }; use workspace_access::WorkspaceAccessControlImpl; -use crate::api::util::CollabValidator; -use crate::biz::collab::access_control::CollabStorageAccessControlImpl; -use crate::biz::collab::metrics::CollabMetrics; -use crate::biz::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE}; -use crate::biz::collab::queue_redis_ops::WritePriority; -use crate::biz::snapshot::SnapshotControl; +use crate::collab::access_control::CollabStorageAccessControlImpl; +use crate::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE}; +use crate::collab::queue_redis_ops::WritePriority; +use crate::collab::validator::CollabValidator; +use crate::metrics::CollabMetrics; +use crate::snapshot::SnapshotControl; use crate::state::RedisConnectionManager; pub type CollabAccessControlStorage = CollabStorageImpl< diff --git a/services/appflowy-collaborate/src/collab/validator.rs b/services/appflowy-collaborate/src/collab/validator.rs new file mode 100644 index 00000000..ff8f32b1 --- /dev/null +++ b/services/appflowy-collaborate/src/collab/validator.rs @@ -0,0 +1,18 @@ +use app_error::AppError; +use async_trait::async_trait; +use collab_rt_protocol::validate_encode_collab; +use database_entity::dto::CollabParams; + +#[async_trait] +pub trait CollabValidator { + async fn check_encode_collab(&self) -> Result<(), AppError>; +} + +#[async_trait] +impl CollabValidator for CollabParams { + async fn check_encode_collab(&self) -> Result<(), AppError> { + validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type) + .await + .map_err(|err| AppError::NoRequiredData(err.to_string())) + } +} diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 0ef37ce3..4b108f47 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -301,10 +301,10 @@ impl EditState { } } +#[allow(dead_code)] struct CollabUpdateStreamingImpl { sender: mpsc::UnboundedSender>, stopped: Arc, - message_processor: tokio::task::JoinHandle<()>, } impl CollabUpdateStreamingImpl { @@ -314,24 +314,18 @@ impl CollabUpdateStreamingImpl { collab_redis_stream: &CollabRedisStream, ) -> Result { let stream = collab_redis_stream - .collab_update_stream(&workspace_id, &object_id, "collaborate_update_producer") + .collab_update_stream(workspace_id, object_id, "collaborate_update_producer") .await?; let stopped = Arc::new(AtomicBool::new(false)); let (sender, receiver) = mpsc::unbounded_channel(); - let message_processor = { - let stopped = stopped.clone(); - tokio::spawn(async move { - if let Err(err) = Self::consume_messages(receiver, stream).await { - error!("Failed to consume incoming updates: {}", err); - } - stopped.store(true, Ordering::SeqCst); - }) - }; - Ok(Self { - sender, - stopped, - message_processor, - }) + let cloned_stopped = stopped.clone(); + tokio::spawn(async move { + if let Err(err) = Self::consume_messages(receiver, stream).await { + error!("Failed to consume incoming updates: {}", err); + } + cloned_stopped.store(true, Ordering::SeqCst); + }); + Ok(Self { sender, stopped }) } async fn consume_messages( diff --git a/services/appflowy-collaborate/src/lib.rs b/services/appflowy-collaborate/src/lib.rs index c00b7cb0..eb065dd7 100644 --- a/services/appflowy-collaborate/src/lib.rs +++ b/services/appflowy-collaborate/src/lib.rs @@ -5,10 +5,11 @@ pub mod command; pub mod connect_state; pub mod error; mod group; -mod metrics; +pub mod metrics; mod permission; mod rt_server; pub mod shared_state; +pub mod snapshot; mod state; mod util; diff --git a/services/appflowy-collaborate/src/metrics.rs b/services/appflowy-collaborate/src/metrics.rs index 99f7deef..36cb3e46 100644 --- a/services/appflowy-collaborate/src/metrics.rs +++ b/services/appflowy-collaborate/src/metrics.rs @@ -157,3 +157,78 @@ pub(crate) fn spawn_metrics( } }); } + +#[derive(Clone)] +pub struct CollabMetrics { + success_write_snapshot_count: Gauge, + total_write_snapshot_count: Gauge, + success_write_collab_count: Gauge, + total_write_collab_count: Gauge, + total_queue_collab_count: Gauge, + success_queue_collab_count: Gauge, +} + +impl CollabMetrics { + fn init() -> Self { + Self { + success_write_snapshot_count: Gauge::default(), + total_write_snapshot_count: Default::default(), + success_write_collab_count: Default::default(), + total_write_collab_count: Default::default(), + total_queue_collab_count: Default::default(), + success_queue_collab_count: Default::default(), + } + } + + pub fn register(registry: &mut Registry) -> Self { + let metrics = Self::init(); + let realtime_registry = registry.sub_registry_with_prefix("collab"); + realtime_registry.register( + "success_write_snapshot_count", + "success write snapshot to db", + metrics.success_write_snapshot_count.clone(), + ); + realtime_registry.register( + "total_attempt_write_snapshot_count", + "total attempt write snapshot to db", + metrics.total_write_snapshot_count.clone(), + ); + realtime_registry.register( + "success_write_collab_count", + "success write collab", + metrics.success_write_collab_count.clone(), + ); + realtime_registry.register( + "total_write_collab_count", + "total write collab", + metrics.total_write_collab_count.clone(), + ); + realtime_registry.register( + "success_queue_collab_count", + "success queue collab", + metrics.success_queue_collab_count.clone(), + ); + realtime_registry.register( + "total_queue_collab_count", + "total queue pending collab", + metrics.total_queue_collab_count.clone(), + ); + + metrics + } + + pub fn record_write_snapshot(&self, success_attempt: i64, total_attempt: i64) { + self.success_write_snapshot_count.set(success_attempt); + self.total_write_snapshot_count.set(total_attempt); + } + + pub fn record_write_collab(&self, success_attempt: i64, total_attempt: i64) { + self.success_write_collab_count.set(success_attempt); + self.total_write_collab_count.set(total_attempt); + } + + pub fn record_queue_collab(&self, success_attempt: i64, total_attempt: i64) { + self.success_queue_collab_count.set(success_attempt); + self.total_queue_collab_count.set(total_attempt); + } +} diff --git a/src/biz/snapshot/cache.rs b/services/appflowy-collaborate/src/snapshot/cache.rs similarity index 100% rename from src/biz/snapshot/cache.rs rename to services/appflowy-collaborate/src/snapshot/cache.rs diff --git a/src/biz/snapshot/mod.rs b/services/appflowy-collaborate/src/snapshot/mod.rs similarity index 100% rename from src/biz/snapshot/mod.rs rename to services/appflowy-collaborate/src/snapshot/mod.rs diff --git a/src/biz/snapshot/queue.rs b/services/appflowy-collaborate/src/snapshot/queue.rs similarity index 100% rename from src/biz/snapshot/queue.rs rename to services/appflowy-collaborate/src/snapshot/queue.rs diff --git a/src/biz/snapshot/snapshot_control.rs b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs similarity index 98% rename from src/biz/snapshot/snapshot_control.rs rename to services/appflowy-collaborate/src/snapshot/snapshot_control.rs index 6f0ae0fe..f3ceff43 100644 --- a/src/biz/snapshot/snapshot_control.rs +++ b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs @@ -1,6 +1,6 @@ -use crate::biz::collab::metrics::CollabMetrics; -use crate::biz::snapshot::cache::SnapshotCache; -use crate::biz::snapshot::queue::PendingQueue; +use crate::metrics::CollabMetrics; +use crate::snapshot::cache::SnapshotCache; +use crate::snapshot::queue::PendingQueue; use crate::state::RedisConnectionManager; use anyhow::anyhow; use app_error::AppError; diff --git a/src/api/ws.rs b/src/api/ws.rs index a6713bfa..ad5a521d 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -15,11 +15,11 @@ use app_error::AppError; use appflowy_collaborate::actix_ws::client::rt_client::RealtimeClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; use appflowy_collaborate::collab::access_control::RealtimeCollabAccessControlImpl; +use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage}; use collab_rt_entity::RealtimeMessage; use shared_entity::response::AppResponseError; -use crate::biz::collab::storage::CollabAccessControlStorage; use crate::biz::user::auth::jwt::{authorization_from_token, UserUuid}; use crate::state::AppState; diff --git a/src/application.rs b/src/application.rs index 0e77907c..7885d035 100644 --- a/src/application.rs +++ b/src/application.rs @@ -9,12 +9,8 @@ use access_control::access::{enable_access_control, AccessControl}; use crate::api::ai_tool::ai_tool_scope; use crate::api::chat::chat_scope; -use crate::biz::collab::access_control::{ - CollabMiddlewareAccessControl, CollabStorageAccessControlImpl, -}; -use crate::biz::collab::storage::CollabStorageImpl; +use crate::biz::collab::access_control::CollabMiddlewareAccessControl; use crate::biz::pg_listener::PgListeners; -use crate::biz::snapshot::SnapshotControl; use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl; use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting}; use crate::middleware::access_control_mw::MiddlewareAccessControlTransform; @@ -32,11 +28,13 @@ use anyhow::{Context, Error}; use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; use appflowy_collaborate::collab::access_control::{ - CollabAccessControlImpl, RealtimeCollabAccessControlImpl, + CollabAccessControlImpl, CollabStorageAccessControlImpl, RealtimeCollabAccessControlImpl, }; use appflowy_collaborate::collab::cache::CollabCache; +use appflowy_collaborate::collab::storage::CollabStorageImpl; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; use appflowy_collaborate::shared_state::RealtimeSharedState; +use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; use database::file::bucket_s3_impl::S3BucketStorage; use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; diff --git a/src/biz/collab/access_control.rs b/src/biz/collab/access_control.rs index cf6588d8..a0275ca5 100644 --- a/src/biz/collab/access_control.rs +++ b/src/biz/collab/access_control.rs @@ -8,10 +8,8 @@ use tracing::{instrument, trace}; use access_control::act::Action; use access_control::collab::CollabAccessControl; -use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; use appflowy_collaborate::collab::cache::CollabCache; -use database::collab::CollabStorageAccessControl; use database_entity::dto::AFAccessLevel; use crate::api::workspace::{COLLAB_PATTERN, V1_COLLAB_PATTERN}; @@ -139,85 +137,3 @@ where } } } - -#[derive(Clone)] -pub struct CollabStorageAccessControlImpl { - pub(crate) collab_access_control: Arc, - pub(crate) workspace_access_control: Arc, - pub(crate) cache: CollabCache, -} - -#[async_trait] -impl CollabStorageAccessControl - for CollabStorageAccessControlImpl -where - CollabAC: CollabAccessControl, - WorkspaceAC: WorkspaceAccessControl, -{ - async fn update_policy( - &self, - uid: &i64, - oid: &str, - level: AFAccessLevel, - ) -> Result<(), AppError> { - self - .collab_access_control - .update_access_level_policy(uid, oid, level) - .await - } - - async fn enforce_read_collab( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - ) -> Result { - let collab_exists = self.cache.is_exist(oid).await?; - if !collab_exists { - // If the collab does not exist, we should not enforce the access control. We consider the user - // has the permission to read the collab - return Ok(true); - } - self - .collab_access_control - .enforce_action(workspace_id, uid, oid, Action::Read) - .await - } - - async fn enforce_write_collab( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - ) -> Result { - let collab_exists = self.cache.is_exist(oid).await?; - if !collab_exists { - // If the collab does not exist, we should not enforce the access control. we consider the user - // has the permission to write the collab - return Ok(true); - } - self - .collab_access_control - .enforce_action(workspace_id, uid, oid, Action::Write) - .await - } - - async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result { - self - .workspace_access_control - .enforce_action(uid, workspace_id, Action::Write) - .await - } - - async fn enforce_delete( - &self, - workspace_id: &str, - uid: &i64, - oid: &str, - ) -> Result { - self - .collab_access_control - .enforce_access_level(workspace_id, uid, oid, AFAccessLevel::FullAccess) - .await - } -} diff --git a/src/biz/collab/metrics.rs b/src/biz/collab/metrics.rs deleted file mode 100644 index 3a2c0fad..00000000 --- a/src/biz/collab/metrics.rs +++ /dev/null @@ -1,77 +0,0 @@ -use prometheus_client::metrics::gauge::Gauge; -use prometheus_client::registry::Registry; - -#[derive(Clone)] -pub struct CollabMetrics { - success_write_snapshot_count: Gauge, - total_write_snapshot_count: Gauge, - success_write_collab_count: Gauge, - total_write_collab_count: Gauge, - total_queue_collab_count: Gauge, - success_queue_collab_count: Gauge, -} - -impl CollabMetrics { - fn init() -> Self { - Self { - success_write_snapshot_count: Gauge::default(), - total_write_snapshot_count: Default::default(), - success_write_collab_count: Default::default(), - total_write_collab_count: Default::default(), - total_queue_collab_count: Default::default(), - success_queue_collab_count: Default::default(), - } - } - - pub fn register(registry: &mut Registry) -> Self { - let metrics = Self::init(); - let realtime_registry = registry.sub_registry_with_prefix("collab"); - realtime_registry.register( - "success_write_snapshot_count", - "success write snapshot to db", - metrics.success_write_snapshot_count.clone(), - ); - realtime_registry.register( - "total_attempt_write_snapshot_count", - "total attempt write snapshot to db", - metrics.total_write_snapshot_count.clone(), - ); - realtime_registry.register( - "success_write_collab_count", - "success write collab", - metrics.success_write_collab_count.clone(), - ); - realtime_registry.register( - "total_write_collab_count", - "total write collab", - metrics.total_write_collab_count.clone(), - ); - realtime_registry.register( - "success_queue_collab_count", - "success queue collab", - metrics.success_queue_collab_count.clone(), - ); - realtime_registry.register( - "total_queue_collab_count", - "total queue pending collab", - metrics.total_queue_collab_count.clone(), - ); - - metrics - } - - pub fn record_write_snapshot(&self, success_attempt: i64, total_attempt: i64) { - self.success_write_snapshot_count.set(success_attempt); - self.total_write_snapshot_count.set(total_attempt); - } - - pub fn record_write_collab(&self, success_attempt: i64, total_attempt: i64) { - self.success_write_collab_count.set(success_attempt); - self.total_write_collab_count.set(total_attempt); - } - - pub fn record_queue_collab(&self, success_attempt: i64, total_attempt: i64) { - self.success_queue_collab_count.set(success_attempt); - self.total_queue_collab_count.set(total_attempt); - } -} diff --git a/src/biz/collab/mod.rs b/src/biz/collab/mod.rs index 0f29cd70..109ec9d1 100644 --- a/src/biz/collab/mod.rs +++ b/src/biz/collab/mod.rs @@ -1,7 +1,2 @@ pub mod access_control; -pub mod metrics; pub mod ops; -pub mod queue; -mod queue_redis_ops; -pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority}; -pub mod storage; diff --git a/src/biz/mod.rs b/src/biz/mod.rs index aca62d60..ff5291b4 100644 --- a/src/biz/mod.rs +++ b/src/biz/mod.rs @@ -1,7 +1,6 @@ pub mod chat; pub mod collab; pub mod pg_listener; -pub mod snapshot; pub mod user; pub mod utils; pub mod workspace; diff --git a/src/biz/user/user_init.rs b/src/biz/user/user_init.rs index b1c95203..e42081c3 100644 --- a/src/biz/user/user_init.rs +++ b/src/biz/user/user_init.rs @@ -1,6 +1,6 @@ -use crate::biz::collab::storage::CollabAccessControlStorage; use app_error::AppError; +use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use collab::core::origin::CollabOrigin; use collab::preclude::{Any, Collab, MapPrelim}; use collab_entity::define::WORKSPACE_DATABASES; diff --git a/src/biz/workspace/ops.rs b/src/biz/workspace/ops.rs index 4a85f99e..1f5fa30b 100644 --- a/src/biz/workspace/ops.rs +++ b/src/biz/workspace/ops.rs @@ -9,6 +9,7 @@ use uuid::Uuid; use access_control::workspace::WorkspaceAccessControl; use app_error::AppError; +use appflowy_collaborate::collab::storage::CollabAccessControlStorage; use database::collab::upsert_collab_member_with_txn; use database::file::bucket_s3_impl::BucketClientS3Impl; use database::file::BucketStorage; @@ -34,7 +35,6 @@ use shared_entity::dto::workspace_dto::{ use shared_entity::response::AppResponseError; use workspace_template::document::get_started::GetStartedDocumentTemplate; -use crate::biz::collab::storage::CollabAccessControlStorage; use crate::biz::user::user_init::initialize_workspace_for_user; use crate::mailer::{Mailer, WorkspaceInviteMailerParam}; use crate::state::GoTrueAdmin; diff --git a/src/state.rs b/src/state.rs index 3614ba5d..da11c4df 100644 --- a/src/state.rs +++ b/src/state.rs @@ -13,6 +13,8 @@ use access_control::metrics::AccessControlMetrics; use app_error::AppError; use appflowy_collaborate::collab::access_control::CollabAccessControlImpl; use appflowy_collaborate::collab::cache::CollabCache; +use appflowy_collaborate::collab::storage::CollabAccessControlStorage; +use appflowy_collaborate::metrics::CollabMetrics; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollabRealtimeMetrics; use database::file::bucket_s3_impl::S3BucketStorage; @@ -22,8 +24,6 @@ use snowflake::Snowflake; use workspace_access::WorkspaceAccessControlImpl; use crate::api::metrics::RequestMetrics; -use crate::biz::collab::metrics::CollabMetrics; -use crate::biz::collab::storage::CollabAccessControlStorage; use crate::biz::pg_listener::PgListeners; use crate::config::config::Config; use crate::mailer::Mailer; diff --git a/tests/collab/pending_write_test.rs b/tests/collab/pending_write_test.rs index 090084af..ff3ae1ae 100644 --- a/tests/collab/pending_write_test.rs +++ b/tests/collab/pending_write_test.rs @@ -1,8 +1,8 @@ use crate::collab::util::{generate_random_bytes, redis_connection_manager}; use crate::sql_test::util::{setup_db, test_create_user}; -use appflowy_cloud::biz::collab::queue::StorageQueue; -use appflowy_cloud::biz::collab::WritePriority; use appflowy_collaborate::collab::cache::CollabCache; +use appflowy_collaborate::collab::queue::StorageQueue; +use appflowy_collaborate::collab::WritePriority; use client_api_test::setup_log; use collab::entity::EncodedCollab; use collab_entity::CollabType;