Merge branch 'main' into feat/delete-user
This commit is contained in:
commit
7c36c712c8
|
|
@ -131,3 +131,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
|
|||
|
||||
# AppFlowy Collaborate
|
||||
APPFLOWY_COLLABORATE_MULTI_THREAD=false
|
||||
APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100
|
||||
|
|
|
|||
1
dev.env
1
dev.env
|
|
@ -122,3 +122,4 @@ APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379
|
|||
|
||||
# AppFlowy Collaborate
|
||||
APPFLOWY_COLLABORATE_MULTI_THREAD=false
|
||||
APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE=100
|
||||
|
|
|
|||
|
|
@ -6,13 +6,13 @@ use dashmap::mapref::one::RefMut;
|
|||
use dashmap::try_result::TryResult;
|
||||
use dashmap::DashMap;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{error, event, info, warn};
|
||||
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
use tracing::{error, event, info, trace, warn};
|
||||
|
||||
use crate::config::get_env_var;
|
||||
use crate::error::RealtimeError;
|
||||
use crate::group::group_init::CollabGroup;
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use collab_rt_entity::user::RealtimeUser;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct GroupManagementState {
|
||||
|
|
@ -20,14 +20,20 @@ pub(crate) struct GroupManagementState {
|
|||
/// Keep track of all [Collab] objects that a user is subscribed to.
|
||||
editing_by_user: Arc<DashMap<RealtimeUser, HashSet<Editing>>>,
|
||||
metrics_calculate: Arc<CollabRealtimeMetrics>,
|
||||
/// By default, the number of groups to remove in a single batch is 50.
|
||||
remove_batch_size: usize,
|
||||
}
|
||||
|
||||
impl GroupManagementState {
|
||||
pub(crate) fn new(metrics_calculate: Arc<CollabRealtimeMetrics>) -> Self {
|
||||
let remove_batch_size = get_env_var("APPFLOWY_COLLABORATE_REMOVE_BATCH_SIZE", "50")
|
||||
.parse::<usize>()
|
||||
.unwrap_or(50);
|
||||
Self {
|
||||
group_by_object_id: Arc::new(DashMap::new()),
|
||||
editing_by_user: Arc::new(DashMap::new()),
|
||||
metrics_calculate,
|
||||
remove_batch_size,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -39,18 +45,18 @@ impl GroupManagementState {
|
|||
let (object_id, group) = (entry.key(), entry.value());
|
||||
if group.is_inactive().await {
|
||||
inactive_group_ids.push(object_id.clone());
|
||||
if inactive_group_ids.len() > 10 {
|
||||
if inactive_group_ids.len() > self.remove_batch_size {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"total groups:{}, inactive group:{:?}, inactive group ids:{:?}",
|
||||
"total groups:{}, inactive group:{}",
|
||||
self.group_by_object_id.len() as i64,
|
||||
inactive_group_ids.len(),
|
||||
inactive_group_ids,
|
||||
);
|
||||
|
||||
trace!("inactive group ids:{:?}", inactive_group_ids);
|
||||
for object_id in &inactive_group_ids {
|
||||
self.remove_group(object_id).await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -282,9 +282,13 @@ fn spawn_period_check_inactive_group<S, AC>(
|
|||
S: CollabStorage,
|
||||
AC: RealtimeAccessControl,
|
||||
{
|
||||
let mut interval = interval(Duration::from_secs(60));
|
||||
let mut interval = interval(Duration::from_secs(20));
|
||||
let cloned_group_sender_by_object_id = group_sender_by_object_id.clone();
|
||||
tokio::task::spawn_local(async move {
|
||||
tokio::spawn(async move {
|
||||
// when appflowy-collaborate start, wait for 60 seconds to start the check. Since no groups will
|
||||
// be inactive in the first 60 seconds.
|
||||
tokio::time::sleep(Duration::from_secs(60)).await;
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
if let Some(groups) = weak_groups.upgrade() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue