diff --git a/Cargo.lock b/Cargo.lock index e4a14f53..c99c81e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1132,7 +1132,7 @@ dependencies = [ [[package]] name = "collab" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e08a66d3#e08a66d341889f9b3e0a79b47315a799d1b21ccc" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c23dff3#c23dff3b58fa635a7fb4b5e2dbc18ebd76cfbd53" dependencies = [ "anyhow", "async-trait", @@ -1151,7 +1151,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.1.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=e08a66d3#e08a66d341889f9b3e0a79b47315a799d1b21ccc" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c23dff3#c23dff3b58fa635a7fb4b5e2dbc18ebd76cfbd53" dependencies = [ "anyhow", "bytes", diff --git a/Cargo.toml b/Cargo.toml index d0c11b4c..bfbebe39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -132,8 +132,8 @@ lto = false opt-level = 3 [patch.crates-io] -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e08a66d3" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "e08a66d3" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c23dff3" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c23dff3" } # Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder #collab = { path = "libs/AppFlowy-Collab/collab" } diff --git a/libs/client-api/src/collab_sync/sink.rs b/libs/client-api/src/collab_sync/sink.rs index acdd4a1f..befe5b14 100644 --- a/libs/client-api/src/collab_sync/sink.rs +++ b/libs/client-api/src/collab_sync/sink.rs @@ -199,14 +199,17 @@ where // the fix interval. if let SinkStrategy::FixInterval(duration) = &self.config.strategy { let elapsed = self.instant.lock().await.elapsed(); - // trace!( - // "elapsed interval: {:?}, fix interval: {:?}", - // elapsed, - // duration - // ); + // If the elapsed time is less than the fixed interval or if the remaining time until the fixed + // interval is less than the send timeout, return. if elapsed < *duration { return Ok(()); } + + if elapsed < self.config.send_timeout { + tokio::time::sleep(Duration::from_secs(1)).await; + self.notify(); + return Ok(()); + } } // Reset the instant if the strategy is [SinkStrategy::FixInterval]. diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 1f683d02..939945bc 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -449,12 +449,20 @@ impl Client { .refresh_token .as_str() .to_owned(); - let access_token_resp = self + match self .gotrue_client .token(&Grant::RefreshToken(RefreshTokenGrant { refresh_token })) - .await?; - self.token.write().set(access_token_resp); - Ok(()) + .await + { + Ok(access_token_resp) => { + self.token.write().set(access_token_resp); + Ok(()) + }, + Err(err) => { + self.token.write().unset(); + Err(AppError::from(err)) + }, + } } #[instrument(level = "debug", skip_all, err)] diff --git a/libs/client-api/src/notify.rs b/libs/client-api/src/notify.rs index 7f3cd3ce..0f2bc5b9 100644 --- a/libs/client-api/src/notify.rs +++ b/libs/client-api/src/notify.rs @@ -42,8 +42,14 @@ impl ClientToken { /// - `token`: The new `AccessTokenResponse` to be set. pub(crate) fn set(&mut self, new_token: AccessTokenResponse) { tracing::trace!("Set new access token: {:?}", new_token); + let is_new = match &self.token { + None => true, + Some(old_token) => old_token.access_token != new_token.access_token, + }; self.token = Some(new_token); - let _ = self.sender.send(TokenState::Refresh); + if is_new { + let _ = self.sender.send(TokenState::Refresh); + } } /// Unsets the current access token and notifies receivers of the invalidation. @@ -54,7 +60,6 @@ impl ClientToken { #[allow(dead_code)] pub(crate) fn unset(&mut self) { if self.token.is_some() { - tracing::trace!("Set new access token: {:?}", self.token); self.token = None; let _ = self.sender.send(TokenState::Invalid); } diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index 3ed2556e..38aba734 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -18,7 +18,7 @@ use tokio_retry::strategy::FixedInterval; use tokio_retry::{Condition, RetryIf}; use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use tokio_tungstenite::tungstenite::protocol::CloseFrame; -use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::tungstenite::{Error, Message}; use tokio_tungstenite::MaybeTlsStream; use tracing::{debug, error, info, trace, warn}; @@ -157,6 +157,23 @@ impl WSClient { }); let mut sink_rx = self.sender.subscribe(); + + let weak_state_notify = Arc::downgrade(&self.state_notify); + let handle_ws_error = move |error: &Error| { + error!("websocket error: {:?}", error); + match weak_state_notify.upgrade() { + None => { + error!("ws state_notify is dropped"); + }, + Some(state_notify) => match &error { + Error::ConnectionClosed | Error::AlreadyClosed => { + state_notify.lock().set_state(ConnectState::Disconnected); + }, + _ => {}, + }, + } + }; + tokio::spawn(async move { loop { tokio::select! { @@ -165,12 +182,9 @@ impl WSClient { break; }, Ok(msg) = sink_rx.recv() => { - match sink.send(msg).await { - Ok(_) => {}, - Err(e) => { - error!("Failed to send message via websocket: {:?}", e); - break; - }, + if let Err(err) = sink.send(msg).await { + handle_ws_error(&err); + break; } } } diff --git a/libs/client-api/src/ws/retry.rs b/libs/client-api/src/ws/retry.rs index 6ec5a434..dc5738cc 100644 --- a/libs/client-api/src/ws/retry.rs +++ b/libs/client-api/src/ws/retry.rs @@ -5,7 +5,7 @@ use crate::ws::WSError; use tokio::net::TcpStream; use tokio_retry::Action; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; -use tracing::debug; +use tracing::{debug, error}; pub(crate) struct ConnectAction { addr: String, @@ -27,10 +27,12 @@ impl Action for ConnectAction { Box::pin(async move { debug!("🔵Connecting to websocket: {}", cloned_addr); match connect_async(&cloned_addr).await { - Ok((stream, _response)) => Ok(stream), + Ok((stream, _response)) => { + debug!("connect success"); + Ok(stream) + }, Err(e) => { - // - tracing::error!("🔴connect error: {:?}", e.to_string()); + error!("connect failed: {:?}", e.to_string()); Err(e.into()) }, } diff --git a/libs/realtime/src/client.rs b/libs/realtime/src/client.rs index f1a3e3b3..81537f9b 100644 --- a/libs/realtime/src/client.rs +++ b/libs/realtime/src/client.rs @@ -12,6 +12,7 @@ use std::ops::Deref; use crate::collaborate::CollabServer; use crate::error::RealtimeError; +use actix_web_actors::ws::ProtocolError; use database::collab::CollabStorage; use realtime_entity::collab_msg::CollabMessage; use std::time::{Duration, Instant}; @@ -141,8 +142,10 @@ where fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Err(err) => { - error!("Websocket ProtocolError: {:?}", err); - ctx.stop(); + error!("Websocket stream error: {}", err); + if let ProtocolError::Overflow = err { + ctx.stop(); + } return; }, Ok(msg) => msg, @@ -162,7 +165,7 @@ where ctx.close(reason); ctx.stop(); }, - ws::Message::Continuation(_) => ctx.stop(), + ws::Message::Continuation(_) => {}, ws::Message::Nop => (), } } diff --git a/libs/realtime/src/collaborate/server.rs b/libs/realtime/src/collaborate/server.rs index 707bdd82..95f47794 100644 --- a/libs/realtime/src/collaborate/server.rs +++ b/libs/realtime/src/collaborate/server.rs @@ -173,9 +173,8 @@ async fn broadcast_message( { if let Some(client_stream) = client_streams.read().await.get(&client_msg.user) { trace!( - "[💭Server]: receives client message: [oid:{}|msg_id:{:?}]", - client_msg.content.object_id(), - client_msg.content.msg_id() + "[💭Server]: receives client message: {}", + client_msg.content, ); match client_stream .stream_tx diff --git a/src/api/ws.rs b/src/api/ws.rs index 8663c5a9..19bfb81f 100644 --- a/src/api/ws.rs +++ b/src/api/ws.rs @@ -18,6 +18,8 @@ pub fn ws_scope() -> Scope { web::scope("/ws").service(establish_ws_connection) } +const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB + #[get("/{token}/{device_id}")] pub async fn establish_ws_connection( request: HttpRequest, @@ -26,7 +28,7 @@ pub async fn establish_ws_connection( state: Data, server: Data>>>, ) -> Result { - tracing::info!("ws connect: {:?}", request); + tracing::info!("receive ws connect: {:?}", request); let (token, device_id) = path.into_inner(); let auth = authorization_from_token(token.as_str(), &state)?; let user_uuid = UserUuid::from_auth(auth)?; @@ -38,7 +40,10 @@ pub async fn establish_ws_connection( Duration::from_secs(state.config.websocket.client_timeout as u64), ); - match ws::start(client, &request, payload) { + match ws::WsResponseBuilder::new(client, &request, payload) + .frame_size(MAX_FRAME_SIZE) + .start() + { Ok(response) => Ok(response), Err(e) => { tracing::error!("🔴ws connection error: {:?}", e);