From 854215705cdebdd97dd326a9bd0cc81788427aca Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Mon, 14 Oct 2024 08:41:23 +0200 Subject: [PATCH] chore: code cleanup --- Cargo.toml | 150 +++++++++--------- libs/collab-stream/src/client.rs | 5 +- libs/collab-stream/src/lease.rs | 7 +- .../appflowy-collaborate/src/application.rs | 2 - .../appflowy-collaborate/src/group/cmd.rs | 9 +- .../src/group/group_init.rs | 40 +++-- .../appflowy-collaborate/src/group/manager.rs | 20 +-- .../appflowy-collaborate/src/rt_server.rs | 4 - src/application.rs | 2 - 9 files changed, 108 insertions(+), 131 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec1a4834..4c90f48d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,9 @@ edition = "2021" actix.workspace = true actix-web.workspace = true actix-http = { workspace = true, default-features = false, features = [ - "openssl", - "compress-brotli", - "compress-gzip", + "openssl", + "compress-brotli", + "compress-gzip", ] } actix-rt = "2.9.0" actix-web-actors = { version = "4.3" } @@ -28,20 +28,20 @@ serde_repr.workspace = true serde.workspace = true tokio = { workspace = true, features = [ - "macros", - "rt-multi-thread", - "sync", - "fs", - "time", - "full", + "macros", + "rt-multi-thread", + "sync", + "fs", + "time", + "full", ] } tokio-stream.workspace = true tokio-util = { version = "0.7.10", features = ["io"] } futures-util = { workspace = true, features = ["std", "io"] } once_cell = "1.19.0" chrono = { version = "0.4.37", features = [ - "serde", - "clock", + "serde", + "clock", ], default-features = false } derive_more = { version = "0.99" } secrecy.workspace = true @@ -49,10 +49,10 @@ rand = { version = "0.8", features = ["std_rng"] } anyhow = "1.0.79" thiserror = "1.0.56" reqwest = { workspace = true, features = [ - "json", - "rustls-tls", - "cookies", - "stream", + "json", + "rustls-tls", + "cookies", + "stream", ] } unicode-segmentation = "1.10" lazy_static.workspace = true @@ -62,31 +62,31 @@ bytes = "1.5.0" rcgen = { version = "0.10.0", features = ["pem", "x509-parser"] } mime = "0.3.17" aws-sdk-s3 = { version = "1.36.0", features = [ - "behavior-version-latest", - "rt-tokio", + "behavior-version-latest", + "rt-tokio", ] } aws-config = { version = "1.5.1", features = ["behavior-version-latest"] } redis = { workspace = true, features = [ - "json", - "tokio-comp", - "connection-manager", + "json", + "tokio-comp", + "connection-manager", ] } tracing = { version = "0.1.40", features = ["log"] } tracing-subscriber = { version = "0.3.18", features = [ - "registry", - "env-filter", - "ansi", - "json", - "tracing-log", + "registry", + "env-filter", + "ansi", + "json", + "tracing-log", ] } tracing-bunyan-formatter = "0.3.9" sqlx = { workspace = true, default-features = false, features = [ - "runtime-tokio-rustls", - "macros", - "postgres", - "uuid", - "chrono", - "migrate", + "runtime-tokio-rustls", + "macros", + "postgres", + "uuid", + "chrono", + "migrate", ] } async-trait.workspace = true prometheus-client.workspace = true @@ -129,10 +129,10 @@ infra = { path = "libs/infra" } authentication.workspace = true access-control.workspace = true app-error = { workspace = true, features = [ - "sqlx_error", - "actix_web_error", - "tokio_error", - "appflowy_ai_error", + "sqlx_error", + "actix_web_error", + "tokio_error", + "appflowy_ai_error", ] } shared-entity = { path = "libs/shared-entity", features = ["cloud"] } workspace-template = { workspace = true } @@ -164,11 +164,11 @@ assert-json-diff = "2.0.2" scraper = "0.17.1" client-api-test = { path = "libs/client-api-test", features = ["collab-sync"] } client-api = { path = "libs/client-api", features = [ - "collab-sync", - "test_util", - "sync_verbose_log", - "test_fast_sync", - "enable_brotli", + "collab-sync", + "test_util", + "sync_verbose_log", + "test_fast_sync", + "enable_brotli", ] } opener = "0.6.1" image = "0.23.14" @@ -190,38 +190,38 @@ path = "src/lib.rs" [workspace] members = [ - # libs - "libs/snowflake", - "libs/collab-rt-entity", - "libs/database", - "libs/database-entity", - "libs/client-api", - "libs/infra", - "libs/shared-entity", - "libs/gotrue", - "libs/gotrue-entity", - "admin_frontend", - "libs/app-error", - "libs/workspace-template", - "libs/encrypt", - "libs/authentication", - "libs/access-control", - "libs/collab-rt-protocol", - "libs/collab-stream", - "libs/client-websocket", - "libs/client-api-test", - "libs/wasm-test", - "libs/client-api-wasm", - "libs/appflowy-ai-client", - "libs/client-api-entity", - # services - "services/appflowy-history", - "services/appflowy-collaborate", - "services/appflowy-worker", - # xtask - "xtask", - "libs/tonic-proto", - "libs/mailer", + # libs + "libs/snowflake", + "libs/collab-rt-entity", + "libs/database", + "libs/database-entity", + "libs/client-api", + "libs/infra", + "libs/shared-entity", + "libs/gotrue", + "libs/gotrue-entity", + "admin_frontend", + "libs/app-error", + "libs/workspace-template", + "libs/encrypt", + "libs/authentication", + "libs/access-control", + "libs/collab-rt-protocol", + "libs/collab-stream", + "libs/client-websocket", + "libs/client-api-test", + "libs/wasm-test", + "libs/client-api-wasm", + "libs/appflowy-ai-client", + "libs/client-api-entity", + # services + "services/appflowy-history", + "services/appflowy-collaborate", + "services/appflowy-worker", + # xtask + "xtask", + "libs/tonic-proto", + "libs/mailer", ] [workspace.dependencies] @@ -250,9 +250,9 @@ uuid = { version = "1.6.1", features = ["v4", "v5"] } anyhow = "1.0.79" actix = "0.13.3" actix-web = { version = "4.5.1", default-features = false, features = [ - "openssl", - "compress-brotli", - "compress-gzip", + "openssl", + "compress-brotli", + "compress-gzip", ] } actix-http = { version = "3.6.0", default-features = false } tokio = { version = "1.36.0", features = ["sync"] } diff --git a/libs/collab-stream/src/client.rs b/libs/collab-stream/src/client.rs index 6c5a7c2c..eba44cca 100644 --- a/libs/collab-stream/src/client.rs +++ b/libs/collab-stream/src/client.rs @@ -3,13 +3,12 @@ use crate::error::StreamError; use crate::lease::{Lease, LeaseAcquisition}; use crate::model::{ AwarenessStreamUpdate, AwarenessStreamUpdateBatch, CollabStreamUpdate, CollabStreamUpdateBatch, - CollabUpdateEvent, MessageId, + MessageId, }; -use crate::pubsub::{CollabStreamPub, CollabStreamSub}; use crate::stream_group::{StreamConfig, StreamGroup}; use futures::Stream; use redis::aio::ConnectionManager; -use redis::streams::{StreamRangeReply, StreamReadOptions, StreamReadReply}; +use redis::streams::{StreamRangeReply, StreamReadOptions}; use redis::{AsyncCommands, FromRedisValue}; use std::time::Duration; use tracing::error; diff --git a/libs/collab-stream/src/lease.rs b/libs/collab-stream/src/lease.rs index 9647dc60..d3e21043 100644 --- a/libs/collab-stream/src/lease.rs +++ b/libs/collab-stream/src/lease.rs @@ -1,11 +1,8 @@ -use crate::client::CollabRedisStream; use crate::error::StreamError; use async_trait::async_trait; -use chrono::{DateTime, NaiveDateTime}; use redis::aio::ConnectionManager; -use redis::{RedisResult, Value}; -use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use redis::Value; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; const RELEASE_SCRIPT: &str = r#" if redis.call("GET", KEYS[1]) == ARGV[1] then diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index f5466983..0b042517 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -77,8 +77,6 @@ pub async fn run_actix_server( rt_cmd_recv, state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), - config.collab.edit_state_max_count, - config.collab.edit_state_max_secs, state.indexer_provider.clone(), ) .await diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 176281a8..c87abc34 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -2,7 +2,6 @@ use async_stream::stream; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use dashmap::DashMap; -use futures::channel::mpsc::{SendError, Sender}; use futures::Sink; use futures_util::StreamExt; use std::collections::HashMap; @@ -333,22 +332,22 @@ impl Sink for NullSender { type Error = RealtimeError; #[inline] - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } #[inline] - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, _: T) -> Result<(), Self::Error> { Ok(()) } #[inline] - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 5ccfc57d..d95bcf23 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -1,11 +1,9 @@ use crate::error::RealtimeError; -use crate::indexer::{Indexer, IndexerProvider}; +use crate::indexer::Indexer; use crate::metrics::CollabRealtimeMetrics; -use crate::state::RedisConnectionManager; use anyhow::anyhow; use app_error::AppError; -use arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption}; -use bytes::Bytes; +use arc_swap::ArcSwap; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; @@ -17,7 +15,7 @@ use collab_rt_entity::{ AckCode, AwarenessSync, BroadcastSync, CollabAck, MessageByObjectId, MsgId, }; use collab_rt_entity::{ClientCollabMessage, CollabMessage}; -use collab_rt_protocol::{decode_update, Message, MessageReader, RTProtocolError, SyncMessage}; +use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage}; use collab_stream::client::CollabRedisStream; use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink}; use collab_stream::error::StreamError; @@ -29,24 +27,19 @@ use collab_stream::stream_group::StreamGroup; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::{ - AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, SnapshotData, + AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, }; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::collections::VecDeque; -use std::fmt::{Display, Formatter}; -use std::os::linux::raw::stat; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::{mpsc, Mutex}; -use tokio::time::{interval, MissedTickBehavior}; -use tokio_util::sync::{CancellationToken, DropGuard}; -use tracing::{error, event, info, trace}; -use yrs::encoding::read::Error; -use yrs::sync::AwarenessUpdate; -use yrs::updates::decoder::{Decode, DecoderV1, DecoderV2}; +use tokio::sync::mpsc; +use tokio::time::MissedTickBehavior; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, trace}; +use yrs::updates::decoder::{Decode, DecoderV1}; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::{ReadTxn, StateVector, Update}; @@ -100,6 +93,7 @@ impl CollabGroup { S: CollabStorage, { let persister = CollabPersister::new( + uid, workspace_id.clone(), object_id.clone(), collab_type.clone(), @@ -174,7 +168,7 @@ impl CollabGroup { /// Task used to receive collab updates from Redis. async fn inbound_task(state: Arc) -> Result<(), RealtimeError> { - let mut updates = state.persister.collab_redis_stream.collab_updates( + let updates = state.persister.collab_redis_stream.collab_updates( &state.workspace_id, &state.object_id, None, @@ -233,7 +227,7 @@ impl CollabGroup { /// Task used to receive awareness updates from Redis. async fn inbound_awareness_task(state: Arc) -> Result<(), RealtimeError> { - let mut updates = state.persister.collab_redis_stream.awareness_updates( + let updates = state.persister.collab_redis_stream.awareness_updates( &state.workspace_id, &state.object_id, None, @@ -852,6 +846,7 @@ impl Drop for Subscription { } struct CollabPersister { + uid: i64, workspace_id: String, object_id: String, collab_type: CollabType, @@ -868,6 +863,7 @@ impl CollabPersister { pub const GRACE_PERIOD_MS: u64 = 1000 * 60; // 5min pub fn new( + uid: i64, workspace_id: String, object_id: String, collab_type: CollabType, @@ -878,6 +874,7 @@ impl CollabPersister { let update_sink = collab_redis_stream.collab_update_sink(&workspace_id, &object_id); let awareness_sink = collab_redis_stream.awareness_update_sink(&workspace_id, &object_id); Self { + uid, workspace_id, object_id, collab_type, @@ -936,7 +933,7 @@ impl CollabPersister { // 2. consume all Redis updates on top of it (keep redis msg id) let mut last_message_id = None; let mut tx = collab.transact_mut(); - let mut stream = self.collab_redis_stream.collab_updates( + let stream = self.collab_redis_stream.collab_updates( &self.workspace_id, &self.object_id, None, //TODO: store Redis last msg id somewhere in doc state snapshot and replay from there @@ -1038,10 +1035,9 @@ impl CollabPersister { Err(err) => tracing::warn!("failed to fetch embeddings `{}`: {}", self.object_id, err), } - let uid = 0; //FIXME: what UID should go there? self .storage - .insert_or_update_collab(&self.workspace_id, &uid, params, true) + .queue_insert_or_update_collab(&self.workspace_id, &self.uid, params, true) .await .map_err(|err| RealtimeError::Internal(err.into()))?; diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 02a2eb68..fc8b7143 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -4,7 +4,7 @@ use std::time::Duration; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; -use collab::lock::{Mutex, RwLock}; +use collab::lock::Mutex; use collab::preclude::Collab; use collab_entity::CollabType; use tracing::{error, instrument, trace}; @@ -34,8 +34,6 @@ pub struct GroupManager { collab_redis_stream: Arc, control_event_stream: Arc>, persistence_interval: Duration, - edit_state_max_count: u32, - edit_state_max_secs: i64, indexer_provider: Arc, } @@ -50,8 +48,6 @@ where metrics_calculate: Arc, collab_stream: CollabRedisStream, persistence_interval: Duration, - edit_state_max_count: u32, - edit_state_max_secs: i64, indexer_provider: Arc, ) -> Result { let collab_stream = Arc::new(collab_stream); @@ -68,8 +64,6 @@ where collab_redis_stream: collab_stream, control_event_stream, persistence_interval, - edit_state_max_count, - edit_state_max_secs, indexer_provider, }) } @@ -107,17 +101,18 @@ where client_msg_router: &mut ClientMessageRouter, ) -> Result<(), RealtimeError> { // Lock the group and subscribe the user to the group. - if let Some(group) = self.state.get_mut_group(object_id).await { + if let Some(mut e) = self.state.get_mut_group(object_id).await { + let group = e.value_mut(); trace!("[realtime]: {} subscribe group:{}", user, object_id,); let (sink, stream) = client_msg_router.init_client_communication::( - &group.workspace_id, + group.workspace_id(), user, object_id, self.access_control.clone(), ); group.subscribe(user, message_origin.clone(), sink, stream); // explicitly drop the group to release the lock. - drop(group); + drop(e); self.state.insert_user(user, object_id).await?; } else { @@ -160,7 +155,7 @@ where } let result = load_collab(user.uid, object_id, params, self.storage.clone()).await; - let (collab, encode_collab) = { + let encode_collab = { let (mut collab, encode_collab) = match result { Ok(value) => value, Err(err) => { @@ -178,8 +173,7 @@ where }; collab.initialize(); - let collab = Arc::new(RwLock::from(collab)); - (collab, encode_collab) + encode_collab }; let cloned_control_event_stream = self.control_event_stream.clone(); diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index c9640d41..5d52efd9 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -53,8 +53,6 @@ where command_recv: CLCommandReceiver, redis_connection_manager: RedisConnectionManager, group_persistence_interval: Duration, - edit_state_max_count: u32, - edit_state_max_secs: i64, indexer_provider: Arc, ) -> Result { let enable_custom_runtime = get_env_var("APPFLOWY_COLLABORATE_MULTI_THREAD", "false") @@ -76,8 +74,6 @@ where metrics.clone(), collab_stream, group_persistence_interval, - edit_state_max_count, - edit_state_max_secs, indexer_provider.clone(), ) .await?, diff --git a/src/application.rs b/src/application.rs index 9b1fba54..ebe9ad0b 100644 --- a/src/application.rs +++ b/src/application.rs @@ -135,8 +135,6 @@ pub async fn run_actix_server( rt_cmd_recv, state.redis_connection_manager.clone(), Duration::from_secs(config.collab.group_persistence_interval_secs), - config.collab.edit_state_max_count, - config.collab.edit_state_max_secs, state.indexer_provider.clone(), ) .await