Merge branch 'main' into fix/batch-get-collab
This commit is contained in:
commit
cb31a863ac
|
|
@ -154,6 +154,9 @@ pub trait CollabStorage: Send + Sync + 'static {
|
|||
|
||||
/// Returns list of snapshots for given object_id in descending order of creation time.
|
||||
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas>;
|
||||
|
||||
async fn add_connected_user(&self, uid: i64, device_id: &str);
|
||||
async fn remove_connected_user(&self, uid: i64, device_id: &str);
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -268,6 +271,14 @@ where
|
|||
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas> {
|
||||
self.as_ref().get_collab_snapshot_list(oid).await
|
||||
}
|
||||
|
||||
async fn add_connected_user(&self, uid: i64, device_id: &str) {
|
||||
self.as_ref().add_connected_user(uid, device_id).await
|
||||
}
|
||||
|
||||
async fn remove_connected_user(&self, uid: i64, device_id: &str) {
|
||||
self.as_ref().remove_connected_user(uid, device_id).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ mod group;
|
|||
mod metrics;
|
||||
mod permission;
|
||||
mod rt_server;
|
||||
pub mod shared_state;
|
||||
mod util;
|
||||
|
||||
pub use metrics::*;
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ pub struct CollaborationServer<S, AC> {
|
|||
connect_state: ConnectState,
|
||||
group_sender_by_object_id: Arc<DashMap<String, GroupCommandSender>>,
|
||||
access_control: Arc<AC>,
|
||||
storage: Arc<S>,
|
||||
#[allow(dead_code)]
|
||||
metrics: Arc<CollabRealtimeMetrics>,
|
||||
metrics_calculate: CollabMetricsCalculate,
|
||||
|
|
@ -68,7 +69,9 @@ where
|
|||
spawn_collaboration_command(command_recv, &group_sender_by_object_id);
|
||||
|
||||
spawn_metrics(&metrics, &metrics_calculate, &storage);
|
||||
|
||||
Ok(Self {
|
||||
storage,
|
||||
group_manager,
|
||||
connect_state,
|
||||
group_sender_by_object_id,
|
||||
|
|
@ -94,13 +97,17 @@ where
|
|||
let group_manager = self.group_manager.clone();
|
||||
let connect_state = self.connect_state.clone();
|
||||
let metrics_calculate = self.metrics_calculate.clone();
|
||||
let storage = self.storage.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
storage
|
||||
.add_connected_user(connected_user.uid, &connected_user.device_id)
|
||||
.await;
|
||||
|
||||
if let Some(old_user) = connect_state.handle_user_connect(connected_user, new_client_router) {
|
||||
// Remove the old user from all collaboration groups.
|
||||
group_manager.remove_user(&old_user).await;
|
||||
}
|
||||
|
||||
metrics_calculate.connected_users.store(
|
||||
connect_state.number_of_connected_users() as i64,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
|
|
@ -123,11 +130,16 @@ where
|
|||
let group_manager = self.group_manager.clone();
|
||||
let connect_state = self.connect_state.clone();
|
||||
let metrics_calculate = self.metrics_calculate.clone();
|
||||
let storage = self.storage.clone();
|
||||
|
||||
Box::pin(async move {
|
||||
trace!("[realtime]: disconnect => {}", disconnect_user);
|
||||
let was_removed = connect_state.handle_user_disconnect(&disconnect_user);
|
||||
if was_removed.is_some() {
|
||||
storage
|
||||
.remove_connected_user(disconnect_user.uid, &disconnect_user.device_id)
|
||||
.await;
|
||||
|
||||
metrics_calculate.connected_users.store(
|
||||
connect_state.number_of_connected_users() as i64,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
use crate::error::RealtimeError;
|
||||
use futures_util::StreamExt;
|
||||
use redis::{pipe, AsyncCommands, AsyncIter};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct RealtimeSharedState {
|
||||
redis_conn_manager: redis::aio::ConnectionManager,
|
||||
}
|
||||
|
||||
impl RealtimeSharedState {
|
||||
pub fn new(redis_conn_manager: redis::aio::ConnectionManager) -> Self {
|
||||
Self { redis_conn_manager }
|
||||
}
|
||||
pub async fn add_connected_user(&self, uid: i64, device_id: &str) -> Result<(), RealtimeError> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
let key = realtime_shared_state_cache_key(&uid, device_id);
|
||||
conn
|
||||
.set_ex(key, "1", 60 * 60 * 3)
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn remove_connected_user(
|
||||
&self,
|
||||
uid: i64,
|
||||
device_id: &str,
|
||||
) -> Result<(), RealtimeError> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
let key = realtime_shared_state_cache_key(&uid, device_id);
|
||||
conn
|
||||
.del(key)
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_user_connected(&self, uid: &i64, device_id: &str) -> Result<bool, RealtimeError> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
let key = realtime_shared_state_cache_key(uid, device_id);
|
||||
let result: Option<String> = conn
|
||||
.get(key)
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
Ok(result.is_some())
|
||||
}
|
||||
|
||||
pub async fn remove_all_connected_users(&self) -> Result<(), RealtimeError> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
let iter: AsyncIter<String> = conn
|
||||
.scan_match(format!("{}:*", REALTIME_SHARE_STATE_PREFIX))
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let keys_to_delete: Vec<_> = iter.collect().await;
|
||||
if !keys_to_delete.is_empty() {
|
||||
let mut pipeline = pipe();
|
||||
for key in keys_to_delete.iter() {
|
||||
pipeline.del(key);
|
||||
}
|
||||
pipeline
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) const REALTIME_SHARE_STATE_PREFIX: &str = "realtime_shared_state_v0";
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn realtime_shared_state_cache_key(uid: &i64, device_id: &str) -> String {
|
||||
format!("{}:{}:{}", REALTIME_SHARE_STATE_PREFIX, uid, device_id)
|
||||
}
|
||||
|
|
@ -0,0 +1 @@
|
|||
|
||||
|
|
@ -0,0 +1,63 @@
|
|||
use anyhow::Context;
|
||||
use appflowy_collaborate::shared_state::RealtimeSharedState;
|
||||
|
||||
async fn redis_client() -> redis::Client {
|
||||
let redis_uri = "redis://localhost:6379";
|
||||
redis::Client::open(redis_uri)
|
||||
.context("failed to connect to redis")
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connected_user_test() {
|
||||
let redis_client = redis_client().await;
|
||||
let shared_state = RealtimeSharedState::new(redis_client.get_connection_manager().await.unwrap());
|
||||
|
||||
let device_id = uuid::Uuid::new_v4().to_string();
|
||||
let is_connected = shared_state
|
||||
.is_user_connected(&1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!is_connected);
|
||||
|
||||
shared_state
|
||||
.add_connected_user(1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let is_connected = shared_state
|
||||
.is_user_connected(&1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(is_connected);
|
||||
|
||||
shared_state
|
||||
.remove_connected_user(1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let is_connected = shared_state
|
||||
.is_user_connected(&1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!is_connected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remove_all_connected_user_test() {
|
||||
let redis_client = redis_client().await;
|
||||
let shared_state = RealtimeSharedState::new(redis_client.get_connection_manager().await.unwrap());
|
||||
|
||||
let device_id = uuid::Uuid::new_v4().to_string();
|
||||
shared_state
|
||||
.add_connected_user(1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
shared_state.remove_all_connected_users().await.unwrap();
|
||||
|
||||
let is_connected = shared_state
|
||||
.is_user_connected(&1, &device_id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!is_connected);
|
||||
}
|
||||
|
|
@ -934,6 +934,16 @@ async fn post_realtime_message_stream_handler(
|
|||
|
||||
event!(tracing::Level::INFO, "message len: {}", bytes.len());
|
||||
let device_id = device_id.to_string();
|
||||
// Only send message to websocket server when the user is connected
|
||||
if !state
|
||||
.realtime_shared_state
|
||||
.is_user_connected(&uid, &device_id)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
return Ok(Json(AppResponse::Ok()));
|
||||
}
|
||||
|
||||
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;
|
||||
let stream_message = ClientStreamMessage {
|
||||
uid,
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ use actix_web::{dev::Server, web, web::Data, App, HttpServer};
|
|||
use anyhow::{Context, Error};
|
||||
use appflowy_ai_client::client::AppFlowyAIClient;
|
||||
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
|
||||
use appflowy_collaborate::shared_state::RealtimeSharedState;
|
||||
use appflowy_collaborate::CollaborationServer;
|
||||
use database::file::bucket_s3_impl::S3BucketStorage;
|
||||
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
|
||||
|
|
@ -246,6 +247,10 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
&config.mailer.smtp_host,
|
||||
)
|
||||
.await?;
|
||||
let realtime_shared_state = RealtimeSharedState::new(redis_conn_manager.clone());
|
||||
if let Err(err) = realtime_shared_state.remove_all_connected_users().await {
|
||||
warn!("Failed to remove all connected users: {:?}", err);
|
||||
}
|
||||
|
||||
info!("Application state initialized");
|
||||
Ok(AppState {
|
||||
|
|
@ -268,6 +273,7 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
|||
ai_client: appflowy_ai_client,
|
||||
#[cfg(feature = "history")]
|
||||
grpc_history_client,
|
||||
realtime_shared_state,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ use crate::biz::collab::queue_redis_ops::WritePriority;
|
|||
use crate::state::RedisConnectionManager;
|
||||
use app_error::AppError;
|
||||
use appflowy_collaborate::command::{CLCommandSender, CollaborationCommand};
|
||||
use appflowy_collaborate::shared_state::RealtimeSharedState;
|
||||
use async_trait::async_trait;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_entity::CollabType;
|
||||
|
|
@ -41,6 +42,7 @@ pub struct CollabStorageImpl<AC> {
|
|||
snapshot_control: SnapshotControl,
|
||||
rt_cmd_sender: CLCommandSender,
|
||||
queue: Arc<StorageQueue>,
|
||||
shared_state: RealtimeSharedState,
|
||||
}
|
||||
|
||||
impl<AC> CollabStorageImpl<AC>
|
||||
|
|
@ -55,6 +57,7 @@ where
|
|||
redis_conn_manager: RedisConnectionManager,
|
||||
metrics: Arc<CollabMetrics>,
|
||||
) -> Self {
|
||||
let shared_state = RealtimeSharedState::new(redis_conn_manager.clone());
|
||||
let queue = Arc::new(StorageQueue::new_with_metrics(
|
||||
cache.clone(),
|
||||
redis_conn_manager,
|
||||
|
|
@ -67,6 +70,7 @@ where
|
|||
snapshot_control,
|
||||
rt_cmd_sender,
|
||||
queue,
|
||||
shared_state,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -376,36 +380,20 @@ where
|
|||
async fn get_collab_snapshot_list(&self, oid: &str) -> AppResult<AFSnapshotMetas> {
|
||||
self.snapshot_control.get_collab_snapshot_list(oid).await
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Clone)]
|
||||
pub struct CollabUserState {
|
||||
redis_conn_manager: RedisConnectionManager,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
impl CollabUserState {
|
||||
async fn add_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
redis::cmd("HSET")
|
||||
.arg(uid)
|
||||
.arg(device_id)
|
||||
.arg("true")
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|err| AppError::Internal(err.into()))?;
|
||||
Ok(())
|
||||
async fn add_connected_user(&self, uid: i64, device_id: &str) {
|
||||
if let Err(err) = self.shared_state.add_connected_user(uid, device_id).await {
|
||||
error!("Failed to add connected user: {}", err);
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_connected_user(&self, uid: i64, device_id: &str) -> AppResult<()> {
|
||||
let mut conn = self.redis_conn_manager.clone();
|
||||
redis::cmd("HDEL")
|
||||
.arg(uid)
|
||||
.arg(device_id)
|
||||
.query_async(&mut conn)
|
||||
async fn remove_connected_user(&self, uid: i64, device_id: &str) {
|
||||
if let Err(err) = self
|
||||
.shared_state
|
||||
.remove_connected_user(uid, device_id)
|
||||
.await
|
||||
.map_err(|err| AppError::Internal(err.into()))?;
|
||||
Ok(())
|
||||
{
|
||||
error!("Failed to remove connected user: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use access_control::access::AccessControl;
|
|||
use access_control::metrics::AccessControlMetrics;
|
||||
use app_error::AppError;
|
||||
use appflowy_ai_client::client::AppFlowyAIClient;
|
||||
use appflowy_collaborate::shared_state::RealtimeSharedState;
|
||||
use appflowy_collaborate::CollabRealtimeMetrics;
|
||||
use dashmap::DashMap;
|
||||
use database::file::bucket_s3_impl::S3BucketStorage;
|
||||
|
|
@ -45,6 +46,7 @@ pub struct AppState {
|
|||
pub gotrue_admin: GoTrueAdmin,
|
||||
pub mailer: Mailer,
|
||||
pub ai_client: AppFlowyAIClient,
|
||||
pub realtime_shared_state: RealtimeSharedState,
|
||||
#[cfg(feature = "history")]
|
||||
pub grpc_history_client:
|
||||
tonic_proto::history::history_client::HistoryClient<tonic::transport::Channel>,
|
||||
|
|
|
|||
|
|
@ -1,16 +1,20 @@
|
|||
use crate::collab::util::{generate_random_string, make_big_collab_doc_state};
|
||||
use crate::collab::util::{
|
||||
generate_random_bytes, generate_random_string, make_big_collab_doc_state,
|
||||
};
|
||||
use assert_json_diff::assert_json_eq;
|
||||
use client_api_test_util::*;
|
||||
use collab_entity::CollabType;
|
||||
use database_entity::dto::AFAccessLevel;
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::time::sleep;
|
||||
|
||||
use collab_rt_entity::MAXIMUM_REALTIME_MESSAGE_SIZE;
|
||||
use collab_rt_entity::{CollabMessage, RealtimeMessage, UpdateSync, MAXIMUM_REALTIME_MESSAGE_SIZE};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -576,27 +580,45 @@ async fn post_realtime_message_test() {
|
|||
}
|
||||
}
|
||||
|
||||
// #[tokio::test]
|
||||
// async fn post_large_num_of_realtime_message_request_test() {
|
||||
// let client = Arc::new(TestClient::new_user().await);
|
||||
// let mut handles = vec![];
|
||||
// for _ in 0..100 {
|
||||
// let cloned_client = client.clone();
|
||||
// let handle = tokio::spawn(async move {
|
||||
// let message = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(UpdateSync::new(
|
||||
// CollabOrigin::Empty,
|
||||
// "fake_object_id".to_string(),
|
||||
// generate_random_bytes(1024),
|
||||
// 1,
|
||||
// )))
|
||||
// .encode()
|
||||
// .unwrap();
|
||||
// cloned_client.post_realtime_binary(message).await.unwrap();
|
||||
// });
|
||||
// handles.push(handle);
|
||||
// }
|
||||
// futures::future::join_all(handles).await;
|
||||
// }
|
||||
#[tokio::test]
|
||||
async fn post_realtime_message_without_ws_connect_test() {
|
||||
let client = Arc::new(TestClient::new_user_without_ws_conn().await);
|
||||
let mut handles = vec![];
|
||||
|
||||
// try to post 10 realtime message without connect to the websocket server.
|
||||
for _ in 0..10 {
|
||||
let cloned_client = client.clone();
|
||||
let handle = tokio::spawn(async move {
|
||||
let message = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(UpdateSync::new(
|
||||
CollabOrigin::Empty,
|
||||
uuid::Uuid::new_v4().to_string(),
|
||||
generate_random_bytes(1024),
|
||||
1,
|
||||
)))
|
||||
.encode()
|
||||
.unwrap();
|
||||
cloned_client.post_realtime_binary(message).await.unwrap();
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
for result in futures::future::join_all(handles).await {
|
||||
result.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_realtime_message_with_ws_connect_test() {
|
||||
let client = Arc::new(TestClient::new_user().await);
|
||||
let message = RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(UpdateSync::new(
|
||||
CollabOrigin::Empty,
|
||||
uuid::Uuid::new_v4().to_string(),
|
||||
generate_random_bytes(1024),
|
||||
1,
|
||||
)))
|
||||
.encode()
|
||||
.unwrap();
|
||||
client.post_realtime_binary(message).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn simulate_10_offline_user_connect_and_then_sync_document_test() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue