chore: use redis as encoded collab cache (#342)

This commit is contained in:
Nathan.fooo 2024-02-23 00:36:37 +08:00 committed by GitHub
parent 1590e948c6
commit b73e7045dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 85 additions and 227 deletions

155
Cargo.lock generated
View File

@ -536,7 +536,6 @@ dependencies = [
"itertools 0.11.0",
"lazy_static",
"mime",
"moka",
"once_cell",
"opener",
"openssl",
@ -692,15 +691,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "async-lock"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
dependencies = [
"event-listener",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -1086,12 +1076,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "bytecount"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205"
[[package]]
name = "bytemuck"
version = "1.14.1"
@ -1122,37 +1106,6 @@ dependencies = [
"bytes",
]
[[package]]
name = "camino"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
dependencies = [
"serde",
]
[[package]]
name = "cargo-platform"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceed8ef69d8518a5dda55c07425450b58a4e1946f4951eab6d7191ee86c2443d"
dependencies = [
"serde",
]
[[package]]
name = "cargo_metadata"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
dependencies = [
"camino",
"cargo-platform",
"semver",
"serde",
"serde_json",
]
[[package]]
name = "casbin"
version = "2.1.0"
@ -2002,15 +1955,6 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "error-chain"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
dependencies = [
"version_check",
]
[[package]]
name = "etcetera"
version = "0.8.0"
@ -2336,12 +2280,6 @@ version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "glob"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "gotrue"
version = "0.1.0"
@ -2384,7 +2322,7 @@ dependencies = [
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"quanta 0.11.1",
"quanta",
"rand 0.8.5",
"smallvec",
]
@ -3192,30 +3130,6 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "moka"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1911e88d5831f748a4097a43862d129e3c6fca831eecac9b8db6d01d93c9de2"
dependencies = [
"async-lock",
"async-trait",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"once_cell",
"parking_lot 0.12.1",
"quanta 0.12.2",
"rustc_version",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]
[[package]]
name = "multimap"
version = "0.8.3"
@ -4033,17 +3947,6 @@ dependencies = [
"psl-types",
]
[[package]]
name = "pulldown-cmark"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
dependencies = [
"bitflags 2.4.2",
"memchr",
"unicase",
]
[[package]]
name = "quanta"
version = "0.11.1"
@ -4054,22 +3957,7 @@ dependencies = [
"libc",
"mach2",
"once_cell",
"raw-cpuid 10.7.0",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quanta"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid 11.0.1",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
@ -4180,15 +4068,6 @@ dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "raw-cpuid"
version = "11.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1"
dependencies = [
"bitflags 2.4.2",
]
[[package]]
name = "rayon"
version = "1.8.1"
@ -4882,9 +4761,6 @@ name = "semver"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
dependencies = [
"serde",
]
[[package]]
name = "serde"
@ -5071,21 +4947,6 @@ version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "skeptic"
version = "0.13.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
dependencies = [
"bytecount",
"cargo_metadata",
"error-chain",
"glob",
"pulldown-cmark",
"tempfile",
"walkdir",
]
[[package]]
name = "slab"
version = "0.4.9"
@ -5524,12 +5385,6 @@ dependencies = [
"libc",
]
[[package]]
name = "tagptr"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
[[package]]
name = "tap"
version = "1.0.1"
@ -5967,12 +5822,6 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "triomphe"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
[[package]]
name = "try-lock"
version = "0.2.5"

View File

@ -65,7 +65,6 @@ casbin = { version = "2.1.0" }
dotenvy = "0.15.7"
url = "2.5.0"
brotli = "3.4.0"
moka.workspace = true
evmap.workspace = true
dashmap.workspace = true
@ -156,7 +155,6 @@ tracing = { version = "0.1"}
collab-entity = { version = "0.1.0" }
gotrue = { path = "libs/gotrue" }
redis = "0.24.0"
moka = { version = "0.12.5", features = ["future"] }
evmap = "10.0.2"
dashmap = "5.5.3"
futures = "0.3.30"

View File

@ -59,7 +59,7 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static {
pub trait CollabStorage: Send + Sync + 'static {
fn config(&self) -> &WriteConfig;
fn mem_usage(&self) -> usize;
fn encode_collab_mem_hit_rate(&self) -> f64;
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>);
@ -133,8 +133,8 @@ where
self.as_ref().config()
}
fn mem_usage(&self) -> usize {
self.as_ref().mem_usage()
fn encode_collab_mem_hit_rate(&self) -> f64 {
self.as_ref().encode_collab_mem_hit_rate()
}
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>) {

View File

@ -5,7 +5,7 @@ use tracing::trace;
#[derive(Clone)]
pub struct RealtimeMetrics {
connected_users: Gauge,
mem_cache_usage: Gauge,
encode_collab_mem_hit_rate: Gauge,
opening_collab_count: Gauge,
}
@ -13,7 +13,7 @@ impl RealtimeMetrics {
fn init() -> Self {
Self {
connected_users: Gauge::default(),
mem_cache_usage: Gauge::default(),
encode_collab_mem_hit_rate: Gauge::default(),
opening_collab_count: Gauge::default(),
}
}
@ -27,9 +27,9 @@ impl RealtimeMetrics {
metrics.connected_users.clone(),
);
realtime_registry.register(
"mem_cache_usage",
"memory cache usage in bytes",
metrics.mem_cache_usage.clone(),
"mem_hit_rate",
"memory hit rate",
metrics.encode_collab_mem_hit_rate.clone(),
);
realtime_registry.register(
"opening_collab_count",
@ -45,10 +45,8 @@ impl RealtimeMetrics {
self.connected_users.set(num as i64);
}
pub fn record_mem_cache_usage(&self, size_in_bytes: usize) {
let size_in_mb = size_in_bytes / 1024;
trace!("[metrics]: mem_cache_usage: {} KB", size_in_mb);
self.mem_cache_usage.set(size_in_mb as i64);
pub fn record_encode_collab_mem_hit_rate(&self, rate: f64) {
self.encode_collab_mem_hit_rate.set(rate as i64);
}
pub fn record_opening_collab_count(&self, count: usize) {

View File

@ -87,7 +87,8 @@ where
if let Some(groups) = weak_groups.upgrade() {
cloned_metrics.record_opening_collab_count(groups.number_of_groups().await);
cloned_metrics.record_connected_users(cloned_client_stream_by_user.len());
cloned_metrics.record_mem_cache_usage(cloned_storage.mem_usage());
cloned_metrics
.record_encode_collab_mem_hit_rate(cloned_storage.encode_collab_mem_hit_rate());
let inactive_group_ids = groups.tick().await;
for id in inactive_group_ids {

View File

@ -1,87 +1,99 @@
use crate::state::RedisClient;
use bytes::Bytes;
use collab::core::collab_plugin::EncodedCollab;
use moka::future::Cache;
use moka::notification::RemovalCause;
use moka::policy::EvictionPolicy;
use redis::AsyncCommands;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing::{error, trace};
#[derive(Clone)]
pub struct CollabMemCache {
cache: Arc<Cache<String, Bytes>>,
#[allow(dead_code)]
redis_client: Arc<Mutex<RedisClient>>,
hits: Arc<AtomicU64>,
total_attempts: Arc<AtomicU64>,
}
impl CollabMemCache {
pub fn new(redis_client: RedisClient) -> Self {
let eviction_listener = |key, value: Bytes, cause| {
if matches!(cause, RemovalCause::Expired | RemovalCause::Size) {
info!(
"Evicted key {}. value_len:{}, cause:{:?}",
key,
value.len(),
cause
);
}
};
let cache = Cache::builder()
.weigher(|_key, value: &Bytes| -> u32 {
value.len() as u32
})
// This cache will hold up to 200MiB of values.
.max_capacity(200 * 1024 * 1024)
.eviction_policy(EvictionPolicy::tiny_lfu())
.eviction_listener(eviction_listener)
.build();
Self {
cache: Arc::new(cache),
redis_client: Arc::new(Mutex::new(redis_client)),
hits: Arc::new(AtomicU64::new(0)),
total_attempts: Arc::new(AtomicU64::new(0)),
}
}
pub fn usage(&self) -> usize {
self.cache.weighted_size() as usize
}
pub async fn len(&self) -> usize {
self.cache.entry_count() as usize
}
pub async fn get_encoded_collab(&self, object_id: &str) -> Option<EncodedCollab> {
let bytes = self.cache.get(object_id).await?;
tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) {
Ok(encoded_collab) => Some(encoded_collab),
Err(err) => {
error!("Failed to decode collab from redis cache bytes: {:?}", err);
self.total_attempts.fetch_add(1, Ordering::Relaxed);
let result = self
.redis_client
.lock()
.await
.get::<_, Option<Vec<u8>>>(object_id)
.await;
match result {
Ok(Some(bytes)) => match EncodedCollab::decode_from_bytes(&bytes) {
Ok(encoded_collab) => {
self.hits.fetch_add(1, Ordering::Relaxed);
Some(encoded_collab)
},
Err(err) => {
error!("Failed to decode collab from redis cache bytes: {:?}", err);
None
},
},
Ok(None) => {
trace!(
"No encoded collab found in cache for object_id: {}",
object_id
);
None
},
})
.await
.ok()?
Err(err) => {
error!("Failed to get encoded collab from redis: {:?}", err);
None
},
}
}
pub fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) {
let encoded_collab = encoded_collab.clone();
let cache = self.cache.clone();
tokio::task::spawn_blocking(move || match encoded_collab.encode_to_bytes() {
pub async fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) {
match encoded_collab.encode_to_bytes() {
Ok(bytes) => {
tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await });
if let Err(err) = self.set_bytes_in_redis(object_id, bytes).await {
error!("Failed to cache encoded collab: {:?}", err);
}
},
Err(e) => {
error!("Failed to encode collab to bytes: {:?}", e);
},
});
}
pub async fn remove_encoded_collab(&self, object_id: &str) {
self.cache.invalidate(object_id).await;
}
}
pub async fn cache_encoded_collab_bytes(&self, object_id: String, bytes: Vec<u8>) {
self.cache.insert(object_id, Bytes::from(bytes)).await;
if let Err(err) = self.set_bytes_in_redis(object_id, bytes).await {
error!("Failed to cache encoded collab bytes: {:?}", err);
}
}
// Helper function to set bytes in Redis.
async fn set_bytes_in_redis(&self, object_id: String, bytes: Vec<u8>) -> redis::RedisResult<()> {
self
.redis_client
.lock()
.await
.set::<_, Vec<u8>, ()>(object_id, bytes)
.await
}
pub fn get_hit_rate(&self) -> f64 {
let hits = self.hits.load(Ordering::Relaxed) as f64;
let total_attempts = self.total_attempts.load(Ordering::Relaxed) as f64;
if total_attempts == 0.0 {
0.0
} else {
hits / total_attempts
}
}
}

View File

@ -139,8 +139,8 @@ where
self.disk_cache.config()
}
fn mem_usage(&self) -> usize {
self.mem_cache.usage()
fn encode_collab_mem_hit_rate(&self) -> f64 {
self.mem_cache.get_hit_rate()
}
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>) {
@ -153,7 +153,6 @@ where
async fn remove_collab_cache(&self, object_id: &str) {
tracing::trace!("remove opened collab:{} cache", object_id);
self.opened_collab_by_object_id.remove(object_id);
self.mem_cache.remove_encoded_collab(object_id).await;
}
async fn upsert_collab(&self, uid: &i64, params: CreateCollabParams) -> DatabaseResult<()> {
@ -250,7 +249,8 @@ where
let encoded_collab = self.disk_cache.get_collab_encoded(uid, params).await?;
self
.mem_cache
.cache_encoded_collab(object_id, &encoded_collab);
.cache_encoded_collab(object_id, &encoded_collab)
.await;
Ok(encoded_collab)
},
}