diff --git a/services/appflowy-collaborate/src/collab/cache.rs b/services/appflowy-collaborate/src/collab/cache.rs index 66183e4e..cc3a1a5b 100644 --- a/services/appflowy-collaborate/src/collab/cache.rs +++ b/services/appflowy-collaborate/src/collab/cache.rs @@ -14,7 +14,7 @@ use database::collab::CollabMetadata; use database_entity::dto::{CollabParams, QueryCollab, QueryCollabResult}; use crate::collab::disk_cache::CollabDiskCache; -use crate::collab::mem_cache::CollabMemCache; +use crate::collab::mem_cache::{cache_exp_secs_from_collab_type, CollabMemCache}; use crate::state::RedisConnectionManager; #[derive(Clone)] @@ -82,6 +82,7 @@ impl CollabCache { // Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache. let object_id = query.object_id.clone(); + let expiration_secs = cache_exp_secs_from_collab_type(&query.collab_type); let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?; // spawn a task to insert the encoded collab into the memory cache @@ -90,7 +91,7 @@ impl CollabCache { let timestamp = chrono::Utc::now().timestamp(); tokio::spawn(async move { mem_cache - .insert_encode_collab(&object_id, cloned_encode_collab, timestamp) + .insert_encode_collab(&object_id, cloned_encode_collab, timestamp, expiration_secs) .await; }); Ok(encode_collab) @@ -159,6 +160,7 @@ impl CollabCache { &object_id, &encode_collab_data, chrono::Utc::now().timestamp(), + Some(cache_exp_secs_from_collab_type(¶ms.collab_type)), ) .await { @@ -201,7 +203,12 @@ impl CollabCache { let timestamp = chrono::Utc::now().timestamp(); self .mem_cache - .insert_encode_collab_data(¶ms.object_id, ¶ms.encoded_collab_v1, timestamp) + .insert_encode_collab_data( + ¶ms.object_id, + ¶ms.encoded_collab_v1, + timestamp, + Some(cache_exp_secs_from_collab_type(¶ms.collab_type)), + ) .await .map_err(|err| AppError::Internal(err.into()))?; Ok(()) diff --git a/services/appflowy-collaborate/src/collab/mem_cache.rs b/services/appflowy-collaborate/src/collab/mem_cache.rs index dee20268..13c194d4 100644 --- a/services/appflowy-collaborate/src/collab/mem_cache.rs +++ b/services/appflowy-collaborate/src/collab/mem_cache.rs @@ -1,5 +1,6 @@ use anyhow::anyhow; use collab::entity::EncodedCollab; +use collab_entity::CollabType; use redis::{pipe, AsyncCommands}; use tracing::{error, instrument, trace}; @@ -8,7 +9,7 @@ use crate::state::RedisConnectionManager; use app_error::AppError; use database::collab::CollabMetadata; -const SEVEN_DAYS: i64 = 604800; +const SEVEN_DAYS: u64 = 604800; const ONE_MONTH: u64 = 2592000; #[derive(Clone)] pub struct CollabMemCache { @@ -113,13 +114,14 @@ impl CollabMemCache { object_id: &str, encoded_collab: EncodedCollab, timestamp: i64, + expiration_seconds: u64, ) { trace!("Inserting encode collab into cache: {}", object_id); let result = tokio::task::spawn_blocking(move || encoded_collab.encode_to_bytes()).await; match result { Ok(Ok(bytes)) => { if let Err(err) = self - .insert_data_with_timestamp(object_id, &bytes, timestamp) + .insert_data_with_timestamp(object_id, &bytes, timestamp, Some(expiration_seconds)) .await { error!("Failed to cache encoded collab: {:?}", err); @@ -134,14 +136,17 @@ impl CollabMemCache { } } + /// Inserts data into Redis with a conditional timestamp. + /// if the expiration_seconds is None, the data will be expired after 7 days. pub async fn insert_encode_collab_data( &self, object_id: &str, data: &[u8], timestamp: i64, + expiration_seconds: Option, ) -> redis::RedisResult<()> { self - .insert_data_with_timestamp(object_id, data, timestamp) + .insert_data_with_timestamp(object_id, data, timestamp, expiration_seconds) .await } @@ -163,6 +168,7 @@ impl CollabMemCache { object_id: &str, data: &[u8], timestamp: i64, + expiration_seconds: Option, ) -> redis::RedisResult<()> { let cache_object_id = encode_collab_key(object_id); let mut conn = self.connection_manager.clone(); @@ -211,7 +217,7 @@ impl CollabMemCache { .atomic() .set(&cache_object_id, data) .ignore() - .expire(&cache_object_id, SEVEN_DAYS) // Setting the expiration to 7 days + .expire(&cache_object_id, expiration_seconds.unwrap_or(SEVEN_DAYS) as i64) // Setting the expiration to 7 days .ignore(); pipeline.query_async(&mut conn).await?; } @@ -286,3 +292,16 @@ fn encode_collab_key(object_id: &str) -> String { fn collab_meta_key(object_id: &str) -> String { format!("collab_meta_v0:{}", object_id) } + +#[inline] +pub fn cache_exp_secs_from_collab_type(collab_type: &CollabType) -> u64 { + match collab_type { + CollabType::Document => SEVEN_DAYS * 2, + CollabType::Database => SEVEN_DAYS * 2, + CollabType::WorkspaceDatabase => ONE_MONTH, + CollabType::Folder => SEVEN_DAYS, + CollabType::DatabaseRow => ONE_MONTH, + CollabType::UserAwareness => SEVEN_DAYS * 2, + CollabType::Unknown => SEVEN_DAYS, + } +} diff --git a/tests/ai_test/chat_test.rs b/tests/ai_test/chat_test.rs index daa28a48..95a14ebc 100644 --- a/tests/ai_test/chat_test.rs +++ b/tests/ai_test/chat_test.rs @@ -329,8 +329,11 @@ async fn create_chat_context_test() { .get_answer(&workspace_id, &chat_id, question.message_id) .await .unwrap(); - assert!(answer.content.contains("US")); println!("answer: {:?}", answer); + if answer.content.contains("United States") { + return; + } + assert!(answer.content.contains("US")); } // #[tokio::test] diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index 9c3d5583..d5bf1580 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -253,6 +253,7 @@ async fn collab_mem_cache_read_write_test() { &object_id, &encode_collab.encode_to_bytes().unwrap(), timestamp, + None, ) .await .unwrap(); @@ -275,6 +276,7 @@ async fn collab_mem_cache_insert_override_test() { &object_id, &encode_collab.encode_to_bytes().unwrap(), timestamp, + None, ) .await .unwrap(); @@ -289,6 +291,7 @@ async fn collab_mem_cache_insert_override_test() { .encode_to_bytes() .unwrap(), timestamp, + None, ) .await .unwrap(); @@ -308,6 +311,7 @@ async fn collab_mem_cache_insert_override_test() { .encode_to_bytes() .unwrap(), timestamp, + None, ) .await .unwrap();