chore: migrate collab storage to appflowy-collaborate (#566)

* chore: migrate collab storage to appflowy-collaborate

* fix: clippy error

* chore: remove handler

---------

Co-authored-by: nathan <nathan@appflowy.io>
This commit is contained in:
Khor Shu Heng 2024-05-21 10:06:26 +08:00 committed by GitHub
parent ec7eb54bfc
commit 67d9fad7d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 237 additions and 222 deletions

2
Cargo.lock generated
View File

@ -671,6 +671,8 @@ dependencies = [
"tokio-util",
"tracing",
"uuid",
"validator",
"workspace-access",
"yrs",
]

View File

@ -1,5 +1,6 @@
use collab_entity::CollabType;
use std::collections::BTreeMap;
use std::fmt::Display;
use std::ops::Deref;
use std::str::FromStr;
@ -23,9 +24,9 @@ pub struct MessageId {
pub sequence_number: u16,
}
impl ToString for MessageId {
fn to_string(&self) -> String {
format!("{}-{}", self.timestamp_ms, self.sequence_number)
impl Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}", self.timestamp_ms, self.sequence_number)
}
}
@ -233,9 +234,9 @@ impl FromRedisValue for RedisString {
}
}
impl ToString for RedisString {
fn to_string(&self) -> String {
self.0.clone()
impl Display for RedisString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.clone())
}
}

View File

@ -54,6 +54,8 @@ redis = "0.25.2"
parking_lot = "0.12.1"
lazy_static = "1.4.0"
itertools = "0.12.0"
validator = "0.16.1"
workspace-access.workspace = true
[dev-dependencies]
rand = "0.8.5"

View File

@ -1,11 +1,15 @@
use async_trait::async_trait;
use std::sync::Arc;
use tracing::instrument;
use crate::collab::cache::CollabCache;
use access_control::access::ObjectType;
use access_control::access::{enable_access_control, AccessControl};
use access_control::act::{Action, ActionVariant};
use access_control::collab::{CollabAccessControl, RealtimeAccessControl};
use access_control::workspace::WorkspaceAccessControl;
use app_error::AppError;
use database::collab::CollabStorageAccessControl;
use database_entity::dto::AFAccessLevel;
#[derive(Clone)]
@ -159,3 +163,85 @@ impl RealtimeAccessControl for RealtimeCollabAccessControlImpl {
.await
}
}
#[derive(Clone)]
pub struct CollabStorageAccessControlImpl<CollabAC, WorkspaceAC> {
pub collab_access_control: Arc<CollabAC>,
pub workspace_access_control: Arc<WorkspaceAC>,
pub cache: CollabCache,
}
#[async_trait]
impl<CollabAC, WorkspaceAC> CollabStorageAccessControl
for CollabStorageAccessControlImpl<CollabAC, WorkspaceAC>
where
CollabAC: CollabAccessControl,
WorkspaceAC: WorkspaceAccessControl,
{
async fn update_policy(
&self,
uid: &i64,
oid: &str,
level: AFAccessLevel,
) -> Result<(), AppError> {
self
.collab_access_control
.update_access_level_policy(uid, oid, level)
.await
}
async fn enforce_read_collab(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. We consider the user
// has the permission to read the collab
return Ok(true);
}
self
.collab_access_control
.enforce_action(workspace_id, uid, oid, Action::Read)
.await
}
async fn enforce_write_collab(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. we consider the user
// has the permission to write the collab
return Ok(true);
}
self
.collab_access_control
.enforce_action(workspace_id, uid, oid, Action::Write)
.await
}
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result<bool, AppError> {
self
.workspace_access_control
.enforce_action(uid, workspace_id, Action::Write)
.await
}
async fn enforce_delete(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
self
.collab_access_control
.enforce_access_level(workspace_id, uid, oid, AFAccessLevel::FullAccess)
.await
}
}

View File

@ -3,3 +3,8 @@ pub mod cache;
pub mod disk_cache;
pub mod mem_cache;
pub mod notification;
pub mod queue;
mod queue_redis_ops;
pub mod storage;
pub mod validator;
pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority};

View File

@ -12,16 +12,16 @@ use tokio::sync::Mutex;
use tokio::time::{interval, sleep, sleep_until, Instant};
use tracing::{error, instrument, trace, warn};
use crate::collab::cache::CollabCache;
use app_error::AppError;
use appflowy_collaborate::collab::cache::CollabCache;
use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult};
use crate::biz::collab::metrics::CollabMetrics;
use crate::biz::collab::queue_redis_ops::{
use crate::collab::queue_redis_ops::{
get_pending_meta, remove_all_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite,
WritePriority, PENDING_WRITE_META_EXPIRE_SECS,
};
use crate::biz::collab::RedisSortedSet;
use crate::collab::RedisSortedSet;
use crate::metrics::CollabMetrics;
use crate::state::RedisConnectionManager;
type PendingWriteSet = Arc<RedisSortedSet>;

View File

@ -1,4 +1,4 @@
use crate::biz::collab::queue::PendingWriteMeta;
use crate::collab::queue::PendingWriteMeta;
use crate::state::RedisConnectionManager;
use app_error::AppError;
use futures_util::StreamExt;
@ -235,7 +235,7 @@ impl PartialOrd for PendingWrite {
#[cfg(test)]
mod tests {
use crate::biz::collab::{PendingWrite, RedisSortedSet, WritePriority};
use crate::collab::{PendingWrite, RedisSortedSet, WritePriority};
use anyhow::Context;
use std::time::Duration;

View File

@ -12,11 +12,11 @@ use tokio::time::timeout;
use tracing::{error, instrument, trace};
use validator::Validate;
use crate::collab::access_control::CollabAccessControlImpl;
use crate::collab::cache::CollabCache;
use crate::command::{CLCommandSender, CollaborationCommand};
use crate::shared_state::RealtimeSharedState;
use app_error::AppError;
use appflowy_collaborate::collab::access_control::CollabAccessControlImpl;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::command::{CLCommandSender, CollaborationCommand};
use appflowy_collaborate::shared_state::RealtimeSharedState;
use database::collab::{AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl};
use database_entity::dto::{
AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab,
@ -24,12 +24,12 @@ use database_entity::dto::{
};
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::util::CollabValidator;
use crate::biz::collab::access_control::CollabStorageAccessControlImpl;
use crate::biz::collab::metrics::CollabMetrics;
use crate::biz::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE};
use crate::biz::collab::queue_redis_ops::WritePriority;
use crate::biz::snapshot::SnapshotControl;
use crate::collab::access_control::CollabStorageAccessControlImpl;
use crate::collab::queue::{StorageQueue, REDIS_PENDING_WRITE_QUEUE};
use crate::collab::queue_redis_ops::WritePriority;
use crate::collab::validator::CollabValidator;
use crate::metrics::CollabMetrics;
use crate::snapshot::SnapshotControl;
use crate::state::RedisConnectionManager;
pub type CollabAccessControlStorage = CollabStorageImpl<

View File

@ -0,0 +1,18 @@
use app_error::AppError;
use async_trait::async_trait;
use collab_rt_protocol::validate_encode_collab;
use database_entity::dto::CollabParams;
#[async_trait]
pub trait CollabValidator {
async fn check_encode_collab(&self) -> Result<(), AppError>;
}
#[async_trait]
impl CollabValidator for CollabParams {
async fn check_encode_collab(&self) -> Result<(), AppError> {
validate_encode_collab(&self.object_id, &self.encoded_collab_v1, &self.collab_type)
.await
.map_err(|err| AppError::NoRequiredData(err.to_string()))
}
}

View File

@ -301,10 +301,10 @@ impl EditState {
}
}
#[allow(dead_code)]
struct CollabUpdateStreamingImpl {
sender: mpsc::UnboundedSender<Vec<u8>>,
stopped: Arc<AtomicBool>,
message_processor: tokio::task::JoinHandle<()>,
}
impl CollabUpdateStreamingImpl {
@ -314,24 +314,18 @@ impl CollabUpdateStreamingImpl {
collab_redis_stream: &CollabRedisStream,
) -> Result<Self, StreamError> {
let stream = collab_redis_stream
.collab_update_stream(&workspace_id, &object_id, "collaborate_update_producer")
.collab_update_stream(workspace_id, object_id, "collaborate_update_producer")
.await?;
let stopped = Arc::new(AtomicBool::new(false));
let (sender, receiver) = mpsc::unbounded_channel();
let message_processor = {
let stopped = stopped.clone();
tokio::spawn(async move {
if let Err(err) = Self::consume_messages(receiver, stream).await {
error!("Failed to consume incoming updates: {}", err);
}
stopped.store(true, Ordering::SeqCst);
})
};
Ok(Self {
sender,
stopped,
message_processor,
})
let cloned_stopped = stopped.clone();
tokio::spawn(async move {
if let Err(err) = Self::consume_messages(receiver, stream).await {
error!("Failed to consume incoming updates: {}", err);
}
cloned_stopped.store(true, Ordering::SeqCst);
});
Ok(Self { sender, stopped })
}
async fn consume_messages(

View File

@ -5,10 +5,11 @@ pub mod command;
pub mod connect_state;
pub mod error;
mod group;
mod metrics;
pub mod metrics;
mod permission;
mod rt_server;
pub mod shared_state;
pub mod snapshot;
mod state;
mod util;

View File

@ -157,3 +157,78 @@ pub(crate) fn spawn_metrics<S>(
}
});
}
#[derive(Clone)]
pub struct CollabMetrics {
success_write_snapshot_count: Gauge,
total_write_snapshot_count: Gauge,
success_write_collab_count: Gauge,
total_write_collab_count: Gauge,
total_queue_collab_count: Gauge,
success_queue_collab_count: Gauge,
}
impl CollabMetrics {
fn init() -> Self {
Self {
success_write_snapshot_count: Gauge::default(),
total_write_snapshot_count: Default::default(),
success_write_collab_count: Default::default(),
total_write_collab_count: Default::default(),
total_queue_collab_count: Default::default(),
success_queue_collab_count: Default::default(),
}
}
pub fn register(registry: &mut Registry) -> Self {
let metrics = Self::init();
let realtime_registry = registry.sub_registry_with_prefix("collab");
realtime_registry.register(
"success_write_snapshot_count",
"success write snapshot to db",
metrics.success_write_snapshot_count.clone(),
);
realtime_registry.register(
"total_attempt_write_snapshot_count",
"total attempt write snapshot to db",
metrics.total_write_snapshot_count.clone(),
);
realtime_registry.register(
"success_write_collab_count",
"success write collab",
metrics.success_write_collab_count.clone(),
);
realtime_registry.register(
"total_write_collab_count",
"total write collab",
metrics.total_write_collab_count.clone(),
);
realtime_registry.register(
"success_queue_collab_count",
"success queue collab",
metrics.success_queue_collab_count.clone(),
);
realtime_registry.register(
"total_queue_collab_count",
"total queue pending collab",
metrics.total_queue_collab_count.clone(),
);
metrics
}
pub fn record_write_snapshot(&self, success_attempt: i64, total_attempt: i64) {
self.success_write_snapshot_count.set(success_attempt);
self.total_write_snapshot_count.set(total_attempt);
}
pub fn record_write_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_write_collab_count.set(success_attempt);
self.total_write_collab_count.set(total_attempt);
}
pub fn record_queue_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_queue_collab_count.set(success_attempt);
self.total_queue_collab_count.set(total_attempt);
}
}

View File

@ -1,6 +1,6 @@
use crate::biz::collab::metrics::CollabMetrics;
use crate::biz::snapshot::cache::SnapshotCache;
use crate::biz::snapshot::queue::PendingQueue;
use crate::metrics::CollabMetrics;
use crate::snapshot::cache::SnapshotCache;
use crate::snapshot::queue::PendingQueue;
use crate::state::RedisConnectionManager;
use anyhow::anyhow;
use app_error::AppError;

View File

@ -15,11 +15,11 @@ use app_error::AppError;
use appflowy_collaborate::actix_ws::client::rt_client::RealtimeClient;
use appflowy_collaborate::actix_ws::server::RealtimeServerActor;
use appflowy_collaborate::collab::access_control::RealtimeCollabAccessControlImpl;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage};
use collab_rt_entity::RealtimeMessage;
use shared_entity::response::AppResponseError;
use crate::biz::collab::storage::CollabAccessControlStorage;
use crate::biz::user::auth::jwt::{authorization_from_token, UserUuid};
use crate::state::AppState;

View File

@ -9,12 +9,8 @@ use access_control::access::{enable_access_control, AccessControl};
use crate::api::ai_tool::ai_tool_scope;
use crate::api::chat::chat_scope;
use crate::biz::collab::access_control::{
CollabMiddlewareAccessControl, CollabStorageAccessControlImpl,
};
use crate::biz::collab::storage::CollabStorageImpl;
use crate::biz::collab::access_control::CollabMiddlewareAccessControl;
use crate::biz::pg_listener::PgListeners;
use crate::biz::snapshot::SnapshotControl;
use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl;
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting};
use crate::middleware::access_control_mw::MiddlewareAccessControlTransform;
@ -32,11 +28,13 @@ use anyhow::{Context, Error};
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_collaborate::actix_ws::server::RealtimeServerActor;
use appflowy_collaborate::collab::access_control::{
CollabAccessControlImpl, RealtimeCollabAccessControlImpl,
CollabAccessControlImpl, CollabStorageAccessControlImpl, RealtimeCollabAccessControlImpl,
};
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabStorageImpl;
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
use appflowy_collaborate::shared_state::RealtimeSharedState;
use appflowy_collaborate::snapshot::SnapshotControl;
use appflowy_collaborate::CollaborationServer;
use database::file::bucket_s3_impl::S3BucketStorage;
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};

View File

@ -8,10 +8,8 @@ use tracing::{instrument, trace};
use access_control::act::Action;
use access_control::collab::CollabAccessControl;
use access_control::workspace::WorkspaceAccessControl;
use app_error::AppError;
use appflowy_collaborate::collab::cache::CollabCache;
use database::collab::CollabStorageAccessControl;
use database_entity::dto::AFAccessLevel;
use crate::api::workspace::{COLLAB_PATTERN, V1_COLLAB_PATTERN};
@ -139,85 +137,3 @@ where
}
}
}
#[derive(Clone)]
pub struct CollabStorageAccessControlImpl<CollabAC, WorkspaceAC> {
pub(crate) collab_access_control: Arc<CollabAC>,
pub(crate) workspace_access_control: Arc<WorkspaceAC>,
pub(crate) cache: CollabCache,
}
#[async_trait]
impl<CollabAC, WorkspaceAC> CollabStorageAccessControl
for CollabStorageAccessControlImpl<CollabAC, WorkspaceAC>
where
CollabAC: CollabAccessControl,
WorkspaceAC: WorkspaceAccessControl,
{
async fn update_policy(
&self,
uid: &i64,
oid: &str,
level: AFAccessLevel,
) -> Result<(), AppError> {
self
.collab_access_control
.update_access_level_policy(uid, oid, level)
.await
}
async fn enforce_read_collab(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. We consider the user
// has the permission to read the collab
return Ok(true);
}
self
.collab_access_control
.enforce_action(workspace_id, uid, oid, Action::Read)
.await
}
async fn enforce_write_collab(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
let collab_exists = self.cache.is_exist(oid).await?;
if !collab_exists {
// If the collab does not exist, we should not enforce the access control. we consider the user
// has the permission to write the collab
return Ok(true);
}
self
.collab_access_control
.enforce_action(workspace_id, uid, oid, Action::Write)
.await
}
async fn enforce_write_workspace(&self, uid: &i64, workspace_id: &str) -> Result<bool, AppError> {
self
.workspace_access_control
.enforce_action(uid, workspace_id, Action::Write)
.await
}
async fn enforce_delete(
&self,
workspace_id: &str,
uid: &i64,
oid: &str,
) -> Result<bool, AppError> {
self
.collab_access_control
.enforce_access_level(workspace_id, uid, oid, AFAccessLevel::FullAccess)
.await
}
}

View File

@ -1,77 +0,0 @@
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
#[derive(Clone)]
pub struct CollabMetrics {
success_write_snapshot_count: Gauge,
total_write_snapshot_count: Gauge,
success_write_collab_count: Gauge,
total_write_collab_count: Gauge,
total_queue_collab_count: Gauge,
success_queue_collab_count: Gauge,
}
impl CollabMetrics {
fn init() -> Self {
Self {
success_write_snapshot_count: Gauge::default(),
total_write_snapshot_count: Default::default(),
success_write_collab_count: Default::default(),
total_write_collab_count: Default::default(),
total_queue_collab_count: Default::default(),
success_queue_collab_count: Default::default(),
}
}
pub fn register(registry: &mut Registry) -> Self {
let metrics = Self::init();
let realtime_registry = registry.sub_registry_with_prefix("collab");
realtime_registry.register(
"success_write_snapshot_count",
"success write snapshot to db",
metrics.success_write_snapshot_count.clone(),
);
realtime_registry.register(
"total_attempt_write_snapshot_count",
"total attempt write snapshot to db",
metrics.total_write_snapshot_count.clone(),
);
realtime_registry.register(
"success_write_collab_count",
"success write collab",
metrics.success_write_collab_count.clone(),
);
realtime_registry.register(
"total_write_collab_count",
"total write collab",
metrics.total_write_collab_count.clone(),
);
realtime_registry.register(
"success_queue_collab_count",
"success queue collab",
metrics.success_queue_collab_count.clone(),
);
realtime_registry.register(
"total_queue_collab_count",
"total queue pending collab",
metrics.total_queue_collab_count.clone(),
);
metrics
}
pub fn record_write_snapshot(&self, success_attempt: i64, total_attempt: i64) {
self.success_write_snapshot_count.set(success_attempt);
self.total_write_snapshot_count.set(total_attempt);
}
pub fn record_write_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_write_collab_count.set(success_attempt);
self.total_write_collab_count.set(total_attempt);
}
pub fn record_queue_collab(&self, success_attempt: i64, total_attempt: i64) {
self.success_queue_collab_count.set(success_attempt);
self.total_queue_collab_count.set(total_attempt);
}
}

View File

@ -1,7 +1,2 @@
pub mod access_control;
pub mod metrics;
pub mod ops;
pub mod queue;
mod queue_redis_ops;
pub use queue_redis_ops::{PendingWrite, RedisSortedSet, WritePriority};
pub mod storage;

View File

@ -1,7 +1,6 @@
pub mod chat;
pub mod collab;
pub mod pg_listener;
pub mod snapshot;
pub mod user;
pub mod utils;
pub mod workspace;

View File

@ -1,6 +1,6 @@
use crate::biz::collab::storage::CollabAccessControlStorage;
use app_error::AppError;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use collab::core::origin::CollabOrigin;
use collab::preclude::{Any, Collab, MapPrelim};
use collab_entity::define::WORKSPACE_DATABASES;

View File

@ -9,6 +9,7 @@ use uuid::Uuid;
use access_control::workspace::WorkspaceAccessControl;
use app_error::AppError;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use database::collab::upsert_collab_member_with_txn;
use database::file::bucket_s3_impl::BucketClientS3Impl;
use database::file::BucketStorage;
@ -34,7 +35,6 @@ use shared_entity::dto::workspace_dto::{
use shared_entity::response::AppResponseError;
use workspace_template::document::get_started::GetStartedDocumentTemplate;
use crate::biz::collab::storage::CollabAccessControlStorage;
use crate::biz::user::user_init::initialize_workspace_for_user;
use crate::mailer::{Mailer, WorkspaceInviteMailerParam};
use crate::state::GoTrueAdmin;

View File

@ -13,6 +13,8 @@ use access_control::metrics::AccessControlMetrics;
use app_error::AppError;
use appflowy_collaborate::collab::access_control::CollabAccessControlImpl;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
use appflowy_collaborate::metrics::CollabMetrics;
use appflowy_collaborate::shared_state::RealtimeSharedState;
use appflowy_collaborate::CollabRealtimeMetrics;
use database::file::bucket_s3_impl::S3BucketStorage;
@ -22,8 +24,6 @@ use snowflake::Snowflake;
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::metrics::RequestMetrics;
use crate::biz::collab::metrics::CollabMetrics;
use crate::biz::collab::storage::CollabAccessControlStorage;
use crate::biz::pg_listener::PgListeners;
use crate::config::config::Config;
use crate::mailer::Mailer;

View File

@ -1,8 +1,8 @@
use crate::collab::util::{generate_random_bytes, redis_connection_manager};
use crate::sql_test::util::{setup_db, test_create_user};
use appflowy_cloud::biz::collab::queue::StorageQueue;
use appflowy_cloud::biz::collab::WritePriority;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::queue::StorageQueue;
use appflowy_collaborate::collab::WritePriority;
use client_api_test::setup_log;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;