chore: try to fix ws token error (#530)

* chore: try to fix ws token error

* chore: fix compile

* chore: bump version number

* chore: update

* chore: update

* ci: fix test
This commit is contained in:
Nathan.fooo 2024-05-07 16:45:12 +08:00 committed by GitHub
parent 5dbb9d9c86
commit ef8e6f360f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 117 additions and 122 deletions

View File

@ -22,7 +22,7 @@ concurrency:
env:
LOCALHOST_URL: http://localhost
LOCALHOST_WS: ws://localhost/ws
LOCALHOST_WS: ws://localhost/ws/v1
APPFLOWY_REDIS_URI: redis://redis:6379
LOCALHOST_GOTRUE: http://localhost/gotrue
DATABASE_URL: postgres://postgres:password@localhost:5432/postgres

2
Cargo.lock generated
View File

@ -1458,7 +1458,7 @@ checksum = "702fc72eb24e5a1e48ce58027a675bc24edd52096d5397d4aea7c6dd9eca0bd1"
[[package]]
name = "client-api"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"again",
"anyhow",

View File

@ -18,7 +18,7 @@ fi
# Make sure to update the test client configuration in libs/client-api-test-util/src/client.rs
# export LOCALHOST_URL="http://localhost"
# export LOCALHOST_WS_URL="ws://localhost/ws"
# export LOCALHOST_WS_URL="ws://localhost/ws/v1"
# export LOCALHOST_GOTRUE_URL="http://localhost:gotrue"
docker compose down

View File

@ -10,7 +10,7 @@ lazy_static! {
pub static ref LOCALHOST_URL: Cow<'static, str> =
get_env_var("LOCALHOST_URL", "http://localhost:8000");
pub static ref LOCALHOST_WS: Cow<'static, str> =
get_env_var("LOCALHOST_WS", "ws://localhost:8000/ws");
get_env_var("LOCALHOST_WS", "ws://localhost:8000/ws/v1");
pub static ref LOCALHOST_GOTRUE: Cow<'static, str> =
get_env_var("LOCALHOST_GOTRUE", "http://localhost:9999");
}
@ -19,7 +19,7 @@ lazy_static! {
#[cfg(target_arch = "wasm32")]
lazy_static! {
pub static ref LOCALHOST_URL: Cow<'static, str> = Cow::Owned("http://localhost".to_string());
pub static ref LOCALHOST_WS: Cow<'static, str> = Cow::Owned("ws://localhost/ws".to_string());
pub static ref LOCALHOST_WS: Cow<'static, str> = Cow::Owned("ws://localhost/ws/v1".to_string());
pub static ref LOCALHOST_GOTRUE: Cow<'static, str> =
Cow::Owned("http://localhost/gotrue".to_string());
}

View File

@ -81,13 +81,10 @@ impl TestClient {
retry_connect_per_pings: 5,
},
api_client.clone(),
api_client.clone(),
);
let connect_info = api_client.ws_connect_info().await.unwrap();
if start_ws_conn {
ws_client
.connect(&api_client.ws_url(), connect_info)
.await
.unwrap();
ws_client.connect().await.unwrap();
}
Self {
user: registered_user,
@ -707,14 +704,7 @@ impl TestClient {
}
pub async fn reconnect(&self) {
self
.ws_client
.connect(
&self.api_client.ws_url(),
self.api_client.ws_connect_info().await.unwrap(),
)
.await
.unwrap();
self.ws_client.connect().await.unwrap();
}
pub async fn get_edit_collab_json(&self, object_id: &str) -> Value {

View File

@ -1,6 +1,6 @@
[package]
name = "client-api"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -104,7 +104,7 @@ pub struct Client {
pub(crate) cloud_client: reqwest::Client,
pub(crate) gotrue_client: gotrue::api::Client,
pub base_url: String,
ws_addr: String,
pub ws_addr: String,
pub device_id: String,
pub client_version: Version,
pub(crate) token: Arc<RwLock<ClientToken>>,
@ -1101,15 +1101,15 @@ impl Client {
.into_data()
}
#[instrument(level = "info", skip_all)]
pub fn ws_url(&self) -> String {
format!("{}/v1", self.ws_addr)
}
pub async fn ws_connect_info(&self) -> Result<ConnectInfo, AppResponseError> {
self
.refresh_if_expired(chrono::Local::now().timestamp())
.await?;
pub async fn ws_connect_info(&self, auto_refresh: bool) -> Result<ConnectInfo, AppResponseError> {
if auto_refresh {
self
.refresh_if_expired(
chrono::Local::now().timestamp(),
"get websocket connect info",
)
.await?;
}
Ok(ConnectInfo {
access_token: self.access_token()?,
@ -1283,13 +1283,13 @@ impl Client {
}
// Refresh token if given timestamp is close to the token expiration time
pub async fn refresh_if_expired(&self, ts: i64) -> Result<(), AppResponseError> {
pub async fn refresh_if_expired(&self, ts: i64, reason: &str) -> Result<(), AppResponseError> {
let expires_at = self.token_expires_at()?;
if ts + 30 > expires_at {
info!("token is about to expire, refreshing token");
// Add 30 seconds buffer
self.refresh_token().await?;
self.refresh_token(reason).await?;
}
Ok(())
}
@ -1324,7 +1324,9 @@ impl Client {
url: &str,
) -> Result<RequestBuilder, AppResponseError> {
let ts_now = chrono::Local::now().timestamp();
self.refresh_if_expired(ts_now).await?;
self
.refresh_if_expired(ts_now, "make http client request")
.await?;
let access_token = self.access_token()?;
trace!("start request: {}, method: {}", url, method);

View File

@ -1,6 +1,6 @@
use crate::http::log_request_id;
use crate::native::GetCollabAction;
use crate::ws::{WSClientHttpSender, WSError};
use crate::ws::{ConnectInfo, WSClientConnectURLProvider, WSClientHttpSender, WSError};
use crate::{spawn_blocking_brotli_compress, Client};
use crate::{RefreshTokenAction, RefreshTokenRetryCondition};
use anyhow::anyhow;
@ -120,12 +120,14 @@ impl Client {
/// using the stored refresh token. If successful, it updates the stored access token with the new one
/// received from the server.
#[instrument(level = "debug", skip_all, err)]
pub async fn refresh_token(&self) -> Result<(), AppResponseError> {
pub async fn refresh_token(&self, reason: &str) -> Result<(), AppResponseError> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.refresh_ret_txs.write().push(tx);
if !self.is_refreshing_token.load(Ordering::SeqCst) {
self.is_refreshing_token.store(true, Ordering::SeqCst);
info!("refresh token reason:{}", reason);
let result = self.inner_refresh_token().await;
let txs = std::mem::take(&mut *self.refresh_ret_txs.write());
for tx in txs {
@ -178,6 +180,21 @@ impl WSClientHttpSender for Client {
}
}
#[async_trait]
impl WSClientConnectURLProvider for Client {
fn connect_ws_url(&self) -> String {
self.ws_addr.clone()
}
async fn connect_info(&self) -> Result<ConnectInfo, WSError> {
let conn_info = self
.ws_connect_info(true)
.await
.map_err(|err| WSError::Http(err.to_string()))?;
Ok(conn_info)
}
}
// TODO(nathan): spawn for wasm
pub fn af_spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
where

View File

@ -1,7 +1,7 @@
use crate::http::log_request_id;
use crate::notify::ClientToken;
use crate::ws::{
ConnectInfo, ConnectState, ConnectStateNotify, CurrentConnInfo, StateNotify, WSError,
ConnectState, ConnectStateNotify, StateNotify, WSClientConnectURLProvider, WSError,
};
use crate::Client;
use app_error::gotrue::GoTrueError;
@ -19,7 +19,7 @@ use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, Condition, RetryIf};
use tracing::{debug, info};
use tracing::{debug, info, trace};
pub(crate) struct RefreshTokenAction {
token: Arc<RwLock<ClientToken>>,
@ -74,45 +74,40 @@ impl Condition<GoTrueError> for RefreshTokenRetryCondition {
}
pub async fn retry_connect(
url: String,
info: ConnectInfo,
connect_provider: Arc<dyn WSClientConnectURLProvider>,
state_notify: Weak<StateNotify>,
current_addr: Weak<CurrentConnInfo>,
) -> Result<WebSocketStream, WSError> {
let stream = RetryIf::spawn(
FixedInterval::new(Duration::from_secs(10)),
ConnectAction::new(url, info.clone()),
RetryCondition {
connect_info: info,
current_connect_info: current_addr,
state_notify,
},
FixedInterval::new(Duration::from_secs(15)),
ConnectAction::new(connect_provider),
RetryCondition { state_notify },
)
.await?;
Ok(stream)
}
struct ConnectAction {
url: String,
connect_info: ConnectInfo,
connect_provider: Arc<dyn WSClientConnectURLProvider>,
}
impl ConnectAction {
fn new(url: String, connect_info: ConnectInfo) -> Self {
Self { url, connect_info }
fn new(connect_provider: Arc<dyn WSClientConnectURLProvider>) -> Self {
Self { connect_provider }
}
}
impl Action for ConnectAction {
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
type Item = WebSocketStream;
type Error = WSError;
fn run(&mut self) -> Self::Future {
let url = self.url.clone();
let headers: HeaderMap = self.connect_info.clone().into();
let connect_provider = self.connect_provider.clone();
Box::pin(async move {
info!("🔵websocket start connecting");
let url = connect_provider.connect_ws_url();
let headers: HeaderMap = connect_provider.connect_info().await?.into();
trace!("websocket url:{}, headers: {:?}", url, headers);
match connect_async(&url, headers).await {
Ok(stream) => {
info!("🟢websocket connect success");
@ -125,8 +120,6 @@ impl Action for ConnectAction {
}
struct RetryCondition {
connect_info: ConnectInfo,
current_connect_info: Weak<parking_lot::Mutex<Option<ConnectInfo>>>,
state_notify: Weak<parking_lot::Mutex<ConnectStateNotify>>,
}
impl Condition<WSError> for RetryCondition {
@ -136,24 +129,10 @@ impl Condition<WSError> for RetryCondition {
if let Some(state_notify) = self.state_notify.upgrade() {
state_notify.lock().set_state(ConnectState::Unauthorized);
}
return false;
}
let should_retry = self
.current_connect_info
.upgrade()
.map(|addr| match addr.try_lock() {
None => false,
Some(addr) => match &*addr {
None => false,
Some(addr) => addr == &self.connect_info,
},
})
.unwrap_or(false);
debug!("WSClient should_retry: {}", should_retry);
should_retry
true
}
}

View File

@ -1,12 +1,9 @@
use crate::http::log_request_id;
use crate::ws::{WSClientHttpSender, WSError};
use crate::ws::{ConnectInfo, WSClientConnectURLProvider, WSClientHttpSender, WSError};
use crate::Client;
use crate::RefreshTokenRetryCondition;
use again::RetryPolicy;
use app_error::gotrue::GoTrueError;
use app_error::{AppError, ErrorCode};
use app_error::ErrorCode;
use async_trait::async_trait;
use collab_entity::EncodedCollab;
use database_entity::dto::{CollabParams, QueryCollabParams};
use gotrue::grant::{Grant, RefreshTokenGrant};
use reqwest::Method;
@ -14,8 +11,7 @@ use shared_entity::dto::workspace_dto::{CollabResponse, CollabTypeParam};
use shared_entity::response::{AppResponse, AppResponseError};
use std::future::Future;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tracing::{event, instrument};
use tracing::{info, instrument};
impl Client {
pub async fn create_collab_list(
@ -52,12 +48,14 @@ impl Client {
}
#[instrument(level = "debug", skip_all, err)]
pub async fn refresh_token(&self) -> Result<(), AppResponseError> {
pub async fn refresh_token(&self, reason: &str) -> Result<(), AppResponseError> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.refresh_ret_txs.write().push(tx);
if !self.is_refreshing_token.load(Ordering::SeqCst) {
self.is_refreshing_token.store(true, Ordering::SeqCst);
info!("refresh token reason:{}", reason);
let txs = std::mem::take(&mut *self.refresh_ret_txs.write());
let result = self.inner_refresh_token().await;
for tx in txs {
@ -145,3 +143,14 @@ impl WSClientHttpSender for Client {
Err(WSError::Internal(anyhow::Error::msg("not supported")))
}
}
#[async_trait]
impl WSClientConnectURLProvider for Client {
fn connect_ws_url(&self) -> String {
self.ws_addr.clone()
}
async fn connect_info(&self) -> Result<ConnectInfo, WSError> {
Err(WSError::Internal(anyhow::Error::msg("not supported")))
}
}

View File

@ -1,9 +1,9 @@
use crate::ws::{ConnectInfo, CurrentConnInfo, StateNotify, WSError};
use crate::ws::{StateNotify, WSClientConnectURLProvider, WSError};
use again::Condition;
use app_error::gotrue::GoTrueError;
use client_websocket::{connect_async, WebSocketStream};
use reqwest::header::HeaderMap;
use std::sync::Weak;
use std::sync::{Arc, Weak};
pub(crate) struct RefreshTokenRetryCondition;
@ -13,12 +13,12 @@ impl Condition<GoTrueError> for RefreshTokenRetryCondition {
}
}
pub async fn retry_connect(
url: String,
info: ConnectInfo,
connect_provider: Arc<dyn WSClientConnectURLProvider>,
_state_notify: Weak<StateNotify>,
_current_addr: Weak<CurrentConnInfo>,
) -> Result<WebSocketStream, WSError> {
let headers: HeaderMap = info.into();
let url = connect_provider.connect_ws_url();
let connect_info = connect_provider.connect_info().await?;
let headers: HeaderMap = connect_info.into();
let stream = connect_async(url, headers).await?;
Ok(stream)
}

View File

@ -48,19 +48,24 @@ pub trait WSClientHttpSender: Send + Sync {
async fn send_ws_msg(&self, device_id: &str, message: Message) -> Result<(), WSError>;
}
#[async_trait::async_trait]
pub trait WSClientConnectURLProvider: Send + Sync {
fn connect_ws_url(&self) -> String;
async fn connect_info(&self) -> Result<ConnectInfo, WSError>;
}
type WeakChannel = Weak<WebSocketChannel<ServerCollabMessage>>;
type ChannelByObjectId = HashMap<String, Vec<WeakChannel>>;
pub type WSConnectStateReceiver = Receiver<ConnectState>;
pub(crate) type StateNotify = parking_lot::Mutex<ConnectStateNotify>;
pub(crate) type CurrentConnInfo = parking_lot::Mutex<Option<ConnectInfo>>;
/// The maximum size allowed for a WebSocket message is 65,536 bytes. If the message exceeds
/// 50960 bytes (to avoid occupying the entire space), it should be sent over HTTP instead.
const MAXIMUM_MESSAGE_SIZE: usize = 40960;
const MAXIMUM_BATCH_MESSAGE_SIZE: usize = 20480;
pub struct WSClient {
current_conn_info: Arc<CurrentConnInfo>,
config: WSClientConfig,
state_notify: Arc<StateNotify>,
/// Sender used to send messages to the websocket.
@ -75,11 +80,13 @@ pub struct WSClient {
#[cfg(debug_assertions)]
skip_realtime_message: Arc<std::sync::atomic::AtomicBool>,
connect_provider: Arc<dyn WSClientConnectURLProvider>,
}
impl WSClient {
pub fn new<H>(config: WSClientConfig, http_sender: H) -> Self
pub fn new<H, C>(config: WSClientConfig, http_sender: H, connect_provider: C) -> Self
where
H: WSClientHttpSender + 'static,
C: WSClientConnectURLProvider + 'static,
{
let (ws_msg_sender, _) = channel(config.buffer_capacity);
let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new()));
@ -88,9 +95,9 @@ impl WSClient {
let http_sender = Arc::new(http_sender);
let (user_channel, _) = channel(1);
let (rt_msg_sender, _) = channel(config.buffer_capacity);
let connect_provider = Arc::new(connect_provider);
let aggregate_queue = Arc::new(AggregateMessageQueue::new(MAXIMUM_BATCH_MESSAGE_SIZE));
WSClient {
current_conn_info: Arc::new(parking_lot::Mutex::new(None)),
config,
state_notify,
ws_msg_sender,
@ -104,11 +111,14 @@ impl WSClient {
#[cfg(debug_assertions)]
skip_realtime_message: Default::default(),
connect_provider,
}
}
pub async fn connect(&self, url: &str, connect_info: ConnectInfo) -> Result<(), WSError> {
pub async fn connect(&self) -> Result<(), WSError> {
let connect_info = self.connect_provider.connect_info().await?;
let device_id = connect_info.device_id.clone();
if self.get_state().is_connecting() {
info!("websocket is connecting, skip connect request");
return Ok(());
@ -119,15 +129,11 @@ impl WSClient {
self.set_state(ConnectState::Connecting).await;
let (stop_ws_msg_loop_tx, stop_ws_msg_loop_rx) = oneshot::channel();
*self.stop_ws_msg_loop_tx.lock().await = Some(stop_ws_msg_loop_tx);
*self.current_conn_info.lock() = Some(connect_info.clone());
// 2. start connecting
trace!("start connecting to {}, {}", url, connect_info);
let conn_result = retry_connect(
url.to_string(),
connect_info,
self.connect_provider.clone(),
Arc::downgrade(&self.state_notify),
Arc::downgrade(&self.current_conn_info),
)
.await;
@ -377,7 +383,6 @@ impl WSClient {
reason: Cow::from("client disconnect"),
})));
*self.current_conn_info.lock() = None;
self.set_state(ConnectState::Lost).await;
}

View File

@ -6,7 +6,7 @@ use client_api::ws::{ConnectState, WSClient, WSClientConfig};
#[tokio::test]
async fn realtime_connect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut state = ws_client.subscribe_connect_state();
let device_id = "fake_device_id";
loop {
@ -25,7 +25,7 @@ async fn realtime_connect_test() {
#[tokio::test]
async fn realtime_disconnect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let device_id = "fake_device_id";
ws_client
.connect(c.ws_url(device_id).await.unwrap(), device_id)

View File

@ -5,7 +5,7 @@ use wasm_bindgen_test::wasm_bindgen_test;
#[wasm_bindgen_test]
async fn realtime_connect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut state = ws_client.subscribe_connect_state();
let device_id = "fake_device_id";
loop {

View File

@ -5,12 +5,11 @@ use wasm_bindgen_test::wasm_bindgen_test;
#[wasm_bindgen_test]
async fn wasm_websocket_connect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut state = ws_client.subscribe_connect_state();
let connect_info = c.ws_connect_info().await.unwrap();
wasm_bindgen_futures::spawn_local(async move {
ws_client.connect(&c.ws_url(), connect_info).await.unwrap();
ws_client.connect().await.unwrap();
});
// wait for the connect state to be connected

View File

@ -2,8 +2,8 @@ extern crate wasm_bindgen_test;
use wasm_bindgen_test::wasm_bindgen_test_configure;
wasm_bindgen_test_configure!(run_in_browser);
#[cfg(target_arch = "wasm32")]
mod conn_test;
// #[cfg(target_arch = "wasm32")]
// mod conn_test;
// #[cfg(target_arch = "wasm32")]
// mod user_test;

View File

@ -601,10 +601,10 @@ async fn collab_flush_test() {
}
#[tokio::test]
async fn simulate_50_offline_user_connect_and_then_sync_document_test() {
async fn simulate_10_offline_user_connect_and_then_sync_document_test() {
let text = generate_random_string(1024 * 1024 * 3);
let mut tasks = Vec::new();
for i in 0..50 {
for i in 0..10 {
let cloned_text = text.clone();
let task = tokio::spawn(async move {
let mut new_user = TestClient::new_user_without_ws_conn().await;
@ -615,7 +615,7 @@ async fn simulate_50_offline_user_connect_and_then_sync_document_test() {
let workspace_id = new_user.workspace_id().await;
let doc_state = make_big_collab_doc_state(&object_id, "text", cloned_text);
new_user
.open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Document, doc_state)
.open_collab_with_doc_state(&workspace_id, &object_id, CollabType::Unknown, doc_state)
.await;
(new_user, object_id)
});

View File

@ -8,7 +8,7 @@ async fn refresh_success() {
let (c, _user) = generate_unique_registered_user_client().await;
let old_token = c.access_token().unwrap();
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
c.refresh_token().await.unwrap();
c.refresh_token("").await.unwrap();
let new_token = c.access_token().unwrap();
assert_ne!(old_token, new_token);
}
@ -23,7 +23,7 @@ async fn concurrent_refresh() {
for _ in 0..20 {
let cloned_client = c.clone();
let handle = tokio::spawn(async move {
cloned_client.refresh_token().await.unwrap();
cloned_client.refresh_token("").await.unwrap();
Ok::<(), AppError>(())
});
join_handles.push(handle);

View File

@ -160,11 +160,9 @@ async fn user_empty_metadata_override() {
#[tokio::test]
async fn user_change_notify_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut user_change_recv = ws_client.subscribe_user_changed();
let connect_info = c.ws_connect_info().await.unwrap();
ws_client.connect(&c.ws_url(), connect_info).await.unwrap();
ws_client.connect().await.unwrap();
// After update user, the user_change_recv should receive a user change message via the websocket
let fut = Box::pin(async move {

View File

@ -7,10 +7,9 @@ use client_api_test_util::generate_unique_registered_user_client;
#[tokio::test]
async fn realtime_connect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut state = ws_client.subscribe_connect_state();
let connect_info = c.ws_connect_info().await.unwrap();
tokio::spawn(async move { ws_client.connect(&c.ws_url(), connect_info).await });
tokio::spawn(async move { ws_client.connect().await });
let connect_future = async {
loop {
match state.recv().await {
@ -40,11 +39,9 @@ async fn realtime_connect_after_token_exp_test() {
.unwrap()
.as_secs() as i64;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
let mut state = ws_client.subscribe_connect_state();
let connect_info = c.ws_connect_info().await.unwrap();
tokio::spawn(async move { ws_client.connect(&c.ws_url(), connect_info).await });
tokio::spawn(async move { ws_client.connect().await });
let connect_future = async {
loop {
match state.recv().await {
@ -67,9 +64,8 @@ async fn realtime_connect_after_token_exp_test() {
#[tokio::test]
async fn realtime_disconnect_test() {
let (c, _user) = generate_unique_registered_user_client().await;
let ws_client = WSClient::new(WSClientConfig::default(), c.clone());
let connect_info = c.ws_connect_info().await.unwrap();
ws_client.connect(&c.ws_url(), connect_info).await.unwrap();
let ws_client = WSClient::new(WSClientConfig::default(), c.clone(), c.clone());
ws_client.connect().await.unwrap();
let mut state = ws_client.subscribe_connect_state();
loop {