diff --git a/Cargo.toml b/Cargo.toml index 4ffe0e46..0aaeaeb8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ client-api-test-util = { path = "libs/client-api-test-util", features = ["collab client-api = { path = "libs/client-api", features = ["collab-sync", "test_util", "sync_verbose_log", "test_fast_sync", "enable_brotli"] } opener = "0.6.1" image = "0.23.14" +collab-rt-entity.workspace = true [[bin]] name = "appflowy_cloud" diff --git a/libs/client-api-test-util/src/test_client.rs b/libs/client-api-test-util/src/test_client.rs index 8f0e8fb4..9577f43c 100644 --- a/libs/client-api-test-util/src/test_client.rs +++ b/libs/client-api-test-util/src/test_client.rs @@ -689,10 +689,8 @@ impl TestClient { } #[cfg(not(target_arch = "wasm32"))] - pub async fn post_realtime_message( - &self, - message: client_websocket::Message, - ) -> Result<(), AppResponseError> { + pub async fn post_realtime_binary(&self, message: Vec) -> Result<(), AppResponseError> { + let message = client_websocket::Message::binary(message); self .api_client .post_realtime_msg(&self.device_id, message) diff --git a/libs/database/src/collab/collab_db_ops.rs b/libs/database/src/collab/collab_db_ops.rs index b6ec343b..98095ad2 100644 --- a/libs/database/src/collab/collab_db_ops.rs +++ b/libs/database/src/collab/collab_db_ops.rs @@ -85,11 +85,12 @@ pub async fn insert_into_af_collab( uid, ) .execute(tx.deref_mut()) - .await - .context(format!( - "user:{} update af_collab:{} failed", - uid, params.object_id - ))?; + .await.map_err(|err| { + AppError::Internal(anyhow!( + "Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}", + workspace_id, uid, params.object_id, params.collab_type, err, + )) + })?; } else { return Err(AppError::Internal(anyhow!( "workspace_id is not match. expect workspace_id:{}, but receive:{}", @@ -148,11 +149,12 @@ pub async fn insert_into_af_collab( workspace_id, ) .execute(tx.deref_mut()) - .await - .context(format!( - "Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}", - workspace_id, uid, params.object_id, params.collab_type - ))?; + .await.map_err(|err| { + AppError::Internal(anyhow!( + "Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. payload len:{} error: {:?}", + workspace_id, uid, params.object_id, params.collab_type, params.encoded_collab_v1.len(), err, + )) + })?; }, } diff --git a/services/appflowy-collaborate/src/error.rs b/services/appflowy-collaborate/src/error.rs index 4cdc32f7..333fbd12 100644 --- a/services/appflowy-collaborate/src/error.rs +++ b/services/appflowy-collaborate/src/error.rs @@ -1,4 +1,5 @@ use collab::error::CollabError; +use std::fmt::Display; #[derive(Debug, thiserror::Error)] pub enum RealtimeError { @@ -41,6 +42,9 @@ pub enum RealtimeError { #[error("group is not exist: {0}")] GroupNotFound(String), + #[error("Create group failed:{0}")] + CreateGroupFailed(CreateGroupFailedReason), + #[error("Lack of required collab data: {0}")] NoRequiredCollabData(String), @@ -50,13 +54,33 @@ pub enum RealtimeError { #[error("Acquire lock timeout")] LockTimeout, - #[error("Collab workspace id not match: expect {expect}, actual {actual}")] - CollabWorkspaceIdNotMatch { expect: String, actual: String }, - #[error("Internal failure: {0}")] Internal(#[from] anyhow::Error), } +#[derive(Debug)] +pub enum CreateGroupFailedReason { + CollabWorkspaceIdNotMatch { expect: String, actual: String }, + CannotGetCollabData, +} + +impl Display for CreateGroupFailedReason { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CreateGroupFailedReason::CollabWorkspaceIdNotMatch { expect, actual } => { + write!( + f, + "Collab workspace id not match: expect {}, actual {}", + expect, actual + ) + }, + CreateGroupFailedReason::CannotGetCollabData => { + write!(f, "Cannot get collab data") + }, + } + } +} + impl RealtimeError { pub fn is_too_many_message(&self) -> bool { matches!(self, RealtimeError::TooManyMessage(_)) @@ -65,4 +89,7 @@ impl RealtimeError { pub fn is_lock_timeout(&self) -> bool { matches!(self, RealtimeError::LockTimeout) } + pub fn is_create_group_failed(&self) -> bool { + matches!(self, RealtimeError::CreateGroupFailed(_)) + } } diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 7b08d394..821aa85d 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -12,7 +12,7 @@ use crate::client::client_msg_router::ClientMessageRouter; use collab::entity::EncodedCollab; use futures_util::StreamExt; use std::sync::Arc; -use tracing::{error, instrument, trace, warn}; +use tracing::{instrument, trace, warn}; /// Using [GroupCommand] to interact with the group /// - HandleClientCollabMessage: Handle the client message @@ -22,6 +22,7 @@ pub enum GroupCommand { user: RealtimeUser, object_id: String, collab_messages: Vec, + ret: tokio::sync::oneshot::Sender>, }, EncodeCollab { object_id: String, @@ -68,12 +69,13 @@ where user, object_id, collab_messages, + ret, } => { - if let Err(err) = self + let result = self .handle_client_collab_message(&user, object_id, collab_messages) - .await - { - error!("handle client message error: {}", err); + .await; + if let Err(err) = ret.send(result) { + warn!("Send handle client collab message result fail: {:?}", err); } }, GroupCommand::EncodeCollab { object_id, ret } => { diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index cd85b67e..d90dcb54 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -1,7 +1,7 @@ use crate::group::group_init::CollabGroup; use crate::group::state::GroupManagementState; -use crate::error::RealtimeError; +use crate::error::{CreateGroupFailedReason, RealtimeError}; use crate::metrics::CollabMetricsCalculate; use crate::RealtimeAccessControl; use app_error::AppError; @@ -119,10 +119,11 @@ where .await { if metadata.workspace_id != workspace_id { - let err = RealtimeError::CollabWorkspaceIdNotMatch { - expect: metadata.workspace_id, - actual: workspace_id.to_string(), - }; + let err = + RealtimeError::CreateGroupFailed(CreateGroupFailedReason::CollabWorkspaceIdNotMatch { + expect: metadata.workspace_id, + actual: workspace_id.to_string(), + }); warn!( "[Realtime]:user_id:{},object_id:{}:{},error:{}", uid, object_id, collab_type, err @@ -145,7 +146,9 @@ where false, )) } else { - return Err(RealtimeError::Internal(err.into())); + return Err(RealtimeError::CreateGroupFailed( + CreateGroupFailedReason::CannotGetCollabData, + )); } }, }; diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index 4076d35a..d11b7c00 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -183,18 +183,34 @@ where }, }; - if let Err(err) = sender - .send(GroupCommand::HandleClientCollabMessage { - user: user.clone(), - object_id, - collab_messages, - }) - .await - { - // it should not happen. Because the receiver is always running before acquiring the sender. - // Otherwise, the GroupCommandRunner might not be ready to handle the message. - error!("Send message to group fail: {}", err); - } + let cloned_user = user.clone(); + // Create a new task to send a message to the group command runner without waiting for the + // result. This approach is used to prevent potential issues with the actor's mailbox in + // single-threaded runtimes (like actix-web actors). By spawning a task, the actor can + // immediately proceed to process the next message. + tokio::spawn(async move { + let (tx, rx) = tokio::sync::oneshot::channel(); + match sender + .send(GroupCommand::HandleClientCollabMessage { + user: cloned_user, + object_id, + collab_messages, + ret: tx, + }) + .await + { + Ok(_) => { + if let Ok(Err(err)) = rx.await { + error!("Handle client collab message fail: {}", err); + } + }, + Err(err) => { + // it should not happen. Because the receiver is always running before acquiring the sender. + // Otherwise, the GroupCommandRunner might not be ready to handle the message. + error!("Send message to group fail: {}", err); + }, + } + }); } Ok(()) diff --git a/src/api/workspace.rs b/src/api/workspace.rs index c47b2716..d6eae1ca 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -28,8 +28,8 @@ use shared_entity::dto::workspace_dto::*; use shared_entity::response::AppResponseError; use shared_entity::response::{AppResponse, JsonAppResponse}; use sqlx::types::uuid; -use std::time::Duration; -use tokio::time::{sleep, Instant}; + +use tokio::time::Instant; use tokio_stream::StreamExt; use tokio_tungstenite::tungstenite::Message; use tracing::{error, event, instrument}; @@ -911,44 +911,24 @@ async fn post_realtime_message_stream_handler( event!(tracing::Level::INFO, "message len: {}", bytes.len()); let device_id = device_id.to_string(); let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?; - let mut stream_message = Some(ClientStreamMessage { + let stream_message = ClientStreamMessage { uid, device_id, message, - }); - const MAX_RETRIES: usize = 3; - const RETRY_DELAY: Duration = Duration::from_secs(2); + }; - let mut attempts = 0; - while attempts < MAX_RETRIES { - match stream_message.take() { - None => { - return Err(AppError::Internal(anyhow!("Unexpected empty stream message")).into()); - }, - Some(message_to_send) => { - match server.try_send(message_to_send) { - Ok(_) => return Ok(Json(AppResponse::Ok())), - Err(err) if attempts < MAX_RETRIES - 1 => { - attempts += 1; - stream_message = Some(err.into_inner()); - sleep(RETRY_DELAY).await; - }, - Err(err) => { - return Err( - AppError::Internal(anyhow!( - "Failed to send client stream message to websocket server after {} attempts: {}", - // attempts starts from 0, so add 1 for accurate count - attempts + 1, - err - )) - .into(), - ); - }, - } - }, - } + // When the server is under heavy load, try_send may fail. In client side, it will retry to send + // the message later. + match server.try_send(stream_message) { + Ok(_) => return Ok(Json(AppResponse::Ok())), + Err(err) => Err( + AppError::Internal(anyhow!( + "Failed to send message to websocket server, error:{}", + err + )) + .into(), + ), } - Err(AppError::Internal(anyhow!("Failed to send message to websocket server")).into()) } async fn get_workspace_usage_handler( diff --git a/src/application.rs b/src/application.rs index 9213788e..19c26104 100644 --- a/src/application.rs +++ b/src/application.rs @@ -226,7 +226,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result Result self.handle_client_message(user, messages), (None, _) => { - warn!("user:{}|device:{} not found", uid, device_id); + warn!("Can't find the realtime user uid:{}, device:{}. User should connect via websocket before", uid,device_id); Box::pin(async { Ok(()) }) }, (Some(_), Err(err)) => { diff --git a/src/biz/collab/storage.rs b/src/biz/collab/storage.rs index 8d958f2e..d67c0086 100644 --- a/src/biz/collab/storage.rs +++ b/src/biz/collab/storage.rs @@ -377,3 +377,35 @@ where self.snapshot_control.get_collab_snapshot_list(oid).await } } + +#[allow(dead_code)] +#[derive(Clone)] +pub struct CollabUserState { + redis_conn_manager: RedisConnectionManager, +} + +#[allow(dead_code)] +impl CollabUserState { + async fn add_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> { + let mut conn = self.redis_conn_manager.clone(); + redis::cmd("HSET") + .arg(uid) + .arg(device_id) + .arg("true") + .query_async(&mut conn) + .await + .map_err(|err| AppError::Internal(err.into()))?; + Ok(()) + } + + async fn remove_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> { + let mut conn = self.redis_conn_manager.clone(); + redis::cmd("HDEL") + .arg(uid) + .arg(device_id) + .query_async(&mut conn) + .await + .map_err(|err| AppError::Internal(err.into()))?; + Ok(()) + } +} diff --git a/tests/collab/single_device_edit.rs b/tests/collab/single_device_edit.rs index 24e60dcc..74740203 100644 --- a/tests/collab/single_device_edit.rs +++ b/tests/collab/single_device_edit.rs @@ -576,29 +576,27 @@ async fn post_realtime_message_test() { } } -#[tokio::test] -async fn collab_flush_test() { - let mut new_user = TestClient::new_user().await; - let object_id = Uuid::new_v4().to_string(); - let workspace_id = new_user.workspace_id().await; - new_user - .open_collab(&workspace_id, &object_id, CollabType::Document) - .await; - - // the default flush_per_update is 100 that defined in [WriteConfig] - // so we need to write 200 times to trigger the flush - for i in 0..200 { - new_user - .collabs - .get_mut(&object_id) - .unwrap() - .mutex_collab - .lock() - .insert(&i.to_string(), i.to_string()); - sleep(Duration::from_millis(300)).await; - } - // TODO(nathan): assert the collab content in disk -} +// #[tokio::test] +// async fn post_large_num_of_realtime_message_request_test() { +// let client = Arc::new(TestClient::new_user().await); +// let mut handles = vec![]; +// for _ in 0..100 { +// let cloned_client = client.clone(); +// let handle = tokio::spawn(async move { +// let message = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(UpdateSync::new( +// CollabOrigin::Empty, +// "fake_object_id".to_string(), +// generate_random_bytes(1024), +// 1, +// ))) +// .encode() +// .unwrap(); +// cloned_client.post_realtime_binary(message).await.unwrap(); +// }); +// handles.push(handle); +// } +// futures::future::join_all(handles).await; +// } #[tokio::test] async fn simulate_10_offline_user_connect_and_then_sync_document_test() { @@ -638,8 +636,9 @@ async fn simulate_10_offline_user_connect_and_then_sync_document_test() { .mutex_collab .lock() .insert(&i.to_string(), i.to_string()); - sleep(Duration::from_millis(30)).await; + sleep(Duration::from_millis(60)).await; } + client.wait_object_sync_complete(&object_id).await.unwrap(); }); tasks.push(task); }