From 3cea53cd13554f98a18b1d664e20e22697afae84 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Fri, 11 Oct 2024 13:06:26 +0200 Subject: [PATCH] chore: create method for receiving the most recent snapshot --- ...b0d6e94b919597d709f930467423c5b4c0ec2.json | 65 +++++++++++++++++++ libs/database/src/collab/collab_db_ops.rs | 22 +++++++ libs/database/src/collab/collab_storage.rs | 17 +++++ .../src/collab/storage.rs | 11 ++++ .../src/group/group_init.rs | 45 ++++++++----- .../src/snapshot/snapshot_control.rs | 30 ++++++++- 6 files changed, 172 insertions(+), 18 deletions(-) create mode 100644 .sqlx/query-88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2.json diff --git a/.sqlx/query-88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2.json b/.sqlx/query-88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2.json new file mode 100644 index 00000000..e2d0093d --- /dev/null +++ b/.sqlx/query-88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2.json @@ -0,0 +1,65 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT * FROM af_collab_snapshot\n WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL\n ORDER BY created_at DESC\n LIMIT 1;\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "sid", + "type_info": "Int8" + }, + { + "ordinal": 1, + "name": "oid", + "type_info": "Text" + }, + { + "ordinal": 2, + "name": "blob", + "type_info": "Bytea" + }, + { + "ordinal": 3, + "name": "len", + "type_info": "Int4" + }, + { + "ordinal": 4, + "name": "encrypt", + "type_info": "Int4" + }, + { + "ordinal": 5, + "name": "deleted_at", + "type_info": "Timestamptz" + }, + { + "ordinal": 6, + "name": "workspace_id", + "type_info": "Uuid" + }, + { + "ordinal": 7, + "name": "created_at", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text" + ] + }, + "nullable": [ + false, + false, + false, + false, + true, + true, + false, + false + ] + }, + "hash": "88516b9a2a424bc7697337d6f16b0d6e94b919597d709f930467423c5b4c0ec2" +} diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index 7bdf84ed..719b84ce 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -551,6 +551,28 @@ pub async fn select_snapshot( Ok(row) } +#[inline] +pub async fn select_latest_snapshot( + pg_pool: &PgPool, + workspace_id: &Uuid, + object_id: &str, +) -> Result, Error> { + let row = sqlx::query_as!( + AFSnapshotRow, + r#" + SELECT * FROM af_collab_snapshot + WHERE workspace_id = $1 AND oid = $2 AND deleted_at IS NULL + ORDER BY created_at DESC + LIMIT 1; + "#, + workspace_id, + object_id + ) + .fetch_optional(pg_pool) + .await?; + Ok(row) +} + /// Returns list of snapshots for given object_id in descending order of creation time. pub async fn get_all_collab_snapshot_meta( pg_pool: &PgPool, diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index 2da6cddc..da811087 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -165,6 +165,12 @@ pub trait CollabStorage: Send + Sync + 'static { snapshot_id: &i64, ) -> AppResult; + async fn get_latest_snapshot( + &self, + workspace_id: &str, + object_id: &str, + ) -> AppResult>; + /// Returns list of snapshots for given object_id in descending order of creation time. async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult; @@ -303,6 +309,17 @@ where .await } + async fn get_latest_snapshot( + &self, + workspace_id: &str, + object_id: &str, + ) -> AppResult> { + self + .as_ref() + .get_latest_snapshot(workspace_id, object_id) + .await + } + async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult { self.as_ref().get_collab_snapshot_list(oid).await } diff --git a/services/appflowy-collaborate/src/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs index 3bae9ef2..d3283895 100644 --- a/services/appflowy-collaborate/src/collab/storage.rs +++ b/services/appflowy-collaborate/src/collab/storage.rs @@ -484,6 +484,17 @@ where .await } + async fn get_latest_snapshot( + &self, + workspace_id: &str, + object_id: &str, + ) -> AppResult> { + self + .snapshot_control + .get_latest_snapshot(workspace_id, object_id) + .await + } + async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult { self.snapshot_control.get_collab_snapshot_list(oid).await } diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 6e0c2b51..5ccfc57d 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -29,7 +29,7 @@ use collab_stream::stream_group::StreamGroup; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::{ - AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, + AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, SnapshotData, }; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; @@ -1077,25 +1077,36 @@ impl CollabPersister { // if we want history-keeping variant, we need to get a snapshot let snapshot = self .storage - .get_collab_snapshot(&self.workspace_id, &self.object_id, &1) + .get_latest_snapshot(&self.workspace_id, &self.object_id) .await .map_err(|err| RealtimeError::Internal(err.into()))?; - let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1) - .map_err(|err| RealtimeError::Internal(err.into()))?; - encoded_collab.doc_state + match snapshot { + None => None, + Some(snapshot) => { + let encoded_collab = EncodedCollab::decode_from_bytes(&snapshot.encoded_collab_v1) + .map_err(|err| RealtimeError::Internal(err.into()))?; + Some(encoded_collab.doc_state) + }, + } } else { - // if we want a lightweight variant, we need to get a collab - let params = QueryCollabParams::new( - self.object_id.clone(), - self.collab_type.clone(), - self.workspace_id.clone(), - ); - self - .storage - .get_encode_collab(GetCollabOrigin::Server, params, false) - .await - .map_err(|err| RealtimeError::Internal(err.into()))? - .doc_state + None // if we want a lightweight variant, we'll fallback to default + }; + let doc_state = match doc_state { + Some(doc_state) => doc_state, + None => { + // we didn't find a snapshot, or we want a lightweight collab version + let params = QueryCollabParams::new( + self.object_id.clone(), + self.collab_type.clone(), + self.workspace_id.clone(), + ); + self + .storage + .get_encode_collab(GetCollabOrigin::Server, params, false) + .await + .map_err(|err| RealtimeError::Internal(err.into()))? + .doc_state + }, }; let collab: Collab = Collab::new_with_source( diff --git a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs index a63f9c58..ec16c823 100644 --- a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs +++ b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs @@ -10,13 +10,14 @@ use futures_util::StreamExt; use sqlx::PgPool; use tokio::time::interval; use tracing::{debug, error, trace, warn}; +use uuid::Uuid; use validator::Validate; use app_error::AppError; use collab_rt_protocol::spawn_blocking_validate_encode_collab; use database::collab::{ create_snapshot_and_maintain_limit, get_all_collab_snapshot_meta, latest_snapshot_time, - select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR, + select_latest_snapshot, select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR, }; use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams, SnapshotData}; @@ -181,6 +182,33 @@ impl SnapshotControl { } } + pub async fn get_latest_snapshot( + &self, + workspace_id: &str, + object_id: &str, + ) -> Result, AppError> { + let key = SnapshotKey::from_object_id(object_id); + match self.cache.try_get(&key.0).await.unwrap_or(None) { + None => { + let wid = Uuid::parse_str(workspace_id)?; + let snapshot = select_latest_snapshot(&self.pg_pool, &wid, object_id).await?; + match snapshot { + None => Ok(None), + Some(row) => Ok(Some(SnapshotData { + object_id: row.oid, + encoded_collab_v1: row.blob, + workspace_id: row.workspace_id.to_string(), + })), + } + }, + Some(snapshot) => Ok(Some(SnapshotData { + encoded_collab_v1: snapshot, + workspace_id: workspace_id.to_string(), + object_id: object_id.to_string(), + })), + } + } + async fn latest_snapshot_time(&self, oid: &str) -> Result>, AppError> { let time = latest_snapshot_time(oid, &self.pg_pool).await?; Ok(time)