diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 8be9fe66..4d5e7018 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -819,7 +819,7 @@ impl Client { AppResponse::<()>::from_response(resp).await?.into_error() } - #[instrument(level = "debug", skip_all, err)] + #[instrument(level = "debug", skip_all)] pub async fn get_collab( &self, params: QueryCollabParams, diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index d0b8367e..792a62a0 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -310,6 +310,10 @@ impl WSClient { self.sender.clone() } + pub fn get_state(&self) -> ConnectState { + self.state_notify.lock().state.clone() + } + async fn set_state(&self, state: ConnectState) { self.state_notify.lock().set_state(state); } diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index fb0bf53c..0ac83c8e 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -157,8 +157,8 @@ pub async fn insert_into_af_collab( tracing::Level::TRACE, "did insert new collab row: {}:{}:{}", uid, + workspace_id, params.object_id, - workspace_id ); }, } diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs index b852b761..8793f8d4 100644 --- a/libs/database/src/collab/collab_storage.rs +++ b/libs/database/src/collab/collab_storage.rs @@ -84,6 +84,7 @@ pub trait CollabStorage: Send + Sync + 'static { /// # Arguments /// /// * `params` - The parameters required to query a collab object. + /// * `force_from_disk` - If `true`, the data will be retrieved from the disk instead of the cache. /// /// # Returns /// @@ -92,6 +93,7 @@ pub trait CollabStorage: Send + Sync + 'static { &self, uid: &i64, params: QueryCollabParams, + force_from_disk: bool, ) -> DatabaseResult; async fn batch_get_collab( @@ -152,8 +154,12 @@ where &self, uid: &i64, params: QueryCollabParams, + force_from_disk: bool, ) -> DatabaseResult { - self.as_ref().get_collab_encoded_v1(uid, params).await + self + .as_ref() + .get_collab_encoded_v1(uid, params, force_from_disk) + .await } async fn batch_get_collab( @@ -253,6 +259,7 @@ impl CollabStorage for CollabStoragePgImpl { &self, _uid: &i64, params: QueryCollabParams, + _force_from_disk: bool, ) -> DatabaseResult { match collab_db_ops::select_blob_from_af_collab( &self.pg_pool, diff --git a/libs/realtime/src/collaborate/plugin.rs b/libs/realtime/src/collaborate/plugin.rs index cfcd705e..df05d481 100644 --- a/libs/realtime/src/collaborate/plugin.rs +++ b/libs/realtime/src/collaborate/plugin.rs @@ -173,7 +173,11 @@ where async fn init(&self, object_id: &str, _origin: &CollabOrigin, doc: &Doc) { let params = QueryCollabParams::new(object_id, self.collab_type.clone(), &self.workspace_id); - match self.storage.get_collab_encoded_v1(&self.uid, params).await { + match self + .storage + .get_collab_encoded_v1(&self.uid, params, true) + .await + { Ok(encoded_collab_v1) => match init_collab_with_raw_data(object_id, &encoded_collab_v1, doc) .await { diff --git a/src/api/workspace.rs b/src/api/workspace.rs index fd421eaa..117e5357 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -287,7 +287,7 @@ async fn get_collab_handler( .map_err(AppResponseError::from)?; let data = state .collab_storage - .get_collab_encoded_v1(&uid, payload.into_inner()) + .get_collab_encoded_v1(&uid, payload.into_inner(), false) .await .map_err(AppResponseError::from)?; @@ -325,6 +325,7 @@ async fn create_collab_snapshot_handler( .get_collab_encoded_v1( &uid, QueryCollabParams::new(&object_id, collab_type, &workspace_id), + false, ) .await? .encode_to_bytes() diff --git a/src/biz/casbin/access_control.rs b/src/biz/casbin/access_control.rs index 53b247f7..fc527f92 100644 --- a/src/biz/casbin/access_control.rs +++ b/src/biz/casbin/access_control.rs @@ -291,7 +291,7 @@ impl CollabAccessControl for CasbinCollabAccessControl { access_level.ok_or(AppError::RecordNotFound(format!( "collab:{} does not exist or user:{} is not a member", - uid, oid + oid, uid ))) } diff --git a/src/biz/casbin/enforcer_ext.rs b/src/biz/casbin/enforcer_ext.rs index 72d981a8..55630948 100644 --- a/src/biz/casbin/enforcer_ext.rs +++ b/src/biz/casbin/enforcer_ext.rs @@ -35,6 +35,7 @@ pub(crate) async fn enforcer_update( }?; let mut enforcer = enforcer.write().await; + // TODO(jireh): if the policy already exists and doesn't need to be updated, return early. enforcer_remove(&mut enforcer, uid, obj).await?; event!( tracing::Level::INFO, diff --git a/src/biz/collab/ops.rs b/src/biz/collab/ops.rs index da764237..f49826fe 100644 --- a/src/biz/collab/ops.rs +++ b/src/biz/collab/ops.rs @@ -24,15 +24,23 @@ where C: CollabAccessControl, { for params in params_list { - if !params.override_if_exist - && database::collab::collab_exists(pg_pool, ¶ms.object_id).await? - { - // When calling this function, the caller should have already checked if the collab exists. - return Err(AppError::RecordAlreadyExists(format!( - "Collab with object_id {} already exists", - params.object_id - ))); + if database::collab::collab_exists(pg_pool, ¶ms.object_id).await? { + if params.override_if_exist { + event!( + tracing::Level::INFO, + "Collab:{} with object_id {} already exists, override it", + params.collab_type, + params.object_id + ); + } else { + // When calling this function, the caller should have already checked if the collab exists. + return Err(AppError::RecordAlreadyExists(format!( + "Collab:{} with object_id {} already exists", + params.collab_type, params.object_id + ))); + } } + collab_access_control .cache_collab_access_level( CollabUserId::UserUuid(user_uuid), diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index ed72b208..535b267a 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -146,6 +146,7 @@ where &self, uid: &i64, params: QueryCollabParams, + force_from_disk: bool, ) -> DatabaseResult { params.validate()?; self @@ -153,12 +154,16 @@ where .get_collab_access_level(uid, ¶ms.object_id) .await?; - let collab = self - .collab_by_object_id - .read() - .await - .get(¶ms.object_id) - .and_then(|collab| collab.upgrade()); + let collab = if force_from_disk { + None + } else { + self + .collab_by_object_id + .read() + .await + .get(¶ms.object_id) + .and_then(|collab| collab.upgrade()) + }; match collab { None => { @@ -167,7 +172,7 @@ where "Get collab data:{} from disk", params.object_id ); - self.inner.get_collab_encoded_v1(uid, params).await + self.inner.get_collab_encoded_v1(uid, params, true).await }, Some(collab) => { event!(