feat: get collab retry (#434)

* chore: retry when fetch collab doc state

* chore: fix test

* chore: disable log

* chore: fix client api build

* chore: log

* chore: try invite and acccept

* chore: add todo

* chore: update ci
This commit is contained in:
Nathan.fooo 2024-04-01 09:39:02 +08:00 committed by GitHub
parent 6c96d05e2f
commit 5a06cb3278
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 108 additions and 61 deletions

View File

@ -3,8 +3,14 @@ name: AppFlowy-Cloud Integrations
on:
push:
branches: [ main ]
pull_request_target:
types: [ opened, synchronize, reopened ]
pull_request:
branches: [ main ]
types:
- opened
- synchronize
- reopened
- unlocked
- ready_for_review
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
@ -65,7 +71,7 @@ jobs:
- name: Run tests
run: |
RUST_LOG="trace" DISABLE_CI_TEST_LOG="false" cargo test
RUST_LOG="info" DISABLE_CI_TEST_LOG="true" cargo test
- name: Install Node.js
uses: actions/setup-node@v2

View File

@ -156,6 +156,9 @@ impl TestClient {
other_client: &TestClient,
role: AFRole,
) {
// TODO(zack): replace with `invite_and_accepted_workspace_member`. Make sure running local test with `cargo run`
// and then all the local tasks should be passed.
// mark the create_workspace_members_handler with ` #[deprecated(note = "...")]`
let member = CreateWorkspaceMember {
email: other_client.email().await,
role,

View File

@ -88,8 +88,7 @@ where
let msg_id_counter = DefaultMsgIdCounter::new();
let notifier = Arc::new(notifier);
let sender = Arc::new(Mutex::new(sink));
let msg_queue = SinkQueue::new();
let message_queue = Arc::new(parking_lot::Mutex::new(msg_queue));
let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new()));
let msg_id_counter = Arc::new(msg_id_counter);
let flying_messages = Arc::new(parking_lot::Mutex::new(HashSet::new()));
let pause_ping = Arc::new(AtomicBool::new(false));

View File

@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::{sleep_until, Instant};
use tracing::warn;
pub struct PingSyncRunner;
@ -21,10 +22,15 @@ impl PingSyncRunner {
sync_timestamp: Arc<SyncTimestamp>,
) {
let duration = Duration::from_secs(10);
let mut next_tick = Instant::now() + duration;
tokio::spawn(async move {
let mut interval = tokio::time::interval(duration);
loop {
interval.tick().await;
sleep_until(next_tick).await;
// Set the next tick to the current time plus the duration.
// Otherwise, it might spike the CPU usage.
next_tick = Instant::now() + duration;
match message_queue.upgrade() {
None => break,
Some(message_queue) => {
@ -37,7 +43,11 @@ impl PingSyncRunner {
}
if let Some(mut queue) = message_queue.try_lock() {
if !queue.is_empty() {
if queue.is_empty() {
// slow down the ping sync if there are messages in the queue.
next_tick = Instant::now() + Duration::from_secs(30);
} else {
// No need to send ping sync if there are messages in the queue.
continue;
}
@ -50,6 +60,7 @@ impl PingSyncRunner {
let ping = ClientCollabMessage::ClientPingSync(ping);
queue.push_msg(msg_id, ping);
// notify the sink to proceed next message
if let Some(notify) = weak_notify.upgrade() {
if let Err(err) = notify.send(SinkSignal::Proceed) {
warn!("{} fail to send notify signal: {}", object_id, err);

View File

@ -14,14 +14,12 @@ use database_entity::dto::{
AFUserWorkspaceInfo, AFWorkspace, AFWorkspaceInvitation, AFWorkspaceInvitationStatus,
AFWorkspaceMember, AFWorkspaces, BatchQueryCollabParams, BatchQueryCollabResult,
CollabMemberIdentify, CreateCollabParams, DeleteCollabParams, InsertCollabMemberParams,
QueryCollab, QueryCollabMembers, QueryCollabParams, QuerySnapshotParams, SnapshotData,
UpdateCollabMemberParams,
QueryCollab, QueryCollabMembers, QuerySnapshotParams, SnapshotData, UpdateCollabMemberParams,
};
use futures_util::StreamExt;
use gotrue::grant::PasswordGrant;
use gotrue::grant::{Grant, RefreshTokenGrant};
use collab_rt_entity::EncodedCollab;
use gotrue::params::MagicLinkParams;
use gotrue::params::{AdminUserParams, GenerateLinkParams};
use mime::Mime;
@ -918,27 +916,6 @@ impl Client {
AppResponse::<()>::from_response(resp).await?.into_error()
}
#[instrument(level = "debug", skip_all)]
pub async fn get_collab(
&self,
params: QueryCollabParams,
) -> Result<EncodedCollab, AppResponseError> {
let url = format!(
"{}/api/workspace/{}/collab/{}",
self.base_url, &params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<EncodedCollab>::from_response(resp)
.await?
.into_data()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn batch_get_collab(
&self,

View File

@ -1,36 +1,20 @@
mod http;
pub use http::*;
macro_rules! if_native {
($($item:item)*) => {$(
#[cfg(not(target_arch = "wasm32"))]
$item
)*}
}
macro_rules! if_wasm {
($($item:item)*) => {$(
#[cfg(target_arch = "wasm32")]
$item
)*}
}
#[cfg(feature = "collab-sync")]
pub mod collab_sync;
pub mod notify;
if_native! {
mod native;
#[allow(unused_imports)]
pub use native::*;
}
#[cfg(not(target_arch = "wasm32"))]
mod native;
#[cfg(not(target_arch = "wasm32"))]
pub use native::*;
if_wasm! {
mod wasm;
#[allow(unused_imports)]
pub use wasm::*;
}
#[cfg(target_arch = "wasm32")]
mod wasm;
#[cfg(target_arch = "wasm32")]
pub use wasm::*;
pub mod ws;

View File

@ -3,10 +3,11 @@ use crate::ws::{WSClientHttpSender, WSError};
use crate::{spawn_blocking_brotli_compress, Client};
use crate::{RefreshTokenAction, RefreshTokenRetryCondition};
use anyhow::anyhow;
use app_error::AppError;
use app_error::{AppError, ErrorCode};
use async_trait::async_trait;
use collab_rt_entity::EncodedCollab;
use collab_rt_entity::HttpRealtimeMessage;
use database_entity::dto::CollabParams;
use database_entity::dto::{CollabParams, QueryCollabParams};
use futures_util::stream;
use prost::Message;
use reqwest::{Body, Method};
@ -19,6 +20,49 @@ use tokio_retry::RetryIf;
use tracing::{event, instrument};
impl Client {
#[instrument(level = "debug", skip_all)]
pub async fn get_collab(
&self,
params: QueryCollabParams,
) -> Result<EncodedCollab, AppResponseError> {
let url = format!(
"{}/api/workspace/{}/collab/{}",
self.base_url, &params.workspace_id, &params.object_id
);
let mut retries = 3; // Maximum number of retries
let retry_delay = Duration::from_secs(2);
while retries > 0 {
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&params)
.send()
.await?;
log_request_id(&resp);
let response = AppResponse::<EncodedCollab>::from_response(resp).await?;
// Retry if the record is not found
if response.code == ErrorCode::RecordNotFound {
retries -= 1;
if retries > 0 {
tokio::time::sleep(retry_delay).await;
continue;
} else {
return response.into_data();
}
} else {
return response.into_data();
}
}
// this part is unreachable by logic
Err(AppResponseError::new(
ErrorCode::Unhandled,
"Exhausted retries to fetch collaboration data.",
))
}
#[instrument(level = "debug", skip_all, err)]
pub async fn post_realtime_msg(
&self,

View File

@ -1,11 +1,14 @@
use crate::http::log_request_id;
use crate::ws::{WSClientHttpSender, WSError};
use crate::Client;
use app_error::gotrue::GoTrueError;
use app_error::ErrorCode;
use async_trait::async_trait;
use database_entity::dto::CollabParams;
use collab_rt_entity::EncodedCollab;
use database_entity::dto::{CollabParams, QueryCollabParams};
use gotrue::grant::{Grant, RefreshTokenGrant};
use shared_entity::response::AppResponseError;
use reqwest::Method;
use shared_entity::response::{AppResponse, AppResponseError};
use std::future::Future;
use std::sync::atomic::Ordering;
use tracing::instrument;
@ -23,6 +26,26 @@ impl Client {
))
}
#[instrument(level = "debug", skip_all)]
pub async fn get_collab(
&self,
params: QueryCollabParams,
) -> Result<EncodedCollab, AppResponseError> {
let url = format!(
"{}/api/workspace/{}/collab/{}",
self.base_url, &params.workspace_id, &params.object_id
);
let resp = self
.http_client_with_auth(Method::GET, &url)
.await?
.json(&params)
.send()
.await?;
log_request_id(&resp);
let resp = AppResponse::<EncodedCollab>::from_response(resp).await?;
resp.into_data()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn refresh_token(&self) -> Result<(), AppResponseError> {
let (tx, rx) = tokio::sync::oneshot::channel();

View File

@ -77,7 +77,7 @@ where
(Method::POST, AFRole::Owner),
(Method::DELETE, AFRole::Owner),
(Method::PUT, AFRole::Owner),
(Method::GET, AFRole::Owner),
(Method::GET, AFRole::Member),
]
.into(),
),