From b73e7045dc1d2f26398a8b473d327b61a2c208cc Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Fri, 23 Feb 2024 00:36:37 +0800 Subject: [PATCH] chore: use redis as encoded collab cache (#342) --- Cargo.lock | 155 +-------------------- Cargo.toml | 2 - libs/database/src/collab/collab_storage.rs | 6 +- libs/realtime/src/collaborate/metrics.rs | 16 +-- libs/realtime/src/collaborate/server.rs | 3 +- src/biz/collab/mem_cache.rs | 122 ++++++++-------- src/biz/collab/storage.rs | 8 +- 7 files changed, 85 insertions(+), 227 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1618f806..30611ea3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -536,7 +536,6 @@ dependencies = [ "itertools 0.11.0", "lazy_static", "mime", - "moka", "once_cell", "opener", "openssl", @@ -692,15 +691,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "async-lock" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -1086,12 +1076,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "bytecount" -version = "0.6.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205" - [[package]] name = "bytemuck" version = "1.14.1" @@ -1122,37 +1106,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "camino" -version = "1.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo-platform" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ceed8ef69d8518a5dda55c07425450b58a4e1946f4951eab6d7191ee86c2443d" -dependencies = [ - "serde", -] - -[[package]] -name = "cargo_metadata" -version = "0.14.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa" -dependencies = [ - "camino", - "cargo-platform", - "semver", - "serde", - "serde_json", -] - [[package]] name = "casbin" version = "2.1.0" @@ -2002,15 +1955,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "error-chain" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc" -dependencies = [ - "version_check", -] - [[package]] name = "etcetera" version = "0.8.0" @@ -2336,12 +2280,6 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" -[[package]] -name = "glob" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" - [[package]] name = "gotrue" version = "0.1.0" @@ -2384,7 +2322,7 @@ dependencies = [ "no-std-compat", "nonzero_ext", "parking_lot 0.12.1", - "quanta 0.11.1", + "quanta", "rand 0.8.5", "smallvec", ] @@ -3192,30 +3130,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "moka" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1911e88d5831f748a4097a43862d129e3c6fca831eecac9b8db6d01d93c9de2" -dependencies = [ - "async-lock", - "async-trait", - "crossbeam-channel", - "crossbeam-epoch", - "crossbeam-utils", - "futures-util", - "once_cell", - "parking_lot 0.12.1", - "quanta 0.12.2", - "rustc_version", - "skeptic", - "smallvec", - "tagptr", - "thiserror", - "triomphe", - "uuid", -] - [[package]] name = "multimap" version = "0.8.3" @@ -4033,17 +3947,6 @@ dependencies = [ "psl-types", ] -[[package]] -name = "pulldown-cmark" -version = "0.9.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b" -dependencies = [ - "bitflags 2.4.2", - "memchr", - "unicase", -] - [[package]] name = "quanta" version = "0.11.1" @@ -4054,22 +3957,7 @@ dependencies = [ "libc", "mach2", "once_cell", - "raw-cpuid 10.7.0", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi", -] - -[[package]] -name = "quanta" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid 11.0.1", + "raw-cpuid", "wasi 0.11.0+wasi-snapshot-preview1", "web-sys", "winapi", @@ -4180,15 +4068,6 @@ dependencies = [ "bitflags 1.3.2", ] -[[package]] -name = "raw-cpuid" -version = "11.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" -dependencies = [ - "bitflags 2.4.2", -] - [[package]] name = "rayon" version = "1.8.1" @@ -4882,9 +4761,6 @@ name = "semver" version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" -dependencies = [ - "serde", -] [[package]] name = "serde" @@ -5071,21 +4947,6 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" -[[package]] -name = "skeptic" -version = "0.13.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8" -dependencies = [ - "bytecount", - "cargo_metadata", - "error-chain", - "glob", - "pulldown-cmark", - "tempfile", - "walkdir", -] - [[package]] name = "slab" version = "0.4.9" @@ -5524,12 +5385,6 @@ dependencies = [ "libc", ] -[[package]] -name = "tagptr" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" - [[package]] name = "tap" version = "1.0.1" @@ -5967,12 +5822,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "triomphe" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" - [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 5474327f..999cc5cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,6 @@ casbin = { version = "2.1.0" } dotenvy = "0.15.7" url = "2.5.0" brotli = "3.4.0" -moka.workspace = true evmap.workspace = true dashmap.workspace = true @@ -156,7 +155,6 @@ tracing = { version = "0.1"} collab-entity = { version = "0.1.0" } gotrue = { path = "libs/gotrue" } redis = "0.24.0" -moka = { version = "0.12.5", features = ["future"] } evmap = "10.0.2" dashmap = "5.5.3" futures = "0.3.30" diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 11e48e1f..0f30779c 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -59,7 +59,7 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static { pub trait CollabStorage: Send + Sync + 'static { fn config(&self) -> &WriteConfig; - fn mem_usage(&self) -> usize; + fn encode_collab_mem_hit_rate(&self) -> f64; async fn cache_collab(&self, object_id: &str, collab: Weak); @@ -133,8 +133,8 @@ where self.as_ref().config() } - fn mem_usage(&self) -> usize { - self.as_ref().mem_usage() + fn encode_collab_mem_hit_rate(&self) -> f64 { + self.as_ref().encode_collab_mem_hit_rate() } async fn cache_collab(&self, object_id: &str, collab: Weak) { diff --git a/libs/realtime/src/collaborate/metrics.rs b/libs/realtime/src/collaborate/metrics.rs index d4207044..20c1e5bc 100644 --- a/libs/realtime/src/collaborate/metrics.rs +++ b/libs/realtime/src/collaborate/metrics.rs @@ -5,7 +5,7 @@ use tracing::trace; #[derive(Clone)] pub struct RealtimeMetrics { connected_users: Gauge, - mem_cache_usage: Gauge, + encode_collab_mem_hit_rate: Gauge, opening_collab_count: Gauge, } @@ -13,7 +13,7 @@ impl RealtimeMetrics { fn init() -> Self { Self { connected_users: Gauge::default(), - mem_cache_usage: Gauge::default(), + encode_collab_mem_hit_rate: Gauge::default(), opening_collab_count: Gauge::default(), } } @@ -27,9 +27,9 @@ impl RealtimeMetrics { metrics.connected_users.clone(), ); realtime_registry.register( - "mem_cache_usage", - "memory cache usage in bytes", - metrics.mem_cache_usage.clone(), + "mem_hit_rate", + "memory hit rate", + metrics.encode_collab_mem_hit_rate.clone(), ); realtime_registry.register( "opening_collab_count", @@ -45,10 +45,8 @@ impl RealtimeMetrics { self.connected_users.set(num as i64); } - pub fn record_mem_cache_usage(&self, size_in_bytes: usize) { - let size_in_mb = size_in_bytes / 1024; - trace!("[metrics]: mem_cache_usage: {} KB", size_in_mb); - self.mem_cache_usage.set(size_in_mb as i64); + pub fn record_encode_collab_mem_hit_rate(&self, rate: f64) { + self.encode_collab_mem_hit_rate.set(rate as i64); } pub fn record_opening_collab_count(&self, count: usize) { diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index 4d13ec94..90f8a5d3 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -87,7 +87,8 @@ where if let Some(groups) = weak_groups.upgrade() { cloned_metrics.record_opening_collab_count(groups.number_of_groups().await); cloned_metrics.record_connected_users(cloned_client_stream_by_user.len()); - cloned_metrics.record_mem_cache_usage(cloned_storage.mem_usage()); + cloned_metrics + .record_encode_collab_mem_hit_rate(cloned_storage.encode_collab_mem_hit_rate()); let inactive_group_ids = groups.tick().await; for id in inactive_group_ids { diff --git a/src/biz/collab/mem_cache.rs b/src/biz/collab/mem_cache.rs index 22183879..ee51e26e 100644 --- a/src/biz/collab/mem_cache.rs +++ b/src/biz/collab/mem_cache.rs @@ -1,87 +1,99 @@ use crate::state::RedisClient; -use bytes::Bytes; use collab::core::collab_plugin::EncodedCollab; -use moka::future::Cache; -use moka::notification::RemovalCause; -use moka::policy::EvictionPolicy; +use redis::AsyncCommands; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, info}; +use tracing::{error, trace}; #[derive(Clone)] pub struct CollabMemCache { - cache: Arc>, - #[allow(dead_code)] redis_client: Arc>, + hits: Arc, + total_attempts: Arc, } impl CollabMemCache { pub fn new(redis_client: RedisClient) -> Self { - let eviction_listener = |key, value: Bytes, cause| { - if matches!(cause, RemovalCause::Expired | RemovalCause::Size) { - info!( - "Evicted key {}. value_len:{}, cause:{:?}", - key, - value.len(), - cause - ); - } - }; - - let cache = Cache::builder() - .weigher(|_key, value: &Bytes| -> u32 { - value.len() as u32 - }) - // This cache will hold up to 200MiB of values. - .max_capacity(200 * 1024 * 1024) - .eviction_policy(EvictionPolicy::tiny_lfu()) - .eviction_listener(eviction_listener) - .build(); Self { - cache: Arc::new(cache), redis_client: Arc::new(Mutex::new(redis_client)), + hits: Arc::new(AtomicU64::new(0)), + total_attempts: Arc::new(AtomicU64::new(0)), } } - pub fn usage(&self) -> usize { - self.cache.weighted_size() as usize - } - - pub async fn len(&self) -> usize { - self.cache.entry_count() as usize - } - pub async fn get_encoded_collab(&self, object_id: &str) -> Option { - let bytes = self.cache.get(object_id).await?; - tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) { - Ok(encoded_collab) => Some(encoded_collab), - Err(err) => { - error!("Failed to decode collab from redis cache bytes: {:?}", err); + self.total_attempts.fetch_add(1, Ordering::Relaxed); + let result = self + .redis_client + .lock() + .await + .get::<_, Option>>(object_id) + .await; + + match result { + Ok(Some(bytes)) => match EncodedCollab::decode_from_bytes(&bytes) { + Ok(encoded_collab) => { + self.hits.fetch_add(1, Ordering::Relaxed); + Some(encoded_collab) + }, + Err(err) => { + error!("Failed to decode collab from redis cache bytes: {:?}", err); + None + }, + }, + Ok(None) => { + trace!( + "No encoded collab found in cache for object_id: {}", + object_id + ); + None }, - }) - .await - .ok()? + Err(err) => { + error!("Failed to get encoded collab from redis: {:?}", err); + None + }, + } } - pub fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) { - let encoded_collab = encoded_collab.clone(); - let cache = self.cache.clone(); - tokio::task::spawn_blocking(move || match encoded_collab.encode_to_bytes() { + pub async fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) { + match encoded_collab.encode_to_bytes() { Ok(bytes) => { - tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await }); + if let Err(err) = self.set_bytes_in_redis(object_id, bytes).await { + error!("Failed to cache encoded collab: {:?}", err); + } }, Err(e) => { error!("Failed to encode collab to bytes: {:?}", e); }, - }); - } - - pub async fn remove_encoded_collab(&self, object_id: &str) { - self.cache.invalidate(object_id).await; + } } pub async fn cache_encoded_collab_bytes(&self, object_id: String, bytes: Vec) { - self.cache.insert(object_id, Bytes::from(bytes)).await; + if let Err(err) = self.set_bytes_in_redis(object_id, bytes).await { + error!("Failed to cache encoded collab bytes: {:?}", err); + } + } + + // Helper function to set bytes in Redis. + async fn set_bytes_in_redis(&self, object_id: String, bytes: Vec) -> redis::RedisResult<()> { + self + .redis_client + .lock() + .await + .set::<_, Vec, ()>(object_id, bytes) + .await + } + + pub fn get_hit_rate(&self) -> f64 { + let hits = self.hits.load(Ordering::Relaxed) as f64; + let total_attempts = self.total_attempts.load(Ordering::Relaxed) as f64; + + if total_attempts == 0.0 { + 0.0 + } else { + hits / total_attempts + } } } diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 0bb8493f..61b320fd 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -139,8 +139,8 @@ where self.disk_cache.config() } - fn mem_usage(&self) -> usize { - self.mem_cache.usage() + fn encode_collab_mem_hit_rate(&self) -> f64 { + self.mem_cache.get_hit_rate() } async fn cache_collab(&self, object_id: &str, collab: Weak) { @@ -153,7 +153,6 @@ where async fn remove_collab_cache(&self, object_id: &str) { tracing::trace!("remove opened collab:{} cache", object_id); self.opened_collab_by_object_id.remove(object_id); - self.mem_cache.remove_encoded_collab(object_id).await; } async fn upsert_collab(&self, uid: &i64, params: CreateCollabParams) -> DatabaseResult<()> { @@ -250,7 +249,8 @@ where let encoded_collab = self.disk_cache.get_collab_encoded(uid, params).await?; self .mem_cache - .cache_encoded_collab(object_id, &encoded_collab); + .cache_encoded_collab(object_id, &encoded_collab) + .await; Ok(encoded_collab) }, }