Merge branch 'main' into feat/delete-user

This commit is contained in:
Zack Fu Zi Xiang 2024-09-03 02:24:58 +08:00
commit 6acaf69add
No known key found for this signature in database
11 changed files with 81 additions and 60 deletions

View File

@ -53,6 +53,11 @@ pub trait CollabStorageAccessControl: Send + Sync + 'static {
) -> Result<bool, AppError>;
}
pub enum GetCollabOrigin {
User { uid: i64 },
Server,
}
/// Represents a storage mechanism for collaborations.
///
/// This trait provides asynchronous methods for CRUD operations related to collaborations.
@ -113,9 +118,9 @@ pub trait CollabStorage: Send + Sync + 'static {
/// * `Result<RawData>` - Returns the data of the collaboration if found, `Err` otherwise.
async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab>;
async fn batch_get_collab(
@ -208,13 +213,13 @@ where
async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab> {
self
.as_ref()
.get_encode_collab(uid, params, is_collab_init)
.get_encode_collab(origin, params, from_editing_collab)
.await
}

View File

@ -5,7 +5,9 @@ use pgvector::Vector;
use sqlx::{Error, Executor, Postgres, Transaction};
use uuid::Uuid;
use database_entity::dto::{AFCollabEmbeddingParams, IndexingStatus};
use database_entity::dto::{
AFCollabEmbeddingParams, IndexingStatus, QueryCollab, QueryCollabParams,
};
pub async fn get_index_status<'a, E>(
tx: E,
@ -143,3 +145,15 @@ pub struct CollabId {
pub workspace_id: Uuid,
pub object_id: String,
}
impl From<CollabId> for QueryCollabParams {
fn from(value: CollabId) -> Self {
QueryCollabParams {
workspace_id: value.workspace_id.to_string(),
inner: QueryCollab {
object_id: value.object_id,
collab_type: value.collab_type,
},
}
}
}

View File

@ -67,11 +67,7 @@ impl CollabCache {
}
}
pub async fn get_encode_collab(
&self,
uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
pub async fn get_encode_collab(&self, query: QueryCollab) -> Result<EncodedCollab, AppError> {
self.total_attempts.fetch_add(1, Ordering::Relaxed);
// Attempt to retrieve encoded collab from memory cache, falling back to disk cache if necessary.
if let Some(encoded_collab) = self.mem_cache.get_encode_collab(&query.object_id).await {
@ -86,10 +82,7 @@ impl CollabCache {
// Retrieve from disk cache as fallback. After retrieval, the value is inserted into the memory cache.
let object_id = query.object_id.clone();
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(uid, query)
.await?;
let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?;
// spawn a task to insert the encoded collab into the memory cache
let cloned_encode_collab = encode_collab.clone();
@ -180,13 +173,9 @@ impl CollabCache {
pub async fn get_encode_collab_from_disk(
&self,
uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
let encode_collab = self
.disk_cache
.get_collab_encoded_from_disk(uid, query)
.await?;
let encode_collab = self.disk_cache.get_collab_encoded_from_disk(query).await?;
Ok(encode_collab)
}
@ -219,11 +208,11 @@ impl CollabCache {
}
pub fn query_state(&self) -> QueryState {
let successful_attempts = self.success_attempts.load(Ordering::Relaxed);
let success_attempts = self.success_attempts.load(Ordering::Relaxed);
let total_attempts = self.total_attempts.load(Ordering::Relaxed);
QueryState {
total_attempts,
success_attempts: successful_attempts,
success_attempts,
}
}
@ -249,6 +238,7 @@ impl CollabCache {
}
}
#[derive(Debug)]
pub struct QueryState {
pub total_attempts: u64,
pub success_attempts: u64,

View File

@ -73,7 +73,6 @@ impl CollabDiskCache {
#[instrument(level = "trace", skip_all)]
pub async fn get_collab_encoded_from_disk(
&self,
_uid: &i64,
query: QueryCollab,
) -> Result<EncodedCollab, AppError> {
event!(

View File

@ -100,7 +100,6 @@ impl StorageQueue {
priority: WritePriority,
) -> Result<(), AppError> {
trace!("queuing {} object to pending write queue", params.object_id,);
// TODO(nathan): compress the data before storing it in Redis
self
.collab_cache
.insert_encode_collab_data_in_mem(params)

View File

@ -17,7 +17,9 @@ use crate::collab::cache::CollabCache;
use crate::command::{CLCommandSender, CollaborationCommand};
use crate::shared_state::RealtimeSharedState;
use app_error::AppError;
use database::collab::{AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl};
use database::collab::{
AppResult, CollabMetadata, CollabStorage, CollabStorageAccessControl, GetCollabOrigin,
};
use database_entity::dto::{
AFAccessLevel, AFSnapshotMeta, AFSnapshotMetas, CollabParams, InsertSnapshotParams, QueryCollab,
QueryCollabParams, QueryCollabResult, SnapshotData,
@ -277,30 +279,34 @@ where
Ok(())
}
#[instrument(level = "trace", skip_all, fields(oid = %params.object_id, is_collab_init = %is_collab_init))]
#[instrument(level = "trace", skip_all, fields(oid = %params.object_id, from_editing_collab = %from_editing_collab))]
async fn get_encode_collab(
&self,
uid: &i64,
origin: GetCollabOrigin,
params: QueryCollabParams,
is_collab_init: bool,
from_editing_collab: bool,
) -> AppResult<EncodedCollab> {
params.validate()?;
match origin {
GetCollabOrigin::User { uid } => {
// Check if the user has enough permissions to access the collab
let can_read = self
.access_control
.enforce_read_collab(&params.workspace_id, &uid, &params.object_id)
.await?;
// Check if the user has enough permissions to access the collab
let can_read = self
.access_control
.enforce_read_collab(&params.workspace_id, uid, &params.object_id)
.await?;
if !can_read {
return Err(AppError::NotEnoughPermissions {
user: uid.to_string(),
action: format!("read collab:{}", params.object_id),
});
if !can_read {
return Err(AppError::NotEnoughPermissions {
user: uid.to_string(),
action: format!("read collab:{}", params.object_id),
});
}
},
GetCollabOrigin::Server => {},
}
// Early return if editing collab is initialized, as it indicates no need to query further.
if !is_collab_init {
if from_editing_collab {
// Attempt to retrieve encoded collab from the editing collab
if let Some(value) = self.get_encode_collab_from_editing(&params.object_id).await {
trace!(
@ -311,7 +317,7 @@ where
}
}
let encode_collab = self.cache.get_encode_collab(uid, params.inner).await?;
let encode_collab = self.cache.get_encode_collab(params.inner).await?;
Ok(encode_collab)
}

View File

@ -16,7 +16,7 @@ use collab_rt_entity::CollabMessage;
use collab_stream::client::{CollabRedisStream, CONTROL_STREAM_KEY};
use collab_stream::model::CollabControlEvent;
use collab_stream::stream_group::StreamGroup;
use database::collab::CollabStorage;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::QueryCollabParams;
use crate::client::client_msg_router::ClientMessageRouter;
@ -271,7 +271,7 @@ where
S: CollabStorage,
{
let encode_collab = storage
.get_encode_collab(&uid, params.clone(), true)
.get_encode_collab(GetCollabOrigin::User { uid }, params.clone(), false)
.await?;
let result = Collab::new_with_source(
CollabOrigin::Server,

View File

@ -19,7 +19,7 @@ use crate::config::get_env_var;
use crate::indexer::DocumentIndexer;
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use database::collab::select_blob_from_af_collab;
use database::collab::{CollabStorage, GetCollabOrigin};
use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabParams};
@ -93,20 +93,22 @@ impl IndexerProvider {
fn get_unindexed_collabs(
&self,
storage: Arc<dyn CollabStorage>,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = self.db.clone();
Box::pin(try_stream! {
let collabs = get_collabs_without_embeddings(&db).await?;
if !collabs.is_empty() {
tracing::trace!("found {} unindexed collabs", collabs.len());
}
for cid in collabs {
match &cid.collab_type {
CollabType::Document => {
let collab =
select_blob_from_af_collab(&db, &CollabType::Document, &cid.object_id).await?;
let collab = EncodedCollab::decode_from_bytes(&collab)?;
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;
yield UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
@ -125,8 +127,8 @@ impl IndexerProvider {
})
}
pub async fn handle_unindexed_collabs(indexer: Arc<Self>) {
let mut stream = indexer.get_unindexed_collabs();
pub async fn handle_unindexed_collabs(indexer: Arc<Self>, storage: Arc<dyn CollabStorage>) {
let mut stream = indexer.get_unindexed_collabs(storage);
while let Some(result) = stream.next().await {
match result {
Ok(collab) => {

View File

@ -93,7 +93,7 @@ where
spawn_metrics(metrics.clone(), storage.clone());
spawn_handle_unindexed_collabs(indexer_provider);
spawn_handle_unindexed_collabs(indexer_provider, storage.clone());
Ok(Self {
storage,
@ -271,8 +271,14 @@ where
}
}
fn spawn_handle_unindexed_collabs(indexer_provider: Arc<IndexerProvider>) {
tokio::spawn(IndexerProvider::handle_unindexed_collabs(indexer_provider));
fn spawn_handle_unindexed_collabs(
indexer_provider: Arc<IndexerProvider>,
storage: Arc<dyn CollabStorage>,
) {
tokio::spawn(IndexerProvider::handle_unindexed_collabs(
indexer_provider,
storage,
));
}
fn spawn_period_check_inactive_group<S, AC>(

View File

@ -24,7 +24,7 @@ use authentication::jwt::{OptionalUserUuid, UserUuid};
use collab_rt_entity::realtime_proto::HttpRealtimeMessage;
use collab_rt_entity::RealtimeMessage;
use collab_rt_protocol::validate_encode_collab;
use database::collab::CollabStorage;
use database::collab::{CollabStorage, GetCollabOrigin};
use database::user::select_uid_from_email;
use database_entity::dto::*;
use shared_entity::dto::workspace_dto::*;
@ -764,7 +764,7 @@ async fn get_collab_handler(
let object_id = params.object_id.clone();
let encode_collab = state
.collab_access_control_storage
.get_encode_collab(&uid, params, false)
.get_encode_collab(GetCollabOrigin::User { uid }, params, true)
.await
.map_err(AppResponseError::from)?;
@ -800,7 +800,7 @@ async fn v1_get_collab_handler(
let encode_collab = state
.collab_access_control_storage
.get_encode_collab(&uid, param, false)
.get_encode_collab(GetCollabOrigin::User { uid }, param, true)
.await
.map_err(AppResponseError::from)?;
@ -845,9 +845,9 @@ async fn create_collab_snapshot_handler(
let encoded_collab_v1 = state
.collab_access_control_storage
.get_encode_collab(
&uid,
GetCollabOrigin::User { uid },
QueryCollabParams::new(&object_id, collab_type.clone(), &workspace_id),
false,
true,
)
.await?
.encode_to_bytes()

View File

@ -491,7 +491,7 @@ async fn simulate_small_data_set_write(pool: PgPool) {
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.get_encode_collab_from_disk(query)
.await
.unwrap();
@ -558,7 +558,7 @@ async fn simulate_large_data_set_write(pool: PgPool) {
collab_type: params.collab_type.clone(),
};
let encode_collab_from_disk = collab_cache
.get_encode_collab_from_disk(&user.uid, query)
.get_encode_collab_from_disk(query)
.await
.unwrap();
assert_eq!(