diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 0ddeb5c6..ec98dd37 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -78,15 +78,16 @@ impl CollabGroup { collab_type: CollabType, metrics: Arc, storage: Arc, - is_new_collab: bool, collab_redis_stream: Arc, persistence_interval: Duration, prune_grace_period: Duration, indexer: Option>, + state_vector: StateVector, ) -> Result where S: CollabStorage, { + let is_new_collab = state_vector.is_empty(); let persister = CollabPersister::new( uid, workspace_id.clone(), @@ -109,7 +110,7 @@ impl CollabGroup { persister, last_activity: ArcSwap::new(Instant::now().into()), seq_no: AtomicU32::new(0), - state_vector: Default::default(), + state_vector: state_vector.into(), }); /* diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 01f4d75d..4699cd54 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -1,23 +1,23 @@ use std::sync::Arc; use std::time::Duration; +use access_control::collab::RealtimeAccessControl; +use app_error::AppError; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; use collab::preclude::Collab; use collab_entity::CollabType; -use tracing::{instrument, trace}; - -use access_control::collab::RealtimeAccessControl; -use app_error::AppError; use collab_rt_entity::user::RealtimeUser; use collab_rt_entity::CollabMessage; use collab_stream::client::CollabRedisStream; use database::collab::{CollabStorage, GetCollabOrigin}; use database_entity::dto::QueryCollabParams; +use tracing::{instrument, trace}; +use yrs::{ReadTxn, StateVector}; use crate::client::client_msg_router::ClientMessageRouter; -use crate::error::{CreateGroupFailedReason, RealtimeError}; +use crate::error::RealtimeError; use crate::group::group_init::CollabGroup; use crate::group::state::GroupManagementState; use crate::indexer::IndexerProvider; @@ -123,8 +123,24 @@ where object_id: &str, collab_type: CollabType, ) -> Result<(), RealtimeError> { - let mut is_new_collab = false; let params = QueryCollabParams::new(object_id, collab_type.clone(), workspace_id); + let res = self + .storage + .get_encode_collab(GetCollabOrigin::Server, params, false) + .await; + let state_vector = match res { + Ok(collab) => Collab::new_with_source( + CollabOrigin::Server, + object_id, + DataSource::DocStateV1(collab.doc_state.into()), + vec![], + false, + )? + .transact() + .state_vector(), + Err(err) if err.is_record_not_found() => StateVector::default(), + Err(err) => return Err(RealtimeError::Internal(err.into())), + }; trace!( "[realtime]: create group: uid:{},workspace_id:{},object_id:{}:{}", @@ -153,11 +169,11 @@ where collab_type, self.metrics_calculate.clone(), self.storage.clone(), - is_new_collab, self.collab_redis_stream.clone(), self.persistence_interval, self.prune_grace_period, indexer, + state_vector, ) .await?, ); diff --git a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs index 38ecbce4..9f8c71e6 100644 --- a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs +++ b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs @@ -6,7 +6,6 @@ use collab::entity::{EncodedCollab, EncoderVersion}; use collab_entity::CollabType; use sqlx::PgPool; use tracing::{debug, error, trace, warn}; -use uuid::Uuid; use validator::Validate; use app_error::AppError;