chore: cache within memory size (#305)
* chore: cache within memory size * chore: update
This commit is contained in:
parent
03471f3af7
commit
e1307f4f5d
|
|
@ -533,8 +533,8 @@ dependencies = [
|
|||
"infra",
|
||||
"itertools 0.11.0",
|
||||
"lazy_static",
|
||||
"lru",
|
||||
"mime",
|
||||
"moka",
|
||||
"once_cell",
|
||||
"opener",
|
||||
"openssl",
|
||||
|
|
@ -690,6 +690,15 @@ dependencies = [
|
|||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-lock"
|
||||
version = "2.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
|
||||
dependencies = [
|
||||
"event-listener",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.77"
|
||||
|
|
@ -1053,6 +1062,12 @@ dependencies = [
|
|||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bytecount"
|
||||
version = "0.6.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1e5f035d16fc623ae5f74981db80a439803888314e3a555fd6f04acd51a3205"
|
||||
|
||||
[[package]]
|
||||
name = "bytemuck"
|
||||
version = "1.14.1"
|
||||
|
|
@ -1083,6 +1098,37 @@ dependencies = [
|
|||
"bytes",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "camino"
|
||||
version = "1.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo-platform"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ceed8ef69d8518a5dda55c07425450b58a4e1946f4951eab6d7191ee86c2443d"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cargo_metadata"
|
||||
version = "0.14.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4acbb09d9ee8e23699b9634375c72795d095bf268439da88562cf9b501f181fa"
|
||||
dependencies = [
|
||||
"camino",
|
||||
"cargo-platform",
|
||||
"semver",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "casbin"
|
||||
version = "2.1.0"
|
||||
|
|
@ -1916,6 +1962,15 @@ dependencies = [
|
|||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "error-chain"
|
||||
version = "0.12.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2d2f06b9cac1506ece98fe3231e3cc9c4410ec3d5b1f24ae1c8946f0742cdefc"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "etcetera"
|
||||
version = "0.8.0"
|
||||
|
|
@ -2224,6 +2279,12 @@ version = "0.28.1"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
|
||||
|
||||
[[package]]
|
||||
name = "glob"
|
||||
version = "0.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
|
||||
|
||||
[[package]]
|
||||
name = "gotrue"
|
||||
version = "0.1.0"
|
||||
|
|
@ -2896,15 +2957,6 @@ version = "0.4.20"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mac"
|
||||
version = "0.1.1"
|
||||
|
|
@ -3050,6 +3102,30 @@ dependencies = [
|
|||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "moka"
|
||||
version = "0.12.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1911e88d5831f748a4097a43862d129e3c6fca831eecac9b8db6d01d93c9de2"
|
||||
dependencies = [
|
||||
"async-lock",
|
||||
"async-trait",
|
||||
"crossbeam-channel",
|
||||
"crossbeam-epoch",
|
||||
"crossbeam-utils",
|
||||
"futures-util",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"quanta",
|
||||
"rustc_version",
|
||||
"skeptic",
|
||||
"smallvec",
|
||||
"tagptr",
|
||||
"thiserror",
|
||||
"triomphe",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "multimap"
|
||||
version = "0.8.3"
|
||||
|
|
@ -3855,6 +3931,32 @@ dependencies = [
|
|||
"psl-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pulldown-cmark"
|
||||
version = "0.9.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57206b407293d2bcd3af849ce869d52068623f19e1b5ff8e8778e3309439682b"
|
||||
dependencies = [
|
||||
"bitflags 2.4.2",
|
||||
"memchr",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quanta"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ca0b7bac0b97248c40bb77288fc52029cf1459c0461ea1b05ee32ccf011de2c"
|
||||
dependencies = [
|
||||
"crossbeam-utils",
|
||||
"libc",
|
||||
"once_cell",
|
||||
"raw-cpuid",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"web-sys",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
|
|
@ -3951,6 +4053,15 @@ dependencies = [
|
|||
"rand_core 0.5.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "raw-cpuid"
|
||||
version = "11.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1"
|
||||
dependencies = [
|
||||
"bitflags 2.4.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rayon"
|
||||
version = "1.8.1"
|
||||
|
|
@ -4639,6 +4750,9 @@ name = "semver"
|
|||
version = "1.0.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
|
|
@ -4825,6 +4939,21 @@ version = "0.3.11"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
|
||||
|
||||
[[package]]
|
||||
name = "skeptic"
|
||||
version = "0.13.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "16d23b015676c90a0f01c197bfdc786c20342c73a0afdda9025adb0bc42940a8"
|
||||
dependencies = [
|
||||
"bytecount",
|
||||
"cargo_metadata",
|
||||
"error-chain",
|
||||
"glob",
|
||||
"pulldown-cmark",
|
||||
"tempfile",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "slab"
|
||||
version = "0.4.9"
|
||||
|
|
@ -5263,6 +5392,12 @@ dependencies = [
|
|||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tagptr"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417"
|
||||
|
||||
[[package]]
|
||||
name = "tap"
|
||||
version = "1.0.1"
|
||||
|
|
@ -5700,6 +5835,12 @@ dependencies = [
|
|||
"tracing-serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "triomphe"
|
||||
version = "0.1.11"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3"
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.5"
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ casbin = { version = "2.1.0" }
|
|||
dotenvy = "0.15.7"
|
||||
url = "2.5.0"
|
||||
brotli = "3.4.0"
|
||||
lru.workspace = true
|
||||
moka.workspace = true
|
||||
|
||||
# collab
|
||||
collab = { version = "0.1.0", features = ["async-plugin"] }
|
||||
|
|
@ -153,7 +153,7 @@ tracing = { version = "0.1"}
|
|||
collab-entity = { version = "0.1.0" }
|
||||
gotrue = { path = "libs/gotrue" }
|
||||
redis = "0.24.0"
|
||||
lru = "0.12.2"
|
||||
moka = { version = "0.12.5", features = ["future"] }
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
|
|
|
|||
|
|
@ -59,8 +59,6 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static {
|
|||
pub trait CollabStorage: Send + Sync + 'static {
|
||||
fn config(&self) -> &WriteConfig;
|
||||
|
||||
async fn status(&self) -> String;
|
||||
|
||||
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>);
|
||||
|
||||
async fn remove_collab_cache(&self, object_id: &str);
|
||||
|
|
@ -133,10 +131,6 @@ where
|
|||
self.as_ref().config()
|
||||
}
|
||||
|
||||
async fn status(&self) -> String {
|
||||
self.as_ref().status().await
|
||||
}
|
||||
|
||||
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>) {
|
||||
self.as_ref().cache_collab(object_id, collab).await
|
||||
}
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ pub struct Subscription {
|
|||
}
|
||||
|
||||
impl Subscription {
|
||||
pub async fn stop(mut self) {
|
||||
pub async fn stop(&mut self) {
|
||||
if let Some(sink_stop_tx) = self.sink_stop_tx.take() {
|
||||
let _ = sink_stop_tx.send(()).await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ where
|
|||
pub async fn remove_user(&self, object_id: &str, user: &U) -> Result<(), Error> {
|
||||
let group_by_object_id = self.group_by_object_id.try_read()?;
|
||||
if let Some(group) = group_by_object_id.get(object_id) {
|
||||
if let Some(subscriber) = group.subscribers.try_write()?.remove(user) {
|
||||
if let Some(mut subscriber) = group.subscribers.try_write()?.remove(user) {
|
||||
trace!("Remove subscriber: {}", subscriber.origin);
|
||||
subscriber.stop().await;
|
||||
}
|
||||
|
|
@ -97,21 +97,30 @@ where
|
|||
|
||||
#[instrument(skip(self))]
|
||||
pub async fn remove_group(&self, object_id: &str) {
|
||||
match self.group_by_object_id.try_write() {
|
||||
Ok(mut group_by_object_id) => {
|
||||
match group_by_object_id.remove(object_id) {
|
||||
None => {
|
||||
// The group should be exist, but it's not. This is an unexpected situation.
|
||||
error!("Group for object_id:{} not found", object_id);
|
||||
},
|
||||
Some(group) => {
|
||||
group.flush_collab().await;
|
||||
},
|
||||
}
|
||||
self.storage.remove_collab_cache(object_id).await;
|
||||
let mut group_by_object_id = match self.group_by_object_id.try_write() {
|
||||
Ok(lock) => lock,
|
||||
Err(err) => {
|
||||
error!("Failed to acquire write lock to remove group: {:?}", err);
|
||||
return;
|
||||
},
|
||||
Err(err) => error!("Failed to acquire write lock to remove group: {:?}", err),
|
||||
};
|
||||
|
||||
if let Some(group) = group_by_object_id.remove(object_id) {
|
||||
group.flush_collab().await;
|
||||
|
||||
// As we've already removed the group, we directly operate on the removed group's subscribers.
|
||||
if let Ok(mut subscribers) = group.subscribers.try_write() {
|
||||
for (_, subscriber) in subscribers.iter_mut() {
|
||||
subscriber.stop().await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Log error if the group doesn't exist
|
||||
error!("Group for object_id:{} not found", object_id);
|
||||
}
|
||||
drop(group_by_object_id);
|
||||
|
||||
self.storage.remove_collab_cache(object_id).await;
|
||||
}
|
||||
|
||||
pub async fn create_group_if_need(
|
||||
|
|
|
|||
|
|
@ -67,16 +67,11 @@ where
|
|||
let edit_collab_by_user = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let weak_groups = Arc::downgrade(&groups);
|
||||
let weak_storage = Arc::downgrade(&storage);
|
||||
tokio::spawn(async move {
|
||||
let mut interval = interval(Duration::from_secs(60));
|
||||
let mut interval = interval(Duration::from_secs(30));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
if let Some(storage) = weak_storage.upgrade() {
|
||||
info!("{}", storage.status().await);
|
||||
}
|
||||
|
||||
match weak_groups.upgrade() {
|
||||
Some(groups) => {
|
||||
trace!(
|
||||
|
|
|
|||
|
|
@ -1,35 +1,55 @@
|
|||
use crate::state::RedisClient;
|
||||
use bytes::Bytes;
|
||||
use collab::core::collab_plugin::EncodedCollab;
|
||||
use lru::LruCache;
|
||||
use std::num::NonZeroUsize;
|
||||
use moka::future::Cache;
|
||||
use moka::notification::RemovalCause;
|
||||
use moka::policy::EvictionPolicy;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::error;
|
||||
use tracing::{error, info};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct CollabMemCache {
|
||||
lru_cache: Arc<Mutex<LruCache<String, Vec<u8>>>>,
|
||||
cache: Arc<Cache<String, Bytes>>,
|
||||
#[allow(dead_code)]
|
||||
redis_client: Arc<Mutex<RedisClient>>,
|
||||
}
|
||||
|
||||
impl CollabMemCache {
|
||||
pub fn new(_redis_client: RedisClient) -> Self {
|
||||
let lru = LruCache::new(NonZeroUsize::new(3000).unwrap());
|
||||
pub fn new(redis_client: RedisClient) -> Self {
|
||||
let eviction_listener = |key, value: Bytes, cause| {
|
||||
if matches!(cause, RemovalCause::Expired | RemovalCause::Size) {
|
||||
info!(
|
||||
"Evicted key {}. value:{}, cause:{:?}",
|
||||
key,
|
||||
value.len(),
|
||||
cause
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let cache = Cache::builder()
|
||||
.weigher(|_key, value: &Bytes| -> u32 {
|
||||
value.len() as u32
|
||||
})
|
||||
// This cache will hold up to 200MiB of values.
|
||||
.max_capacity(200 * 1024 * 1024)
|
||||
.eviction_policy(EvictionPolicy::tiny_lfu())
|
||||
.eviction_listener(eviction_listener)
|
||||
.build();
|
||||
Self {
|
||||
lru_cache: Arc::new(Mutex::new(lru)),
|
||||
cache: Arc::new(cache),
|
||||
redis_client: Arc::new(Mutex::new(redis_client)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn len(&self) -> usize {
|
||||
self
|
||||
.lru_cache
|
||||
.try_lock()
|
||||
.map(|cache| cache.len())
|
||||
.unwrap_or(0)
|
||||
self.cache.entry_count() as usize
|
||||
}
|
||||
|
||||
pub async fn get_encoded_collab(&self, object_id: &str) -> Option<EncodedCollab> {
|
||||
let cache = self.lru_cache.lock().await.get(object_id)?.clone();
|
||||
tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&cache) {
|
||||
let bytes = self.cache.get(object_id).await?;
|
||||
tokio::task::spawn_blocking(move || match EncodedCollab::decode_from_bytes(&bytes) {
|
||||
Ok(encoded_collab) => Some(encoded_collab),
|
||||
Err(err) => {
|
||||
error!("Failed to decode collab from redis cache bytes: {:?}", err);
|
||||
|
|
@ -42,10 +62,10 @@ impl CollabMemCache {
|
|||
|
||||
pub fn cache_encoded_collab(&self, object_id: String, encoded_collab: &EncodedCollab) {
|
||||
let encoded_collab = encoded_collab.clone();
|
||||
let cache = self.lru_cache.clone();
|
||||
let cache = self.cache.clone();
|
||||
tokio::task::spawn_blocking(move || match encoded_collab.encode_to_bytes() {
|
||||
Ok(bytes) => {
|
||||
tokio::spawn(async move { cache.lock().await.put(object_id, bytes) });
|
||||
tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await });
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to encode collab to bytes: {:?}", e);
|
||||
|
|
@ -53,8 +73,12 @@ impl CollabMemCache {
|
|||
});
|
||||
}
|
||||
|
||||
pub async fn remove_encoded_collab(&self, object_id: &str) {
|
||||
self.cache.invalidate(object_id).await;
|
||||
}
|
||||
|
||||
pub fn cache_encoded_collab_bytes(&self, object_id: String, bytes: Vec<u8>) {
|
||||
let cache = self.lru_cache.clone();
|
||||
tokio::spawn(async move { cache.lock().await.put(object_id, bytes) });
|
||||
let cache = self.cache.clone();
|
||||
tokio::spawn(async move { cache.insert(object_id, Bytes::from(bytes)).await });
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,10 +84,6 @@ where
|
|||
self.disk_cache.config()
|
||||
}
|
||||
|
||||
async fn status(&self) -> String {
|
||||
format!("cache collab: {}", self.mem_cache.len().await)
|
||||
}
|
||||
|
||||
async fn cache_collab(&self, object_id: &str, collab: Weak<MutexCollab>) {
|
||||
tracing::trace!("cache opened collab:{}", object_id);
|
||||
self
|
||||
|
|
@ -104,6 +100,7 @@ where
|
|||
.write()
|
||||
.await
|
||||
.remove(object_id);
|
||||
self.mem_cache.remove_encoded_collab(object_id).await;
|
||||
}
|
||||
|
||||
async fn upsert_collab(&self, uid: &i64, params: CreateCollabParams) -> DatabaseResult<()> {
|
||||
|
|
|
|||
Loading…
Reference in New Issue