feat: update ws connect setting (#111)

* chore: update ws connect setting

* chore: remove token when token is invalid

* chore: close the ws connect when overflow happened

* fix: reset timing

* fix: reset timing

* chore: sleep if timeout not reach

* chore: sleep if timeout not reach
This commit is contained in:
Nathan.fooo 2023-10-12 16:51:44 +08:00 committed by GitHub
parent 119adf53e4
commit 3611c2076e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 73 additions and 34 deletions

4
Cargo.lock generated
View File

@ -1132,7 +1132,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e08a66d3#e08a66d341889f9b3e0a79b47315a799d1b21ccc"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c23dff3#c23dff3b58fa635a7fb4b5e2dbc18ebd76cfbd53"
dependencies = [
"anyhow",
"async-trait",
@ -1151,7 +1151,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e08a66d3#e08a66d341889f9b3e0a79b47315a799d1b21ccc"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c23dff3#c23dff3b58fa635a7fb4b5e2dbc18ebd76cfbd53"
dependencies = [
"anyhow",
"bytes",

View File

@ -132,8 +132,8 @@ lto = false
opt-level = 3
[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e08a66d3" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e08a66d3" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c23dff3" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c23dff3" }
# Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder
#collab = { path = "libs/AppFlowy-Collab/collab" }

View File

@ -199,14 +199,17 @@ where
// the fix interval.
if let SinkStrategy::FixInterval(duration) = &self.config.strategy {
let elapsed = self.instant.lock().await.elapsed();
// trace!(
// "elapsed interval: {:?}, fix interval: {:?}",
// elapsed,
// duration
// );
// If the elapsed time is less than the fixed interval or if the remaining time until the fixed
// interval is less than the send timeout, return.
if elapsed < *duration {
return Ok(());
}
if elapsed < self.config.send_timeout {
tokio::time::sleep(Duration::from_secs(1)).await;
self.notify();
return Ok(());
}
}
// Reset the instant if the strategy is [SinkStrategy::FixInterval].

View File

@ -449,12 +449,20 @@ impl Client {
.refresh_token
.as_str()
.to_owned();
let access_token_resp = self
match self
.gotrue_client
.token(&Grant::RefreshToken(RefreshTokenGrant { refresh_token }))
.await?;
self.token.write().set(access_token_resp);
Ok(())
.await
{
Ok(access_token_resp) => {
self.token.write().set(access_token_resp);
Ok(())
},
Err(err) => {
self.token.write().unset();
Err(AppError::from(err))
},
}
}
#[instrument(level = "debug", skip_all, err)]

View File

@ -42,8 +42,14 @@ impl ClientToken {
/// - `token`: The new `AccessTokenResponse` to be set.
pub(crate) fn set(&mut self, new_token: AccessTokenResponse) {
tracing::trace!("Set new access token: {:?}", new_token);
let is_new = match &self.token {
None => true,
Some(old_token) => old_token.access_token != new_token.access_token,
};
self.token = Some(new_token);
let _ = self.sender.send(TokenState::Refresh);
if is_new {
let _ = self.sender.send(TokenState::Refresh);
}
}
/// Unsets the current access token and notifies receivers of the invalidation.
@ -54,7 +60,6 @@ impl ClientToken {
#[allow(dead_code)]
pub(crate) fn unset(&mut self) {
if self.token.is_some() {
tracing::trace!("Set new access token: {:?}", self.token);
self.token = None;
let _ = self.sender.send(TokenState::Invalid);
}

View File

@ -18,7 +18,7 @@ use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Condition, RetryIf};
use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::{Error, Message};
use tokio_tungstenite::MaybeTlsStream;
use tracing::{debug, error, info, trace, warn};
@ -157,6 +157,23 @@ impl WSClient {
});
let mut sink_rx = self.sender.subscribe();
let weak_state_notify = Arc::downgrade(&self.state_notify);
let handle_ws_error = move |error: &Error| {
error!("websocket error: {:?}", error);
match weak_state_notify.upgrade() {
None => {
error!("ws state_notify is dropped");
},
Some(state_notify) => match &error {
Error::ConnectionClosed | Error::AlreadyClosed => {
state_notify.lock().set_state(ConnectState::Disconnected);
},
_ => {},
},
}
};
tokio::spawn(async move {
loop {
tokio::select! {
@ -165,12 +182,9 @@ impl WSClient {
break;
},
Ok(msg) = sink_rx.recv() => {
match sink.send(msg).await {
Ok(_) => {},
Err(e) => {
error!("Failed to send message via websocket: {:?}", e);
break;
},
if let Err(err) = sink.send(msg).await {
handle_ws_error(&err);
break;
}
}
}

View File

@ -5,7 +5,7 @@ use crate::ws::WSError;
use tokio::net::TcpStream;
use tokio_retry::Action;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tracing::debug;
use tracing::{debug, error};
pub(crate) struct ConnectAction {
addr: String,
@ -27,10 +27,12 @@ impl Action for ConnectAction {
Box::pin(async move {
debug!("🔵Connecting to websocket: {}", cloned_addr);
match connect_async(&cloned_addr).await {
Ok((stream, _response)) => Ok(stream),
Ok((stream, _response)) => {
debug!("connect success");
Ok(stream)
},
Err(e) => {
//
tracing::error!("🔴connect error: {:?}", e.to_string());
error!("connect failed: {:?}", e.to_string());
Err(e.into())
},
}

View File

@ -12,6 +12,7 @@ use std::ops::Deref;
use crate::collaborate::CollabServer;
use crate::error::RealtimeError;
use actix_web_actors::ws::ProtocolError;
use database::collab::CollabStorage;
use realtime_entity::collab_msg::CollabMessage;
use std::time::{Duration, Instant};
@ -141,8 +142,10 @@ where
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Err(err) => {
error!("Websocket ProtocolError: {:?}", err);
ctx.stop();
error!("Websocket stream error: {}", err);
if let ProtocolError::Overflow = err {
ctx.stop();
}
return;
},
Ok(msg) => msg,
@ -162,7 +165,7 @@ where
ctx.close(reason);
ctx.stop();
},
ws::Message::Continuation(_) => ctx.stop(),
ws::Message::Continuation(_) => {},
ws::Message::Nop => (),
}
}

View File

@ -173,9 +173,8 @@ async fn broadcast_message<U>(
{
if let Some(client_stream) = client_streams.read().await.get(&client_msg.user) {
trace!(
"[💭Server]: receives client message: [oid:{}|msg_id:{:?}]",
client_msg.content.object_id(),
client_msg.content.msg_id()
"[💭Server]: receives client message: {}",
client_msg.content,
);
match client_stream
.stream_tx

View File

@ -18,6 +18,8 @@ pub fn ws_scope() -> Scope {
web::scope("/ws").service(establish_ws_connection)
}
const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB
#[get("/{token}/{device_id}")]
pub async fn establish_ws_connection(
request: HttpRequest,
@ -26,7 +28,7 @@ pub async fn establish_ws_connection(
state: Data<AppState>,
server: Data<Addr<CollabServer<CollabStorageProxy, Arc<RealtimeUserImpl>>>>,
) -> Result<HttpResponse> {
tracing::info!("ws connect: {:?}", request);
tracing::info!("receive ws connect: {:?}", request);
let (token, device_id) = path.into_inner();
let auth = authorization_from_token(token.as_str(), &state)?;
let user_uuid = UserUuid::from_auth(auth)?;
@ -38,7 +40,10 @@ pub async fn establish_ws_connection(
Duration::from_secs(state.config.websocket.client_timeout as u64),
);
match ws::start(client, &request, payload) {
match ws::WsResponseBuilder::new(client, &request, payload)
.frame_size(MAX_FRAME_SIZE)
.start()
{
Ok(response) => Ok(response),
Err(e) => {
tracing::error!("🔴ws connection error: {:?}", e);