diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index ce47c40b..9af257a4 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -46,7 +46,10 @@ use url::Url; use gotrue_entity::dto::SignUpResponse::{Authenticated, NotAuthenticated}; use gotrue_entity::dto::{GotrueTokenResponse, UpdateGotrueUserParams, User}; -pub const CLIENT_API_VERSION: &str = "0.0.3"; +/// The API version of the client. +/// 0.0.4 +/// fix refresh token issue +pub const CLIENT_API_VERSION: &str = "0.0.4"; pub const X_COMPRESSION_TYPE: &str = "X-Compression-Type"; pub const X_COMPRESSION_BUFFER_SIZE: &str = "X-Compression-Buffer-Size"; pub const X_COMPRESSION_TYPE_BROTLI: &str = "brotli"; @@ -112,7 +115,6 @@ pub struct Client { pub(crate) config: ClientConfiguration, } -pub(crate) type RefreshTokenRet = tokio::sync::oneshot::Receiver>; pub(crate) type RefreshTokenSender = tokio::sync::oneshot::Sender>; /// Hardcoded schema in the frontend application. Do not change this value. diff --git a/libs/client-api/src/native/http_native.rs b/libs/client-api/src/native/http_native.rs index a979a70a..d1377db1 100644 --- a/libs/client-api/src/native/http_native.rs +++ b/libs/client-api/src/native/http_native.rs @@ -2,6 +2,7 @@ use crate::http::log_request_id; use crate::ws::{WSClientHttpSender, WSError}; use crate::{spawn_blocking_brotli_compress, Client}; use crate::{RefreshTokenAction, RefreshTokenRetryCondition}; +use anyhow::anyhow; use app_error::AppError; use async_trait::async_trait; use database_entity::dto::CollabParams; @@ -105,20 +106,26 @@ impl Client { /// using the stored refresh token. If successful, it updates the stored access token with the new one /// received from the server. #[instrument(level = "debug", skip_all, err)] - pub async fn refresh_token(&self) -> Result { + pub async fn refresh_token(&self) -> Result<(), AppResponseError> { let (tx, rx) = tokio::sync::oneshot::channel(); self.refresh_ret_txs.write().push(tx); if !self.is_refreshing_token.load(Ordering::SeqCst) { self.is_refreshing_token.store(true, Ordering::SeqCst); - let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); let result = self.inner_refresh_token().await; + let txs = std::mem::take(&mut *self.refresh_ret_txs.write()); for tx in txs { let _ = tx.send(result.clone()); } self.is_refreshing_token.store(false, Ordering::SeqCst); } - Ok(rx) + + // Wait for the result of the refresh token request. + match tokio::time::timeout(Duration::from_secs(60), rx).await { + Ok(Ok(result)) => result, + Ok(Err(err)) => Err(AppError::Internal(anyhow!("refresh token error: {}", err)).into()), + Err(_) => Err(AppError::RequestTimeout("refresh token timeout".to_string()).into()), + } } async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { diff --git a/libs/client-api/src/wasm/http_wasm.rs b/libs/client-api/src/wasm/http_wasm.rs index 6c538234..ac69e0fd 100644 --- a/libs/client-api/src/wasm/http_wasm.rs +++ b/libs/client-api/src/wasm/http_wasm.rs @@ -1,4 +1,3 @@ -use crate::http::RefreshTokenRet; use crate::ws::{WSClientHttpSender, WSError}; use crate::Client; use app_error::gotrue::GoTrueError; @@ -25,7 +24,7 @@ impl Client { } #[instrument(level = "debug", skip_all, err)] - pub async fn refresh_token(&self) -> Result { + pub async fn refresh_token(&self) -> Result<(), AppResponseError> { let (tx, rx) = tokio::sync::oneshot::channel(); self.refresh_ret_txs.write().push(tx); @@ -38,7 +37,10 @@ impl Client { } self.is_refreshing_token.store(false, Ordering::SeqCst); } - Ok(rx) + + rx.await + .map_err(|err| AppResponseError::new(ErrorCode::Internal, err.to_string()))??; + Ok(()) } async fn inner_refresh_token(&self) -> Result<(), AppResponseError> { diff --git a/tests/user/refresh.rs b/tests/user/refresh.rs index a17853cc..81229fa9 100644 --- a/tests/user/refresh.rs +++ b/tests/user/refresh.rs @@ -1,7 +1,7 @@ use app_error::AppError; use client_api_test_util::generate_unique_registered_user_client; use futures::future::join_all; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; #[tokio::test] async fn refresh_success() { @@ -20,7 +20,7 @@ async fn concurrent_refresh() { tokio::time::sleep(std::time::Duration::from_secs(2)).await; let mut join_handles = vec![]; - for _ in 0..100 { + for _ in 0..20 { let cloned_client = c.clone(); let handle = tokio::spawn(async move { cloned_client.refresh_token().await.unwrap(); @@ -29,12 +29,11 @@ async fn concurrent_refresh() { join_handles.push(handle); } let results = join_all(join_handles).await; - assert_eq!(results.len(), 100); + assert_eq!(results.len(), 20); for result in results { result.unwrap().unwrap(); } - tokio::time::sleep(Duration::from_secs(2)).await; let new_token = c.access_token().unwrap(); assert_ne!(old_token, new_token); }