chore: added locks with timeouts (#765)

* chore: added locks with timeouts

* chore: roll back collab locks in client api
This commit is contained in:
Bartosz Sypytkowski 2024-08-29 10:13:27 +02:00 committed by GitHub
parent 3b79ac5cca
commit 2af1999375
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 166 additions and 152 deletions

12
Cargo.lock generated
View File

@ -2086,7 +2086,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"arc-swap",
@ -2111,7 +2111,7 @@ dependencies = [
[[package]]
name = "collab-database"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"async-trait",
@ -2140,7 +2140,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"arc-swap",
@ -2160,7 +2160,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"bytes",
@ -2179,7 +2179,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"arc-swap",
@ -2263,7 +2263,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4"
dependencies = [
"anyhow",
"collab",

View File

@ -285,12 +285,12 @@ debug = true
[patch.crates-io]
# It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
# So using patch to workaround this issue.
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
[features]
history = []

View File

@ -13,6 +13,7 @@ use collab::core::collab::DataSource;
use collab::core::collab_state::SyncState;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::entity::EncodedCollab;
use collab::lock::{Mutex, RwLock};
use collab::preclude::{Collab, Prelim};
use collab_entity::CollabType;
use collab_folder::Folder;
@ -20,7 +21,6 @@ use collab_user::core::UserAwareness;
use mime::Mime;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, timeout, Duration};
use tokio_stream::StreamExt;
use tracing::trace;
@ -583,7 +583,7 @@ impl TestClient {
.await
.unwrap();
let collab = Arc::new(RwLock::new(collab)) as CollabRef;
let collab = Arc::new(RwLock::from(collab)) as CollabRef;
#[cfg(feature = "collab-sync")]
{
let handler = self
@ -655,7 +655,7 @@ impl TestClient {
)
.unwrap();
collab.emit_awareness_state();
let collab = Arc::new(RwLock::new(collab)) as CollabRef;
let collab = Arc::new(RwLock::from(collab)) as CollabRef;
#[cfg(feature = "collab-sync")]
{
@ -818,7 +818,7 @@ pub async fn assert_server_collab(
let duration = Duration::from_secs(timeout_secs);
let collab_type = collab_type.clone();
let object_id = object_id.to_string();
let final_json = Arc::new(Mutex::new(json!({})));
let final_json = Arc::new(Mutex::from(json!({})));
// Use tokio::time::timeout to apply a timeout to the entire operation
let cloned_final_json = final_json.clone();

View File

@ -1,21 +1,24 @@
use crate::af_spawn;
use crate::collab_sync::collab_stream::SeqNumCounter;
use crate::collab_sync::{SinkConfig, SyncError, SyncObject};
use anyhow::Error;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage};
use futures_util::SinkExt;
use std::collections::BinaryHeap;
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch, Mutex};
use anyhow::Error;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::lock::Mutex;
use futures_util::SinkExt;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep};
use tracing::{error, trace, warn};
use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage};
use crate::af_spawn;
use crate::collab_sync::collab_stream::SeqNumCounter;
use crate::collab_sync::{SinkConfig, SyncError, SyncObject};
pub(crate) const SEND_INTERVAL: Duration = Duration::from_secs(8);
pub const COLLAB_SINK_DELAY_MILLIS: u64 = 500;
@ -63,7 +66,7 @@ where
config: SinkConfig,
) -> Self {
let notifier = Arc::new(notifier);
let sender = Arc::new(Mutex::new(sink));
let sender = Arc::new(Mutex::from(sink));
let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new()));
let sending_messages = Arc::new(parking_lot::Mutex::new(HashSet::new()));
let state = Arc::new(CollabSinkState::new());
@ -513,7 +516,7 @@ impl SyncTimestamp {
fn new() -> Self {
let now = Instant::now();
SyncTimestamp {
last_sync: Mutex::new(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)),
last_sync: Mutex::from(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)),
}
}

View File

@ -6,10 +6,10 @@ use std::time::Duration;
use arc_swap::ArcSwap;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use futures_util::{SinkExt, StreamExt};
use tokio::select;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, trace, warn};
use yrs::encoding::read::Cursor;

View File

@ -1,27 +1,29 @@
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Display;
use futures_util::stream::{SplitSink, SplitStream};
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use semver::Version;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ServerFixIntervalPing;
use crate::{af_spawn, retry_connect};
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use semver::Version;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::{error, info, trace, warn};
use client_websocket::{CloseCode, CloseFrame, Message, WebSocketStream};
use collab_rt_entity::user::UserMessage;
use collab_rt_entity::ClientCollabMessage;
use collab_rt_entity::ServerCollabMessage;
use collab_rt_entity::{RealtimeMessage, SystemMessage};
use tokio::sync::{oneshot, Mutex};
use tracing::{error, info, trace, warn};
use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ServerFixIntervalPing;
use crate::{af_spawn, retry_connect};
pub struct WSClientConfig {
/// specifies the number of messages that the channel can hold at any given
@ -91,7 +93,7 @@ impl WSClient {
let (ws_msg_sender, _) = channel(config.buffer_capacity);
let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new()));
let channels = Arc::new(RwLock::new(HashMap::new()));
let ping = Arc::new(Mutex::new(None));
let ping = Arc::new(Mutex::from(None));
let http_sender = Arc::new(http_sender);
let (user_channel, _) = channel(1);
let (rt_msg_sender, _) = channel(config.buffer_capacity);
@ -106,7 +108,7 @@ impl WSClient {
user_channel: Arc::new(user_channel),
channels,
ping,
stop_ws_msg_loop_tx: Mutex::new(None),
stop_ws_msg_loop_tx: Mutex::from(None),
aggregate_queue,
#[cfg(debug_assertions)]

View File

@ -1,13 +1,16 @@
use client_websocket::Message;
use collab_rt_entity::RealtimeMessage;
use collab_rt_entity::{ClientCollabMessage, MsgId};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::{sleep_until, Instant};
use tracing::{error, trace};
use client_websocket::Message;
use collab_rt_entity::RealtimeMessage;
use collab_rt_entity::{ClientCollabMessage, MsgId};
pub type AggregateMessagesSender = mpsc::Sender<Message>;
pub type AggregateMessagesReceiver = mpsc::Receiver<Message>;

View File

@ -4,8 +4,8 @@ use std::sync::Arc;
use collab::core::awareness::{Awareness, AwarenessUpdate};
use collab::core::collab::{TransactionExt, TransactionMutExt};
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use tokio::sync::RwLock;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::{Encode, Encoder};
use yrs::{ReadTxn, StateVector, Transact, Update};

View File

@ -1,10 +1,10 @@
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Error;
use async_trait::async_trait;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_database::database::{timestamp, DatabaseData};
use collab_database::entity::CreateDatabaseParams;
@ -13,7 +13,6 @@ use collab_document::document::Document;
use collab_entity::CollabType;
use collab_folder::ViewLayout;
use serde_json::Value;
use tokio::sync::RwLock;
use crate::database::database_collab::create_database_collab;
use crate::document::parser::JsonToDocumentParser;

View File

@ -5,12 +5,12 @@ pub use anyhow::Result;
use async_trait::async_trait;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::CollabType;
use collab_folder::{
timestamp, Folder, FolderData, RepeatedViewIdentifier, ViewIdentifier, ViewLayout, Workspace,
};
use tokio::sync::RwLock;
use crate::hierarchy_builder::{FlattedViews, WorkspaceViewBuilder};
@ -91,7 +91,7 @@ impl WorkspaceTemplateBuilder {
}
pub async fn build(&self) -> Result<Vec<TemplateData>> {
let workspace_view_builder = Arc::new(RwLock::new(WorkspaceViewBuilder::new(
let workspace_view_builder = Arc::new(RwLock::from(WorkspaceViewBuilder::new(
self.workspace_id.clone(),
self.uid,
)));

View File

@ -1,22 +1,25 @@
use crate::document::getting_started::*;
use crate::TemplateData;
use crate::TemplateObjectId;
use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate};
use std::collections::HashMap;
use std::sync::Arc;
use collab::lock::RwLock;
use collab::preclude::uuid_v4;
use collab_database::database::DatabaseData;
use collab_database::entity::CreateDatabaseParams;
use collab_document::document_data::generate_id;
use collab_entity::CollabType;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::document::getting_started::*;
use crate::TemplateData;
use crate::TemplateObjectId;
use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate};
#[cfg(test)]
mod tests {
use super::*;
use collab_database::database::gen_database_view_id;
use super::*;
#[tokio::test]
async fn create_document_from_desktop_guide_json_test() {
let json_str = include_str!("../../assets/desktop_guide.json");

View File

@ -5,17 +5,17 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use collab::lock::Mutex;
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::time::{interval, sleep, sleep_until, Instant};
use tracing::{error, instrument, trace, warn};
use crate::collab::cache::CollabCache;
use app_error::AppError;
use database_entity::dto::{AFCollabEmbeddings, CollabParams, QueryCollab, QueryCollabResult};
use crate::collab::cache::CollabCache;
use crate::collab::queue_redis_ops::{
get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority,
PENDING_WRITE_META_EXPIRE_SECS,
@ -52,7 +52,7 @@ impl StorageQueue {
queue_name: &str,
metrics: Option<Arc<CollabMetrics>>,
) -> Self {
let next_duration = Arc::new(Mutex::new(Duration::from_secs(1)));
let next_duration = Arc::new(Mutex::from(Duration::from_secs(1)));
let pending_id_counter = Arc::new(AtomicI64::new(0));
let pending_write_set = Arc::new(RedisSortedSet::new(connection_manager.clone(), queue_name));

View File

@ -5,7 +5,18 @@ use std::sync::{Arc, Weak};
use anyhow::anyhow;
use bytes::Bytes;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use futures_util::{SinkExt, StreamExt};
use tokio::select;
use tokio::sync::broadcast::{channel, Sender};
use tokio::time::Instant;
use tracing::{error, trace, warn};
use yrs::encoding::write::Write;
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::Subscription as YrsSubscription;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_entity::MessageByObjectId;
use collab_rt_entity::{AckCode, MsgId};
@ -14,16 +25,6 @@ use collab_rt_entity::{
};
use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage};
use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
use futures_util::{SinkExt, StreamExt};
use tokio::select;
use tokio::sync::broadcast::{channel, Sender};
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{error, trace, warn};
use yrs::encoding::write::Write;
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::Subscription as YrsSubscription;
use crate::error::RealtimeError;
use crate::group::group_init::EditState;

View File

@ -6,11 +6,12 @@ use std::time::Duration;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::CollabType;
use dashmap::DashMap;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc;
use tracing::{error, event, info, trace};
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;

View File

@ -4,9 +4,9 @@ use std::time::Duration;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::{Mutex, RwLock};
use collab::preclude::Collab;
use collab_entity::CollabType;
use tokio::sync::{Mutex, RwLock};
use tracing::{error, instrument, trace};
use access_control::collab::RealtimeAccessControl;
@ -22,7 +22,6 @@ use database_entity::dto::QueryCollabParams;
use crate::client::client_msg_router::ClientMessageRouter;
use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::group::group_init::CollabGroup;
use crate::group::state::GroupManagementState;
use crate::indexer::IndexerProvider;
use crate::metrics::CollabMetricsCalculate;
@ -61,7 +60,7 @@ where
.collab_control_stream(CONTROL_STREAM_KEY, "collaboration")
.await
.map_err(|err| RealtimeError::Internal(err.into()))?;
let control_event_stream = Arc::new(Mutex::new(control_event_stream));
let control_event_stream = Arc::new(Mutex::from(control_event_stream));
Ok(Self {
state: GroupManagementState::new(metrics_calculate.clone()),
storage,
@ -195,7 +194,7 @@ where
};
collab.initialize();
let collab = Arc::new(RwLock::new(collab));
let collab = Arc::new(RwLock::from(collab));
(collab, encode_collab)
};

View File

@ -2,9 +2,10 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::anyhow;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::{validate_data_for_folder, CollabType};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::mpsc;
use tokio::time::interval;
use tracing::{trace, warn};

View File

@ -1,9 +1,9 @@
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use collab::lock::RwLock;
use collab::preclude::{Collab, CollabPlugin};
use collab_entity::CollabType;
use tokio::sync::RwLock;
use tokio::time::sleep;
use tracing::{error, trace};
use yrs::TransactionMut;

View File

@ -1,9 +1,12 @@
use crate::state::RedisConnectionManager;
use anyhow::anyhow;
use app_error::AppError;
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::Mutex;
use anyhow::anyhow;
use collab::lock::Mutex;
use redis::AsyncCommands;
use app_error::AppError;
use crate::state::RedisConnectionManager;
#[derive(Clone)]
pub(crate) struct SnapshotCache {

View File

@ -1,28 +1,29 @@
use crate::metrics::CollabMetrics;
use crate::snapshot::cache::SnapshotCache;
use crate::snapshot::queue::PendingQueue;
use crate::state::RedisConnectionManager;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use app_error::AppError;
use async_stream::stream;
use chrono::{DateTime, Utc};
use collab::lock::{Mutex, RwLock};
use futures_util::StreamExt;
use sqlx::PgPool;
use tokio::time::interval;
use tracing::{debug, error, trace, warn};
use validator::Validate;
use app_error::AppError;
use collab_rt_protocol::validate_encode_collab;
use database::collab::{
create_snapshot_and_maintain_limit, get_all_collab_snapshot_meta, latest_snapshot_time,
select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR,
};
use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams, SnapshotData};
use futures_util::StreamExt;
use chrono::{DateTime, Utc};
use collab_rt_protocol::validate_encode_collab;
use sqlx::PgPool;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tokio::time::interval;
use tracing::{debug, error, trace, warn};
use validator::Validate;
use crate::metrics::CollabMetrics;
use crate::snapshot::cache::SnapshotCache;
use crate::snapshot::queue::PendingQueue;
use crate::state::RedisConnectionManager;
pub type SnapshotCommandReceiver = tokio::sync::mpsc::Receiver<SnapshotCommand>;
pub type SnapshotCommandSender = tokio::sync::mpsc::Sender<SnapshotCommand>;
@ -48,7 +49,7 @@ impl SnapshotControl {
pg_pool: PgPool,
collab_metrics: Arc<CollabMetrics>,
) -> Self {
let redis_client = Arc::new(Mutex::new(redis_client));
let redis_client = Arc::new(Mutex::from(redis_client));
let (command_sender, rx) = tokio::sync::mpsc::channel(2000);
let cache = SnapshotCache::new(redis_client);
@ -199,7 +200,7 @@ impl SnapshotCommandRunner {
let queue = PendingQueue::new();
Self {
pg_pool,
queue: RwLock::new(queue),
queue: RwLock::from(queue),
cache,
recv: Some(recv),
success_attempts: Default::default(),

View File

@ -1,11 +1,11 @@
use std::sync::Arc;
use collab::lock::RwLock;
use collab::preclude::updates::encoder::{Encoder, EncoderV2};
use collab::preclude::{Collab, CollabPlugin, ReadTxn, Snapshot, StateVector, TransactionMut};
use collab_entity::CollabType;
use serde_json::Value;
use sqlx::PgPool;
use tokio::sync::RwLock;
use tracing::trace;
use database::history::ops::get_snapshot_meta_list;

View File

@ -2,10 +2,10 @@ use std::ops::Deref;
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Weak};
use collab::lock::{Mutex, RwLock};
use collab::preclude::updates::encoder::Encode;
use collab::preclude::{Collab, ReadTxn, Snapshot, StateVector};
use collab_entity::CollabType;
use tokio::sync::{Mutex, RwLock};
use tracing::{trace, warn};
use tonic_proto::history::SnapshotMetaPb;

View File

@ -4,10 +4,10 @@ use std::time::Duration;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::error::CollabError;
use collab::lock::RwLock;
use collab::preclude::updates::decoder::Decode;
use collab::preclude::{Collab, Update};
use collab_entity::CollabType;
use tokio::sync::RwLock;
use tokio::time::interval;
use tracing::{error, trace};

View File

@ -1,27 +1,8 @@
use crate::api::metrics::metrics_scope;
use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use crate::api::file_storage::file_storage_scope;
use crate::api::template::template_scope;
use crate::api::user::user_scope;
use crate::api::workspace::{collab_scope, workspace_scope};
use crate::api::ws::ws_scope;
use crate::mailer::Mailer;
use access_control::access::{enable_access_control, AccessControl};
use gotrue::grant::{Grant, PasswordGrant};
use crate::api::chat::chat_scope;
use crate::api::history::history_scope;
use crate::biz::collab::access_control::CollabMiddlewareAccessControl;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl;
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting};
use crate::middleware::access_control_mw::MiddlewareAccessControlTransform;
use crate::middleware::metrics_mw::MetricsMiddleware;
use crate::middleware::request_id::RequestIdMiddleware;
use crate::self_signed::create_self_signed_certificate;
use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache};
use actix::Supervisor;
use actix_identity::IdentityMiddleware;
use actix_session::storage::RedisSessionStore;
use actix_session::SessionMiddleware;
@ -29,6 +10,20 @@ use actix_web::cookie::Key;
use actix_web::middleware::NormalizePath;
use actix_web::{dev::Server, web::Data, App, HttpServer};
use anyhow::{Context, Error};
use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider};
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
use aws_sdk_s3::types::{
BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration,
};
use collab::lock::Mutex;
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
use openssl::x509::X509;
use secrecy::{ExposeSecret, Secret};
use sqlx::{postgres::PgPoolOptions, PgPool};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use access_control::access::{enable_access_control, AccessControl};
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_collaborate::actix_ws::server::RealtimeServerActor;
use appflowy_collaborate::collab::access_control::{
@ -37,33 +32,36 @@ use appflowy_collaborate::collab::access_control::{
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabStorageImpl;
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
use appflowy_collaborate::indexer::IndexerProvider;
use appflowy_collaborate::shared_state::RealtimeSharedState;
use appflowy_collaborate::snapshot::SnapshotControl;
use appflowy_collaborate::CollaborationServer;
use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider};
use aws_sdk_s3::operation::create_bucket::CreateBucketError;
use aws_sdk_s3::types::{
BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration,
};
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
use openssl::x509::X509;
use secrecy::{ExposeSecret, Secret};
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
use gotrue::grant::{Grant, PasswordGrant};
use snowflake::Snowflake;
use sqlx::{postgres::PgPoolOptions, PgPool};
use std::net::TcpListener;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use tonic_proto::history::history_client::HistoryClient;
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::ai::ai_completion_scope;
use crate::api::chat::chat_scope;
use crate::api::file_storage::file_storage_scope;
use crate::api::history::history_scope;
use crate::api::metrics::metrics_scope;
use crate::api::search::search_scope;
use appflowy_collaborate::indexer::IndexerProvider;
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
use tracing::{error, info, warn};
use workspace_access::WorkspaceAccessControlImpl;
use crate::api::template::template_scope;
use crate::api::user::user_scope;
use crate::api::workspace::{collab_scope, workspace_scope};
use crate::api::ws::ws_scope;
use crate::biz::collab::access_control::CollabMiddlewareAccessControl;
use crate::biz::pg_listener::PgListeners;
use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl;
use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting};
use crate::mailer::Mailer;
use crate::middleware::access_control_mw::MiddlewareAccessControlTransform;
use crate::middleware::metrics_mw::MetricsMiddleware;
use crate::middleware::request_id::RequestIdMiddleware;
use crate::self_signed::create_self_signed_certificate;
use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache};
pub struct Application {
port: u16,

View File

@ -1,16 +1,17 @@
use std::sync::Arc;
use appflowy_ai_client::client::AppFlowyAIClient;
use collab::lock::Mutex;
use dashmap::DashMap;
use secrecy::{ExposeSecret, Secret};
use sqlx::PgPool;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::RwLock;
use tokio_stream::StreamExt;
use uuid::Uuid;
use access_control::access::AccessControl;
use access_control::metrics::AccessControlMetrics;
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use appflowy_collaborate::collab::access_control::CollabAccessControlImpl;
use appflowy_collaborate::collab::cache::CollabCache;
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
@ -19,7 +20,6 @@ use appflowy_collaborate::metrics::CollabMetrics;
use appflowy_collaborate::shared_state::RealtimeSharedState;
use appflowy_collaborate::CollabRealtimeMetrics;
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
use gotrue::grant::{Grant, PasswordGrant};
use snowflake::Snowflake;

View File

@ -4,11 +4,11 @@ use std::time::Duration;
use collab::core::transaction::DocTransactionExtension;
use collab::entity::EncodedCollab;
use collab::lock::Mutex;
use collab::preclude::{Doc, Transact};
use collab_entity::CollabType;
use sqlx::types::Uuid;
use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::time::sleep;
use app_error::ErrorCode;