chore: allow collab persistence and edit state behaviour to be configurable (#637)

This commit is contained in:
Khor Shu Heng 2024-06-23 23:19:54 +08:00 committed by GitHub
parent 537824e82b
commit c7d474c9cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 55 additions and 2 deletions

View File

@ -24,6 +24,7 @@ use collab_stream::error::StreamError;
use collab_stream::model::{CollabUpdateEvent, StreamBinary};
use collab_stream::stream_group::StreamGroup;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, Ordering};
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{debug, error, event, trace};
use yrs::updates::decoder::Decode;
@ -64,11 +65,18 @@ impl CollabGroup {
storage: Arc<S>,
is_new_collab: bool,
collab_redis_stream: Arc<CollabRedisStream>,
persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
) -> Result<Self, StreamError>
where
S: CollabStorage,
{
let edit_state = Arc::new(EditState::new(100, 360, is_new_collab));
let edit_state = Arc::new(EditState::new(
edit_state_max_count,
edit_state_max_secs,
is_new_collab,
));
let broadcast = CollabBroadcast::new(
&object_id,
10,
@ -87,6 +95,7 @@ impl CollabGroup {
edit_state.clone(),
collab.downgrade(),
collab_type.clone(),
persistence_interval,
)
.run(rx),
);

View File

@ -1,4 +1,5 @@
use std::sync::Arc;
use std::time::Duration;
use collab::core::collab::{DataSource, MutexCollab};
use collab::core::origin::CollabOrigin;
@ -32,6 +33,9 @@ pub struct GroupManager<S, AC> {
metrics_calculate: CollabMetricsCalculate,
collab_redis_stream: Arc<CollabRedisStream>,
control_event_stream: Arc<Mutex<StreamGroup>>,
persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
}
impl<S, AC> GroupManager<S, AC>
@ -44,6 +48,9 @@ where
access_control: Arc<AC>,
metrics_calculate: CollabMetricsCalculate,
collab_stream: CollabRedisStream,
persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
) -> Result<Self, RealtimeError> {
let collab_stream = Arc::new(collab_stream);
let control_event_stream = collab_stream
@ -58,6 +65,9 @@ where
metrics_calculate,
collab_redis_stream: collab_stream,
control_event_stream,
persistence_interval,
edit_state_max_count,
edit_state_max_secs,
})
}
@ -239,6 +249,9 @@ where
self.storage.clone(),
is_new_collab,
self.collab_redis_stream.clone(),
self.persistence_interval,
self.edit_state_max_count,
self.edit_state_max_secs,
)
.await?,
);

View File

@ -23,12 +23,14 @@ pub(crate) struct GroupPersistence<S> {
edit_state: Arc<EditState>,
mutex_collab: WeakMutexCollab,
collab_type: CollabType,
persistence_interval: Duration,
}
impl<S> GroupPersistence<S>
where
S: CollabStorage,
{
#[allow(clippy::too_many_arguments)]
pub fn new(
workspace_id: String,
object_id: String,
@ -37,6 +39,7 @@ where
edit_state: Arc<EditState>,
mutex_collab: WeakMutexCollab,
collab_type: CollabType,
persistence_interval: Duration,
) -> Self {
Self {
workspace_id,
@ -46,11 +49,12 @@ where
edit_state,
mutex_collab,
collab_type,
persistence_interval,
}
}
pub async fn run(self, mut destroy_group_rx: mpsc::Receiver<MutexCollab>) {
let mut interval = interval(Duration::from_secs(60));
let mut interval = interval(self.persistence_interval);
// TODO(nathan): remove this sleep when creating a new collab, applying all the updates
// workarounds for the issue that the collab doesn't contain the required data when first created
sleep(Duration::from_secs(5)).await;

View File

@ -43,12 +43,16 @@ where
S: CollabStorage,
AC: RealtimeAccessControl,
{
#[allow(clippy::too_many_arguments)]
pub async fn new(
storage: Arc<S>,
access_control: AC,
metrics: Arc<CollabRealtimeMetrics>,
command_recv: CLCommandReceiver,
redis_connection_manager: RedisConnectionManager,
group_persistence_interval: Duration,
edit_state_max_count: u32,
edit_state_max_secs: i64,
) -> Result<Self, RealtimeError> {
if cfg!(feature = "collab-rt-multi-thread") {
info!("CollaborationServer with multi-thread feature enabled");
@ -64,6 +68,9 @@ where
access_control.clone(),
metrics_calculate.clone(),
collab_stream,
group_persistence_interval,
edit_state_max_count,
edit_state_max_secs,
)
.await?,
);

View File

@ -130,6 +130,9 @@ pub async fn run_actix_server(
state.metrics.realtime_metrics.clone(),
rt_cmd_recv,
state.redis_connection_manager.clone(),
Duration::from_secs(config.collab.group_persistence_interval_secs),
config.collab.edit_state_max_count,
config.collab.edit_state_max_secs,
)
.await
.unwrap();

View File

@ -17,6 +17,7 @@ pub struct Config {
pub s3: S3Setting,
pub appflowy_ai: AppFlowyAISetting,
pub grpc_history: GrpcHistorySetting,
pub collab: CollabSetting,
pub mailer: MailerSetting,
}
@ -122,6 +123,13 @@ pub struct GrpcHistorySetting {
pub addrs: String,
}
#[derive(Clone, Debug)]
pub struct CollabSetting {
pub group_persistence_interval_secs: u64,
pub edit_state_max_count: u32,
pub edit_state_max_secs: i64,
}
// Default values favor local development.
pub fn get_configuration() -> Result<Config, anyhow::Error> {
let config = Config {
@ -177,6 +185,15 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
grpc_history: GrpcHistorySetting {
addrs: get_env_var("APPFLOWY_GRPC_HISTORY_ADDRS", "http://localhost:50051"),
},
collab: CollabSetting {
group_persistence_interval_secs: get_env_var(
"APPFLOWY_COLLAB_GROUP_PERSISTENCE_INTERVAL",
"60",
)
.parse()?,
edit_state_max_count: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_COUNT", "100").parse()?,
edit_state_max_secs: get_env_var("APPFLOWY_COLLAB_EDIT_STATE_MAX_SECS", "360").parse()?,
},
mailer: MailerSetting {
smtp_host: get_env_var("APPFLOWY_MAILER_SMTP_HOST", "smtp.gmail.com"),
smtp_port: get_env_var("APPFLOWY_MAILER_SMTP_PORT", "465").parse()?,