diff --git a/Cargo.lock b/Cargo.lock index ab74bfa1..6c2af36e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -533,8 +533,8 @@ dependencies = [ "infra", "itertools 0.11.0", "lazy_static", - "lru", "mime", + "moka", "once_cell", "opener", "openssl", @@ -690,6 +690,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.77" @@ -1053,6 +1062,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "bytecount" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205" + [[package]] name = "bytemuck" version = "1.14.1" @@ -1083,6 +1098,37 @@ dependencies = [ "bytes", ] +[[package]] +name = "camino" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo-platform" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceed8ef69d8518a5dda55c07425450b58a4e1946f4951eab6d7191ee86c2443d" +dependencies = [ + "serde", +] + +[[package]] +name = "cargo_metadata" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" +dependencies = [ + "camino", + "cargo-platform", + "semver", + "serde", + "serde_json", +] + [[package]] name = "casbin" version = "2.1.0" @@ -1916,6 +1962,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "error-chain" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" +dependencies = [ + "version_check", +] + [[package]] name = "etcetera" version = "0.8.0" @@ -2224,6 +2279,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "gotrue" version = "0.1.0" @@ -2896,15 +2957,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "lru" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" -dependencies = [ - "hashbrown 0.14.3", -] - [[package]] name = "mac" version = "0.1.1" @@ -3050,6 +3102,30 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1911e88d5831f748a4097a43862d129e3c6fca831eecac9b8db6d01d93c9de2" +dependencies = [ + "async-lock", + "async-trait", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "futures-util", + "once_cell", + "parking_lot 0.12.1", + "quanta", + "rustc_version", + "skeptic", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3855,6 +3931,32 @@ dependencies = [ "psl-types", ] +[[package]] +name = "pulldown-cmark" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" +dependencies = [ + "bitflags 2.4.2", + "memchr", + "unicase", +] + +[[package]] +name = "quanta" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -3951,6 +4053,15 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "raw-cpuid" +version = "11.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" +dependencies = [ + "bitflags 2.4.2", +] + [[package]] name = "rayon" version = "1.8.1" @@ -4639,6 +4750,9 @@ name = "semver" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" +dependencies = [ + "serde", +] [[package]] name = "serde" @@ -4825,6 +4939,21 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "skeptic" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" +dependencies = [ + "bytecount", + "cargo_metadata", + "error-chain", + "glob", + "pulldown-cmark", + "tempfile", + "walkdir", +] + [[package]] name = "slab" version = "0.4.9" @@ -5263,6 +5392,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -5700,6 +5835,12 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 1f23e2ac..dab2c048 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ casbin = { version = "2.1.0" } dotenvy = "0.15.7" url = "2.5.0" brotli = "3.4.0" -lru.workspace = true +moka.workspace = true # collab collab = { version = "0.1.0", features = ["async-plugin"] } @@ -153,7 +153,7 @@ tracing = { version = "0.1"} collab-entity = { version = "0.1.0" } gotrue = { path = "libs/gotrue" } redis = "0.24.0" -lru = "0.12.2" +moka = { version = "0.12.5", features = ["future"] } [profile.release] lto = true diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 352bbb3a..4dcf4710 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -59,8 +59,6 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static { pub trait CollabStorage: Send + Sync + 'static { fn config(&self) -> &WriteConfig; - async fn status(&self) -> String; - async fn cache_collab(&self, object_id: &str, collab: Weak); async fn remove_collab_cache(&self, object_id: &str); @@ -133,10 +131,6 @@ where self.as_ref().config() } - async fn status(&self) -> String { - self.as_ref().status().await - } - async fn cache_collab(&self, object_id: &str, collab: Weak) { self.as_ref().cache_collab(object_id, collab).await } diff --git a/libs/realtime/src/collaborate/broadcast.rs b/libs/realtime/src/collaborate/broadcast.rs index 64e8cb0c..bbdb5197 100644 --- a/libs/realtime/src/collaborate/broadcast.rs +++ b/libs/realtime/src/collaborate/broadcast.rs @@ -300,7 +300,7 @@ pub struct Subscription { } impl Subscription { - pub async fn stop(mut self) { + pub async fn stop(&mut self) { if let Some(sink_stop_tx) = self.sink_stop_tx.take() { let _ = sink_stop_tx.send(()).await; } diff --git a/libs/realtime/src/collaborate/group.rs b/libs/realtime/src/collaborate/group.rs index 6b2f8f0f..1782fa8c 100644 --- a/libs/realtime/src/collaborate/group.rs +++ b/libs/realtime/src/collaborate/group.rs @@ -73,7 +73,7 @@ where pub async fn remove_user(&self, object_id: &str, user: &U) -> Result<(), Error> { let group_by_object_id = self.group_by_object_id.try_read()?; if let Some(group) = group_by_object_id.get(object_id) { - if let Some(subscriber) = group.subscribers.try_write()?.remove(user) { + if let Some(mut subscriber) = group.subscribers.try_write()?.remove(user) { trace!("Remove subscriber: {}", subscriber.origin); subscriber.stop().await; } @@ -97,21 +97,30 @@ where #[instrument(skip(self))] pub async fn remove_group(&self, object_id: &str) { - match self.group_by_object_id.try_write() { - Ok(mut group_by_object_id) => { - match group_by_object_id.remove(object_id) { - None => { - // The group should be exist, but it's not. This is an unexpected situation. - error!("Group for object_id:{} not found", object_id); - }, - Some(group) => { - group.flush_collab().await; - }, - } - self.storage.remove_collab_cache(object_id).await; + let mut group_by_object_id = match self.group_by_object_id.try_write() { + Ok(lock) => lock, + Err(err) => { + error!("Failed to acquire write lock to remove group: {:?}", err); + return; }, - Err(err) => error!("Failed to acquire write lock to remove group: {:?}", err), + }; + + if let Some(group) = group_by_object_id.remove(object_id) { + group.flush_collab().await; + + // As we've already removed the group, we directly operate on the removed group's subscribers. + if let Ok(mut subscribers) = group.subscribers.try_write() { + for (_, subscriber) in subscribers.iter_mut() { + subscriber.stop().await; + } + } + } else { + // Log error if the group doesn't exist + error!("Group for object_id:{} not found", object_id); } + drop(group_by_object_id); + + self.storage.remove_collab_cache(object_id).await; } pub async fn create_group_if_need( diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index feb3174c..dbb0a8f4 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -67,16 +67,11 @@ where let edit_collab_by_user = Arc::new(Mutex::new(HashMap::new())); let weak_groups = Arc::downgrade(&groups); - let weak_storage = Arc::downgrade(&storage); tokio::spawn(async move { - let mut interval = interval(Duration::from_secs(60)); + let mut interval = interval(Duration::from_secs(30)); loop { interval.tick().await; - if let Some(storage) = weak_storage.upgrade() { - info!("{}", storage.status().await); - } - match weak_groups.upgrade() { Some(groups) => { trace!( diff --git a/src/biz/collab/mem_cache.rs b/src/biz/collab/mem_cache.rs index d2e8d581..3b2a075b 100644 --- a/src/biz/collab/mem_cache.rs +++ b/src/biz/collab/mem_cache.rs @@ -1,35 +1,55 @@ use crate::state::RedisClient; +use bytes::Bytes; use collab::core::collab_plugin::EncodedCollab; -use lru::LruCache; -use std::num::NonZeroUsize; +use moka::future::Cache; +use moka::notification::RemovalCause; +use moka::policy::EvictionPolicy; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::error; +use tracing::{error, info}; #[derive(Clone)] pub struct CollabMemCache { - lru_cache: Arc>>>, + cache: Arc>, + #[allow(dead_code)] + redis_client: Arc>, } impl CollabMemCache { - pub fn new(_redis_client: RedisClient) -> Self { - let lru = LruCache::new(NonZeroUsize::new(3000).unwrap()); + pub fn new(redis_client: RedisClient) -> Self { + let eviction_listener = |key, value: Bytes, cause| { + if matches!(cause, RemovalCause::Expired | RemovalCause::Size) { + info!( + "Evicted key {}. value:{}, cause:{:?}", + key, + value.len(), + cause + ); + } + }; + + let cache = Cache::builder() + .weigher(|_key, value: &Bytes| -> u32 { + value.len() as u32 + }) + // This cache will hold up to 200MiB of values. + .max_capacity(200 * 1024 * 1024) + .eviction_policy(EvictionPolicy::tiny_lfu()) + .eviction_listener(eviction_listener) + .build(); Self { - lru_cache: Arc::new(Mutex::new(lru)), + cache: Arc::new(cache), + redis_client: Arc::new(Mutex::new(redis_client)), } } pub async fn len(&self) -> usize { - self - .lru_cache - .try_lock() - .map(|cache| cache.len()) - .unwrap_or(0) + self.cache.entry_count() as usize } pub async fn get_encoded_collab(&self, object_id: &str) -> Option { - let cache = self.lru_cache.lock().await.get(object_id)?.clone(); - tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&cache) { + let bytes = self.cache.get(object_id).await?; + tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) { Ok(encoded_collab) => Some(encoded_collab), Err(err) => { error!("Failed to decode collab from redis cache bytes: {:?}", err); @@ -42,10 +62,10 @@ impl CollabMemCache { pub fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) { let encoded_collab = encoded_collab.clone(); - let cache = self.lru_cache.clone(); + let cache = self.cache.clone(); tokio::task::spawn_blocking(move || match encoded_collab.encode_to_bytes() { Ok(bytes) => { - tokio::spawn(async move { cache.lock().await.put(object_id, bytes) }); + tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await }); }, Err(e) => { error!("Failed to encode collab to bytes: {:?}", e); @@ -53,8 +73,12 @@ impl CollabMemCache { }); } + pub async fn remove_encoded_collab(&self, object_id: &str) { + self.cache.invalidate(object_id).await; + } + pub fn cache_encoded_collab_bytes(&self, object_id: String, bytes: Vec) { - let cache = self.lru_cache.clone(); - tokio::spawn(async move { cache.lock().await.put(object_id, bytes) }); + let cache = self.cache.clone(); + tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await }); } } diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index b6d90fb1..6720bd35 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -84,10 +84,6 @@ where self.disk_cache.config() } - async fn status(&self) -> String { - format!("cache collab: {}", self.mem_cache.len().await) - } - async fn cache_collab(&self, object_id: &str, collab: Weak) { tracing::trace!("cache opened collab:{}", object_id); self @@ -104,6 +100,7 @@ where .write() .await .remove(object_id); + self.mem_cache.remove_encoded_collab(object_id).await; } async fn upsert_collab(&self, uid: &i64, params: CreateCollabParams) -> DatabaseResult<()> {