diff --git a/libs/client-api/src/collab_sync/collab_sink.rs b/libs/client-api/src/collab_sync/collab_sink.rs index 0dfde727..934a85b2 100644 --- a/libs/client-api/src/collab_sync/collab_sink.rs +++ b/libs/client-api/src/collab_sync/collab_sink.rs @@ -1,8 +1,9 @@ use crate::af_spawn; use crate::collab_sync::collab_stream::SeqNumCounter; -use crate::collab_sync::ping::PingSyncRunner; + use crate::collab_sync::sink_queue::{QueueItem, SinkQueue}; use crate::collab_sync::{SinkConfig, SyncError, SyncObject}; + use collab::core::origin::{CollabClient, CollabOrigin}; use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage}; use futures_util::SinkExt; @@ -11,6 +12,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use crate::collab_sync::ping::PingSyncRunner; use tokio::sync::{broadcast, watch, Mutex}; use tokio::time::{interval, sleep}; use tracing::{error, trace, warn}; @@ -95,14 +97,13 @@ where let last_sync = Arc::new(SyncTimestamp::new()); let mut interval = interval(SEND_INTERVAL); - let weak_notifier = Arc::downgrade(¬ifier); let weak_flying_messages = Arc::downgrade(&flying_messages); + let weak_notifier = Arc::downgrade(¬ifier); let origin = CollabOrigin::Client(CollabClient { uid, device_id: object.device_id.clone(), }); - PingSyncRunner::run( origin, object.object_id.clone(), diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index 42aa1d9e..e09ea218 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -332,7 +332,6 @@ impl SeqNumCounter { if ack_seq_num > broadcast_seq_num + 3 { self.store_broadcast_seq_num(ack_seq_num); - return Err(SyncError::MissUpdates(format!( "missing {} updates, start init sync", ack_seq_num - broadcast_seq_num, @@ -341,7 +340,6 @@ impl SeqNumCounter { if self.equal_counter.load(Ordering::SeqCst) >= 5 { self.equal_counter.store(0, Ordering::SeqCst); - return Err(SyncError::MissUpdates( "ping exceeds, start init sync".to_string(), )); diff --git a/libs/client-api/src/collab_sync/ping.rs b/libs/client-api/src/collab_sync/ping.rs index f6d34c36..8fa1c9fd 100644 --- a/libs/client-api/src/collab_sync/ping.rs +++ b/libs/client-api/src/collab_sync/ping.rs @@ -9,9 +9,11 @@ use tokio::sync::watch; use tokio::time::{sleep_until, Instant}; use tracing::warn; +#[allow(dead_code)] pub struct PingSyncRunner; impl PingSyncRunner { + #[allow(dead_code)] pub(crate) fn run( origin: CollabOrigin, object_id: String, @@ -21,7 +23,12 @@ impl PingSyncRunner { weak_notify: Weak>, sync_timestamp: Arc, ) { - let duration = Duration::from_secs(10); + let duration = if cfg!(feature = "test_fast_sync") { + Duration::from_secs(10) + } else { + Duration::from_secs(20) + }; + let mut next_tick = Instant::now() + duration; tokio::spawn(async move { loop { @@ -51,7 +58,7 @@ impl PingSyncRunner { if is_not_empty { #[cfg(feature = "sync_verbose_log")] tracing::trace!("{} slow down ping", object_id); - next_tick = Instant::now() + Duration::from_secs(20); + next_tick = Instant::now() + Duration::from_secs(30); } let msg_id = msg_id_counter.next(); diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 64ac6187..44f1e4c9 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -27,6 +27,7 @@ use parking_lot::RwLock; use reqwest::{header, StatusCode}; use collab_entity::CollabType; + use reqwest::header::HeaderValue; use reqwest::Method; use reqwest::RequestBuilder; diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index 1d6e6bff..f048900a 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -1,9 +1,10 @@ use crate::http::log_request_id; +use crate::native::GetCollabAction; use crate::ws::{WSClientHttpSender, WSError}; use crate::{spawn_blocking_brotli_compress, Client}; use crate::{RefreshTokenAction, RefreshTokenRetryCondition}; use anyhow::anyhow; -use app_error::{AppError, ErrorCode}; +use app_error::AppError; use async_trait::async_trait; use collab_rt_entity::EncodedCollab; use collab_rt_entity::HttpRealtimeMessage; @@ -15,8 +16,8 @@ use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio_retry::strategy::FixedInterval; -use tokio_retry::RetryIf; +use tokio_retry::strategy::{ExponentialBackoff, FixedInterval}; +use tokio_retry::{Retry, RetryIf}; use tracing::{event, instrument}; impl Client { @@ -25,42 +26,10 @@ impl Client { &self, params: QueryCollabParams, ) -> Result { - let url = format!( - "{}/api/workspace/{}/collab/{}", - self.base_url, ¶ms.workspace_id, ¶ms.object_id - ); - - let mut retries = 3; // Maximum number of retries - let retry_delay = Duration::from_secs(2); - while retries > 0 { - let resp = self - .http_client_with_auth(Method::GET, &url) - .await? - .json(¶ms) - .send() - .await?; - log_request_id(&resp); - let response = AppResponse::::from_response(resp).await?; - - // Retry if the record is not found - if response.code == ErrorCode::RecordNotFound { - retries -= 1; - if retries > 0 { - tokio::time::sleep(retry_delay).await; - continue; - } else { - return response.into_data(); - } - } else { - return response.into_data(); - } - } - - // this part is unreachable by logic - Err(AppResponseError::new( - ErrorCode::Unhandled, - "Exhausted retries to fetch collaboration data.", - )) + // 2 seconds, 4 seconds, 8 seconds + let retry_strategy = ExponentialBackoff::from_millis(2).factor(1000).take(3); + let action = GetCollabAction::new(self.clone(), params); + Retry::spawn(retry_strategy, action).await } #[instrument(level = "debug", skip_all, err)] diff --git a/libs/client-api/src/native/retry.rs b/libs/client-api/src/native/retry.rs index 0cea88b4..4693bae8 100644 --- a/libs/client-api/src/native/retry.rs +++ b/libs/client-api/src/native/retry.rs @@ -1,12 +1,18 @@ +use crate::http::log_request_id; use crate::notify::ClientToken; use crate::ws::{ ConnectInfo, ConnectState, ConnectStateNotify, CurrentConnInfo, StateNotify, WSError, }; +use crate::Client; use app_error::gotrue::GoTrueError; use client_websocket::{connect_async, WebSocketStream}; +use collab_rt_entity::EncodedCollab; +use database_entity::dto::QueryCollabParams; use gotrue::grant::{Grant, RefreshTokenGrant}; use parking_lot::RwLock; use reqwest::header::HeaderMap; +use reqwest::Method; +use shared_entity::response::{AppResponse, AppResponseError}; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Weak}; @@ -150,3 +156,41 @@ impl Condition for RetryCondition { should_retry } } + +pub(crate) struct GetCollabAction { + client: Client, + params: QueryCollabParams, +} + +impl GetCollabAction { + pub fn new(client: Client, params: QueryCollabParams) -> Self { + Self { client, params } + } +} + +impl Action for GetCollabAction { + type Future = Pin> + Send + Sync>>; + type Item = EncodedCollab; + type Error = AppResponseError; + + fn run(&mut self) -> Self::Future { + let client = self.client.clone(); + let params = self.params.clone(); + + Box::pin(async move { + let url = format!( + "{}/api/workspace/{}/collab/{}", + client.base_url, ¶ms.workspace_id, ¶ms.object_id + ); + let resp = client + .http_client_with_auth(Method::GET, &url) + .await? + .json(¶ms) + .send() + .await?; + log_request_id(&resp); + let resp = AppResponse::::from_response(resp).await?; + resp.into_data() + }) + } +} diff --git a/tests/collab/multi_devices_edit.rs b/tests/collab/multi_devices_edit.rs index 31877b84..ebe2c293 100644 --- a/tests/collab/multi_devices_edit.rs +++ b/tests/collab/multi_devices_edit.rs @@ -275,59 +275,3 @@ async fn edit_document_with_both_clients_offline_then_online_sync_test() { .await .unwrap(); } - -#[tokio::test] -async fn second_client_missing_broadcast_and_then_pull_missing_updates_test() { - let collab_type = CollabType::Empty; - let mut client_1 = TestClient::new_user().await; - let mut client_2 = TestClient::new_user().await; - - // Create a collaborative document with client_1 and invite client_2 to collaborate. - let workspace_id = client_1.workspace_id().await; - let object_id = client_1 - .create_and_edit_collab(&workspace_id, collab_type.clone()) - .await; - client_1 - .add_collab_member( - &workspace_id, - &object_id, - &client_2, - AFAccessLevel::ReadAndWrite, - ) - .await; - - // after client 2 finish init sync and then disable receive message - client_2 - .open_collab(&workspace_id, &object_id, collab_type.clone()) - .await; - client_2 - .wait_object_sync_complete(&object_id) - .await - .unwrap(); - client_2.ws_client.disable_receive_message(); - - // Client_1 makes the first edit by inserting "task 1". - client_1 - .collabs - .get_mut(&object_id) - .unwrap() - .collab - .lock() - .insert("content", "hello world"); - client_1 - .wait_object_sync_complete(&object_id) - .await - .unwrap(); - - // sleep two seconds to make sure missing the server broadcast message - sleep(Duration::from_secs(2)).await; - // after a period of time, client 2 should trigger init sync - client_2.ws_client.enable_receive_message(); - - let expected_json = json!({ - "content": "hello world" - }); - assert_client_collab_include_value(&mut client_2, &object_id, expected_json) - .await - .unwrap(); -}