From 5a06cb3278dd938b7e2c7b4ac7b16ed9a101ef41 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Mon, 1 Apr 2024 09:39:02 +0800 Subject: [PATCH] feat: get collab retry (#434) * chore: retry when fetch collab doc state * chore: fix test * chore: disable log * chore: fix client api build * chore: log * chore: try invite and acccept * chore: add todo * chore: update ci --- .github/workflows/integration_test.yml | 12 +++-- libs/client-api-test-util/src/test_client.rs | 3 ++ .../client-api/src/collab_sync/collab_sink.rs | 3 +- libs/client-api/src/collab_sync/ping.rs | 17 +++++-- libs/client-api/src/http.rs | 25 +--------- libs/client-api/src/lib.rs | 32 ++++--------- libs/client-api/src/native/http_native.rs | 48 ++++++++++++++++++- libs/client-api/src/wasm/http_wasm.rs | 27 ++++++++++- src/biz/workspace/access_control.rs | 2 +- 9 files changed, 108 insertions(+), 61 deletions(-) diff --git a/.github/workflows/integration_test.yml b/.github/workflows/integration_test.yml index 48a2bfdf..89e3e1ef 100644 --- a/.github/workflows/integration_test.yml +++ b/.github/workflows/integration_test.yml @@ -3,8 +3,14 @@ name: AppFlowy-Cloud Integrations on: push: branches: [ main ] - pull_request_target: - types: [ opened, synchronize, reopened ] + pull_request: + branches: [ main ] + types: + - opened + - synchronize + - reopened + - unlocked + - ready_for_review concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} @@ -65,7 +71,7 @@ jobs: - name: Run tests run: | - RUST_LOG="trace" DISABLE_CI_TEST_LOG="false" cargo test + RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test - name: Install Node.js uses: actions/setup-node@v2 diff --git a/libs/client-api-test-util/src/test_client.rs b/libs/client-api-test-util/src/test_client.rs index 9f3c7228..6abffeaf 100644 --- a/libs/client-api-test-util/src/test_client.rs +++ b/libs/client-api-test-util/src/test_client.rs @@ -156,6 +156,9 @@ impl TestClient { other_client: &TestClient, role: AFRole, ) { + // TODO(zack): replace with `invite_and_accepted_workspace_member`. Make sure running local test with `cargo run` + // and then all the local tasks should be passed. + // mark the create_workspace_members_handler with ` #[deprecated(note = "...")]` let member = CreateWorkspaceMember { email: other_client.email().await, role, diff --git a/libs/client-api/src/collab_sync/collab_sink.rs b/libs/client-api/src/collab_sync/collab_sink.rs index 4f57d0f7..7fe3f3ab 100644 --- a/libs/client-api/src/collab_sync/collab_sink.rs +++ b/libs/client-api/src/collab_sync/collab_sink.rs @@ -88,8 +88,7 @@ where let msg_id_counter = DefaultMsgIdCounter::new(); let notifier = Arc::new(notifier); let sender = Arc::new(Mutex::new(sink)); - let msg_queue = SinkQueue::new(); - let message_queue = Arc::new(parking_lot::Mutex::new(msg_queue)); + let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new())); let msg_id_counter = Arc::new(msg_id_counter); let flying_messages = Arc::new(parking_lot::Mutex::new(HashSet::new())); let pause_ping = Arc::new(AtomicBool::new(false)); diff --git a/libs/client-api/src/collab_sync/ping.rs b/libs/client-api/src/collab_sync/ping.rs index f7998c9d..c8c03b05 100644 --- a/libs/client-api/src/collab_sync/ping.rs +++ b/libs/client-api/src/collab_sync/ping.rs @@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use std::time::Duration; use tokio::sync::watch; +use tokio::time::{sleep_until, Instant}; use tracing::warn; pub struct PingSyncRunner; @@ -21,10 +22,15 @@ impl PingSyncRunner { sync_timestamp: Arc, ) { let duration = Duration::from_secs(10); + let mut next_tick = Instant::now() + duration; tokio::spawn(async move { - let mut interval = tokio::time::interval(duration); loop { - interval.tick().await; + sleep_until(next_tick).await; + + // Set the next tick to the current time plus the duration. + // Otherwise, it might spike the CPU usage. + next_tick = Instant::now() + duration; + match message_queue.upgrade() { None => break, Some(message_queue) => { @@ -37,7 +43,11 @@ impl PingSyncRunner { } if let Some(mut queue) = message_queue.try_lock() { - if !queue.is_empty() { + if queue.is_empty() { + // slow down the ping sync if there are messages in the queue. + next_tick = Instant::now() + Duration::from_secs(30); + } else { + // No need to send ping sync if there are messages in the queue. continue; } @@ -50,6 +60,7 @@ impl PingSyncRunner { let ping = ClientCollabMessage::ClientPingSync(ping); queue.push_msg(msg_id, ping); + // notify the sink to proceed next message if let Some(notify) = weak_notify.upgrade() { if let Err(err) = notify.send(SinkSignal::Proceed) { warn!("{} fail to send notify signal: {}", object_id, err); diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index a53ef80e..64ac6187 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -14,14 +14,12 @@ use database_entity::dto::{ AFUserWorkspaceInfo, AFWorkspace, AFWorkspaceInvitation, AFWorkspaceInvitationStatus, AFWorkspaceMember, AFWorkspaces, BatchQueryCollabParams, BatchQueryCollabResult, CollabMemberIdentify, CreateCollabParams, DeleteCollabParams, InsertCollabMemberParams, - QueryCollab, QueryCollabMembers, QueryCollabParams, QuerySnapshotParams, SnapshotData, - UpdateCollabMemberParams, + QueryCollab, QueryCollabMembers, QuerySnapshotParams, SnapshotData, UpdateCollabMemberParams, }; use futures_util::StreamExt; use gotrue::grant::PasswordGrant; use gotrue::grant::{Grant, RefreshTokenGrant}; -use collab_rt_entity::EncodedCollab; use gotrue::params::MagicLinkParams; use gotrue::params::{AdminUserParams, GenerateLinkParams}; use mime::Mime; @@ -918,27 +916,6 @@ impl Client { AppResponse::<()>::from_response(resp).await?.into_error() } - #[instrument(level = "debug", skip_all)] - pub async fn get_collab( - &self, - params: QueryCollabParams, - ) -> Result { - let url = format!( - "{}/api/workspace/{}/collab/{}", - self.base_url, ¶ms.workspace_id, ¶ms.object_id - ); - let resp = self - .http_client_with_auth(Method::GET, &url) - .await? - .json(¶ms) - .send() - .await?; - log_request_id(&resp); - AppResponse::::from_response(resp) - .await? - .into_data() - } - #[instrument(level = "debug", skip_all, err)] pub async fn batch_get_collab( &self, diff --git a/libs/client-api/src/lib.rs b/libs/client-api/src/lib.rs index abcf9d89..d4bded1f 100644 --- a/libs/client-api/src/lib.rs +++ b/libs/client-api/src/lib.rs @@ -1,36 +1,20 @@ mod http; pub use http::*; -macro_rules! if_native { - ($($item:item)*) => {$( - #[cfg(not(target_arch = "wasm32"))] - $item - )*} -} - -macro_rules! if_wasm { - ($($item:item)*) => {$( - #[cfg(target_arch = "wasm32")] - $item - )*} -} - #[cfg(feature = "collab-sync")] pub mod collab_sync; pub mod notify; -if_native! { - mod native; - #[allow(unused_imports)] - pub use native::*; -} +#[cfg(not(target_arch = "wasm32"))] +mod native; +#[cfg(not(target_arch = "wasm32"))] +pub use native::*; -if_wasm! { - mod wasm; - #[allow(unused_imports)] - pub use wasm::*; -} +#[cfg(target_arch = "wasm32")] +mod wasm; +#[cfg(target_arch = "wasm32")] +pub use wasm::*; pub mod ws; diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index e897da9f..1d6e6bff 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -3,10 +3,11 @@ use crate::ws::{WSClientHttpSender, WSError}; use crate::{spawn_blocking_brotli_compress, Client}; use crate::{RefreshTokenAction, RefreshTokenRetryCondition}; use anyhow::anyhow; -use app_error::AppError; +use app_error::{AppError, ErrorCode}; use async_trait::async_trait; +use collab_rt_entity::EncodedCollab; use collab_rt_entity::HttpRealtimeMessage; -use database_entity::dto::CollabParams; +use database_entity::dto::{CollabParams, QueryCollabParams}; use futures_util::stream; use prost::Message; use reqwest::{Body, Method}; @@ -19,6 +20,49 @@ use tokio_retry::RetryIf; use tracing::{event, instrument}; impl Client { + #[instrument(level = "debug", skip_all)] + pub async fn get_collab( + &self, + params: QueryCollabParams, + ) -> Result { + let url = format!( + "{}/api/workspace/{}/collab/{}", + self.base_url, ¶ms.workspace_id, ¶ms.object_id + ); + + let mut retries = 3; // Maximum number of retries + let retry_delay = Duration::from_secs(2); + while retries > 0 { + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? + .json(¶ms) + .send() + .await?; + log_request_id(&resp); + let response = AppResponse::::from_response(resp).await?; + + // Retry if the record is not found + if response.code == ErrorCode::RecordNotFound { + retries -= 1; + if retries > 0 { + tokio::time::sleep(retry_delay).await; + continue; + } else { + return response.into_data(); + } + } else { + return response.into_data(); + } + } + + // this part is unreachable by logic + Err(AppResponseError::new( + ErrorCode::Unhandled, + "Exhausted retries to fetch collaboration data.", + )) + } + #[instrument(level = "debug", skip_all, err)] pub async fn post_realtime_msg( &self, diff --git a/libs/client-api/src/wasm/http_wasm.rs b/libs/client-api/src/wasm/http_wasm.rs index 56492c73..4f0af8b2 100644 --- a/libs/client-api/src/wasm/http_wasm.rs +++ b/libs/client-api/src/wasm/http_wasm.rs @@ -1,11 +1,14 @@ +use crate::http::log_request_id; use crate::ws::{WSClientHttpSender, WSError}; use crate::Client; use app_error::gotrue::GoTrueError; use app_error::ErrorCode; use async_trait::async_trait; -use database_entity::dto::CollabParams; +use collab_rt_entity::EncodedCollab; +use database_entity::dto::{CollabParams, QueryCollabParams}; use gotrue::grant::{Grant, RefreshTokenGrant}; -use shared_entity::response::AppResponseError; +use reqwest::Method; +use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; use std::sync::atomic::Ordering; use tracing::instrument; @@ -23,6 +26,26 @@ impl Client { )) } + #[instrument(level = "debug", skip_all)] + pub async fn get_collab( + &self, + params: QueryCollabParams, + ) -> Result { + let url = format!( + "{}/api/workspace/{}/collab/{}", + self.base_url, ¶ms.workspace_id, ¶ms.object_id + ); + let resp = self + .http_client_with_auth(Method::GET, &url) + .await? + .json(¶ms) + .send() + .await?; + log_request_id(&resp); + let resp = AppResponse::::from_response(resp).await?; + resp.into_data() + } + #[instrument(level = "debug", skip_all, err)] pub async fn refresh_token(&self) -> Result<(), AppResponseError> { let (tx, rx) = tokio::sync::oneshot::channel(); diff --git a/src/biz/workspace/access_control.rs b/src/biz/workspace/access_control.rs index 6531ef42..aef292c9 100644 --- a/src/biz/workspace/access_control.rs +++ b/src/biz/workspace/access_control.rs @@ -77,7 +77,7 @@ where (Method::POST, AFRole::Owner), (Method::DELETE, AFRole::Owner), (Method::PUT, AFRole::Owner), - (Method::GET, AFRole::Owner), + (Method::GET, AFRole::Member), ] .into(), ),