chore: try to not block actor mailbox (#534)

* chore: add test

* chore: spawn task to handle message

* chore: update error message
This commit is contained in:
Nathan.fooo 2024-05-08 16:54:05 +08:00 committed by GitHub
parent 359433f14c
commit 70262a1ac4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 164 additions and 103 deletions

View File

@ -110,6 +110,7 @@ client-api-test-util = { path = "libs/client-api-test-util", features = ["collab
client-api = { path = "libs/client-api", features = ["collab-sync", "test_util", "sync_verbose_log", "test_fast_sync", "enable_brotli"] }
opener = "0.6.1"
image = "0.23.14"
collab-rt-entity.workspace = true
[[bin]]
name = "appflowy_cloud"

View File

@ -689,10 +689,8 @@ impl TestClient {
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn post_realtime_message(
&self,
message: client_websocket::Message,
) -> Result<(), AppResponseError> {
pub async fn post_realtime_binary(&self, message: Vec<u8>) -> Result<(), AppResponseError> {
let message = client_websocket::Message::binary(message);
self
.api_client
.post_realtime_msg(&self.device_id, message)

View File

@ -85,11 +85,12 @@ pub async fn insert_into_af_collab(
uid,
)
.execute(tx.deref_mut())
.await
.context(format!(
"user:{} update af_collab:{} failed",
uid, params.object_id
))?;
.await.map_err(|err| {
AppError::Internal(anyhow!(
"Update af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. error: {:?}",
workspace_id, uid, params.object_id, params.collab_type, err,
))
})?;
} else {
return Err(AppError::Internal(anyhow!(
"workspace_id is not match. expect workspace_id:{}, but receive:{}",
@ -148,11 +149,12 @@ pub async fn insert_into_af_collab(
workspace_id,
)
.execute(tx.deref_mut())
.await
.context(format!(
"Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}",
workspace_id, uid, params.object_id, params.collab_type
))?;
.await.map_err(|err| {
AppError::Internal(anyhow!(
"Insert new af_collab failed: workspace_id:{}, uid:{}, object_id:{}, collab_type:{}. payload len:{} error: {:?}",
workspace_id, uid, params.object_id, params.collab_type, params.encoded_collab_v1.len(), err,
))
})?;
},
}

View File

@ -1,4 +1,5 @@
use collab::error::CollabError;
use std::fmt::Display;
#[derive(Debug, thiserror::Error)]
pub enum RealtimeError {
@ -41,6 +42,9 @@ pub enum RealtimeError {
#[error("group is not exist: {0}")]
GroupNotFound(String),
#[error("Create group failed:{0}")]
CreateGroupFailed(CreateGroupFailedReason),
#[error("Lack of required collab data: {0}")]
NoRequiredCollabData(String),
@ -50,13 +54,33 @@ pub enum RealtimeError {
#[error("Acquire lock timeout")]
LockTimeout,
#[error("Collab workspace id not match: expect {expect}, actual {actual}")]
CollabWorkspaceIdNotMatch { expect: String, actual: String },
#[error("Internal failure: {0}")]
Internal(#[from] anyhow::Error),
}
#[derive(Debug)]
pub enum CreateGroupFailedReason {
CollabWorkspaceIdNotMatch { expect: String, actual: String },
CannotGetCollabData,
}
impl Display for CreateGroupFailedReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CreateGroupFailedReason::CollabWorkspaceIdNotMatch { expect, actual } => {
write!(
f,
"Collab workspace id not match: expect {}, actual {}",
expect, actual
)
},
CreateGroupFailedReason::CannotGetCollabData => {
write!(f, "Cannot get collab data")
},
}
}
}
impl RealtimeError {
pub fn is_too_many_message(&self) -> bool {
matches!(self, RealtimeError::TooManyMessage(_))
@ -65,4 +89,7 @@ impl RealtimeError {
pub fn is_lock_timeout(&self) -> bool {
matches!(self, RealtimeError::LockTimeout)
}
pub fn is_create_group_failed(&self) -> bool {
matches!(self, RealtimeError::CreateGroupFailed(_))
}
}

View File

@ -12,7 +12,7 @@ use crate::client::client_msg_router::ClientMessageRouter;
use collab::entity::EncodedCollab;
use futures_util::StreamExt;
use std::sync::Arc;
use tracing::{error, instrument, trace, warn};
use tracing::{instrument, trace, warn};
/// Using [GroupCommand] to interact with the group
/// - HandleClientCollabMessage: Handle the client message
@ -22,6 +22,7 @@ pub enum GroupCommand {
user: RealtimeUser,
object_id: String,
collab_messages: Vec<ClientCollabMessage>,
ret: tokio::sync::oneshot::Sender<Result<(), RealtimeError>>,
},
EncodeCollab {
object_id: String,
@ -68,12 +69,13 @@ where
user,
object_id,
collab_messages,
ret,
} => {
if let Err(err) = self
let result = self
.handle_client_collab_message(&user, object_id, collab_messages)
.await
{
error!("handle client message error: {}", err);
.await;
if let Err(err) = ret.send(result) {
warn!("Send handle client collab message result fail: {:?}", err);
}
},
GroupCommand::EncodeCollab { object_id, ret } => {

View File

@ -1,7 +1,7 @@
use crate::group::group_init::CollabGroup;
use crate::group::state::GroupManagementState;
use crate::error::RealtimeError;
use crate::error::{CreateGroupFailedReason, RealtimeError};
use crate::metrics::CollabMetricsCalculate;
use crate::RealtimeAccessControl;
use app_error::AppError;
@ -119,10 +119,11 @@ where
.await
{
if metadata.workspace_id != workspace_id {
let err = RealtimeError::CollabWorkspaceIdNotMatch {
expect: metadata.workspace_id,
actual: workspace_id.to_string(),
};
let err =
RealtimeError::CreateGroupFailed(CreateGroupFailedReason::CollabWorkspaceIdNotMatch {
expect: metadata.workspace_id,
actual: workspace_id.to_string(),
});
warn!(
"[Realtime]:user_id:{},object_id:{}:{},error:{}",
uid, object_id, collab_type, err
@ -145,7 +146,9 @@ where
false,
))
} else {
return Err(RealtimeError::Internal(err.into()));
return Err(RealtimeError::CreateGroupFailed(
CreateGroupFailedReason::CannotGetCollabData,
));
}
},
};

View File

@ -183,18 +183,34 @@ where
},
};
if let Err(err) = sender
.send(GroupCommand::HandleClientCollabMessage {
user: user.clone(),
object_id,
collab_messages,
})
.await
{
// it should not happen. Because the receiver is always running before acquiring the sender.
// Otherwise, the GroupCommandRunner might not be ready to handle the message.
error!("Send message to group fail: {}", err);
}
let cloned_user = user.clone();
// Create a new task to send a message to the group command runner without waiting for the
// result. This approach is used to prevent potential issues with the actor's mailbox in
// single-threaded runtimes (like actix-web actors). By spawning a task, the actor can
// immediately proceed to process the next message.
tokio::spawn(async move {
let (tx, rx) = tokio::sync::oneshot::channel();
match sender
.send(GroupCommand::HandleClientCollabMessage {
user: cloned_user,
object_id,
collab_messages,
ret: tx,
})
.await
{
Ok(_) => {
if let Ok(Err(err)) = rx.await {
error!("Handle client collab message fail: {}", err);
}
},
Err(err) => {
// it should not happen. Because the receiver is always running before acquiring the sender.
// Otherwise, the GroupCommandRunner might not be ready to handle the message.
error!("Send message to group fail: {}", err);
},
}
});
}
Ok(())

View File

@ -28,8 +28,8 @@ use shared_entity::dto::workspace_dto::*;
use shared_entity::response::AppResponseError;
use shared_entity::response::{AppResponse, JsonAppResponse};
use sqlx::types::uuid;
use std::time::Duration;
use tokio::time::{sleep, Instant};
use tokio::time::Instant;
use tokio_stream::StreamExt;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, event, instrument};
@ -911,44 +911,24 @@ async fn post_realtime_message_stream_handler(
event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;
let mut stream_message = Some(ClientStreamMessage {
let stream_message = ClientStreamMessage {
uid,
device_id,
message,
});
const MAX_RETRIES: usize = 3;
const RETRY_DELAY: Duration = Duration::from_secs(2);
};
let mut attempts = 0;
while attempts < MAX_RETRIES {
match stream_message.take() {
None => {
return Err(AppError::Internal(anyhow!("Unexpected empty stream message")).into());
},
Some(message_to_send) => {
match server.try_send(message_to_send) {
Ok(_) => return Ok(Json(AppResponse::Ok())),
Err(err) if attempts < MAX_RETRIES - 1 => {
attempts += 1;
stream_message = Some(err.into_inner());
sleep(RETRY_DELAY).await;
},
Err(err) => {
return Err(
AppError::Internal(anyhow!(
"Failed to send client stream message to websocket server after {} attempts: {}",
// attempts starts from 0, so add 1 for accurate count
attempts + 1,
err
))
.into(),
);
},
}
},
}
// When the server is under heavy load, try_send may fail. In client side, it will retry to send
// the message later.
match server.try_send(stream_message) {
Ok(_) => return Ok(Json(AppResponse::Ok())),
Err(err) => Err(
AppError::Internal(anyhow!(
"Failed to send message to websocket server, error:{}",
err
))
.into(),
),
}
Err(AppError::Internal(anyhow!("Failed to send message to websocket server")).into())
}
async fn get_workspace_usage_handler(

View File

@ -226,7 +226,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
metrics.collab_metrics.clone(),
)
.await;
let collab_storage = Arc::new(CollabStorageImpl::new(
let collab_access_control_storage = Arc::new(CollabStorageImpl::new(
collab_cache.clone(),
collab_storage_access_control,
snapshot_control,
@ -256,7 +256,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
gotrue_client,
redis_connection_manager: redis_conn_manager,
collab_cache,
collab_access_control_storage: collab_storage,
collab_access_control_storage,
collab_access_control,
workspace_access_control,
bucket_storage,

View File

@ -122,12 +122,13 @@ where
message,
} = client_msg;
// Get the real-time user by the device ID and user ID. If the user is not found, which means
// the user is not connected to the real-time server via websocket.
let user = self.get_user_by_device(&UserDevice::new(&device_id, uid));
match (user, message.transform()) {
(Some(user), Ok(messages)) => self.handle_client_message(user, messages),
(None, _) => {
warn!("user:{}|device:{} not found", uid, device_id);
warn!("Can't find the realtime user uid:{}, device:{}. User should connect via websocket before", uid,device_id);
Box::pin(async { Ok(()) })
},
(Some(_), Err(err)) => {

View File

@ -377,3 +377,35 @@ where
self.snapshot_control.get_collab_snapshot_list(oid).await
}
}
#[allow(dead_code)]
#[derive(Clone)]
pub struct CollabUserState {
redis_conn_manager: RedisConnectionManager,
}
#[allow(dead_code)]
impl CollabUserState {
async fn add_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> {
let mut conn = self.redis_conn_manager.clone();
redis::cmd("HSET")
.arg(uid)
.arg(device_id)
.arg("true")
.query_async(&mut conn)
.await
.map_err(|err| AppError::Internal(err.into()))?;
Ok(())
}
async fn remove_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> {
let mut conn = self.redis_conn_manager.clone();
redis::cmd("HDEL")
.arg(uid)
.arg(device_id)
.query_async(&mut conn)
.await
.map_err(|err| AppError::Internal(err.into()))?;
Ok(())
}
}

View File

@ -576,29 +576,27 @@ async fn post_realtime_message_test() {
}
}
#[tokio::test]
async fn collab_flush_test() {
let mut new_user = TestClient::new_user().await;
let object_id = Uuid::new_v4().to_string();
let workspace_id = new_user.workspace_id().await;
new_user
.open_collab(&workspace_id, &object_id, CollabType::Document)
.await;
// the default flush_per_update is 100 that defined in [WriteConfig]
// so we need to write 200 times to trigger the flush
for i in 0..200 {
new_user
.collabs
.get_mut(&object_id)
.unwrap()
.mutex_collab
.lock()
.insert(&i.to_string(), i.to_string());
sleep(Duration::from_millis(300)).await;
}
// TODO(nathan): assert the collab content in disk
}
// #[tokio::test]
// async fn post_large_num_of_realtime_message_request_test() {
// let client = Arc::new(TestClient::new_user().await);
// let mut handles = vec![];
// for _ in 0..100 {
// let cloned_client = client.clone();
// let handle = tokio::spawn(async move {
// let message = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(UpdateSync::new(
// CollabOrigin::Empty,
// "fake_object_id".to_string(),
// generate_random_bytes(1024),
// 1,
// )))
// .encode()
// .unwrap();
// cloned_client.post_realtime_binary(message).await.unwrap();
// });
// handles.push(handle);
// }
// futures::future::join_all(handles).await;
// }
#[tokio::test]
async fn simulate_10_offline_user_connect_and_then_sync_document_test() {
@ -638,8 +636,9 @@ async fn simulate_10_offline_user_connect_and_then_sync_document_test() {
.mutex_collab
.lock()
.insert(&i.to_string(), i.to_string());
sleep(Duration::from_millis(30)).await;
sleep(Duration::from_millis(60)).await;
}
client.wait_object_sync_complete(&object_id).await.unwrap();
});
tasks.push(task);
}