From eb79b9f5e80846bd06b30b4f9c04039ce1452582 Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Thu, 7 Mar 2024 11:57:40 +0800 Subject: [PATCH] chore: add realtime message test (#374) * chore: add realtime message test * chore: add files * chore: add files * chore: rename --- libs/client-api/src/ws/client.rs | 2 +- libs/client-api/src/ws/msg_queue.rs | 2 +- .../migration/0147/client_init | Bin 0 -> 102 bytes .../migration/0147/collab_update | Bin 0 -> 76 bytes .../migration/0149/client_collab_v1 | Bin 0 -> 84 bytes .../migration/0149/client_init | Bin 0 -> 102 bytes .../migration/0149/collab_update | Bin 0 -> 76 bytes libs/realtime-entity/src/collab_msg.rs | 2 +- libs/realtime-entity/src/message.rs | 126 ++++++++++++++---- libs/realtime/src/server/rt_server.rs | 9 +- 10 files changed, 107 insertions(+), 34 deletions(-) create mode 100644 libs/realtime-entity/migration/0147/client_init create mode 100644 libs/realtime-entity/migration/0147/collab_update create mode 100644 libs/realtime-entity/migration/0149/client_collab_v1 create mode 100644 libs/realtime-entity/migration/0149/client_init create mode 100644 libs/realtime-entity/migration/0149/collab_update diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index d422b2ed..647ef2e9 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -310,7 +310,7 @@ impl WSClient { RealtimeMessage::ServerCollabV1(collab_messages) => { handle_collab_message(&weak_collab_channels, collab_messages); }, - RealtimeMessage::ClientCollabV1(_) => { + RealtimeMessage::ClientCollabV1(_) | RealtimeMessage::ClientCollabV2(_) => { // The message from server should not be collab message. error!( "received unexpected collab message from websocket: {:?}", diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index fddc22c3..57e91839 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -95,7 +95,7 @@ impl AggregateMessageQueue { } debug!("Aggregate messages len: {}", messages_map.len()); - let rt_message = RealtimeMessage::ClientCollabV1(messages_map); + let rt_message = RealtimeMessage::ClientCollabV2(messages_map); match rt_message.encode() { Ok(data) => { if let Err(e) = sender.send(Message::Binary(data)).await { diff --git a/libs/realtime-entity/migration/0147/client_init b/libs/realtime-entity/migration/0147/client_init new file mode 100644 index 0000000000000000000000000000000000000000..4c2b5c26fb5b141c0e0ac5f163893c3e3fd3f9fe GIT binary patch literal 102 zcmZQzKmbN4lM_m(q?TnSr^aWda6<+1ld@8iOB6Cw6bwNkd{E)?{G#mQg2d!hunf!; L7N{&E6Eh0{SkMfS literal 0 HcmV?d00001 diff --git a/libs/realtime-entity/migration/0147/collab_update b/libs/realtime-entity/migration/0147/collab_update new file mode 100644 index 0000000000000000000000000000000000000000..c5941c48477bbfb8bacce34f2fbcf58a3611573a GIT binary patch literal 76 zcmZQzU|?VbVi17QoDeQUN@`hVa%y~L3O7_BKPf9UxkMo|MZu5@D#QY%S=rb*00*B3 AkN^Mx literal 0 HcmV?d00001 diff --git a/libs/realtime-entity/migration/0149/client_collab_v1 b/libs/realtime-entity/migration/0149/client_collab_v1 new file mode 100644 index 0000000000000000000000000000000000000000..94ca01d34d7b2bc07a638e712d8473c4b50b1809 GIT binary patch literal 84 zcmZQ(U|?VbVh}*5IU&LfDXC?d$*J*~Dcn$j{G_bZ for CollabMessage { impl From for RealtimeMessage { fn from(msg: ClientCollabMessage) -> Self { let object_id = msg.object_id().to_string(); - Self::ClientCollabV1([(object_id, vec![msg])].into()) + Self::ClientCollabV2([(object_id, vec![msg])].into()) } } diff --git a/libs/realtime-entity/src/message.rs b/libs/realtime-entity/src/message.rs index f59fe009..2cdf622e 100644 --- a/libs/realtime-entity/src/message.rs +++ b/libs/realtime-entity/src/message.rs @@ -35,7 +35,8 @@ pub enum RealtimeMessage { Collab(CollabMessage), User(UserMessage), System(SystemMessage), - ClientCollabV1(MessageByObjectId), + ClientCollabV1(Vec), + ClientCollabV2(MessageByObjectId), ServerCollabV1(Vec), } @@ -45,7 +46,8 @@ impl RealtimeMessage { RealtimeMessage::Collab(msg) => msg.len(), RealtimeMessage::User(_) => 1, RealtimeMessage::System(_) => 1, - RealtimeMessage::ClientCollabV1(msgs) => msgs + RealtimeMessage::ClientCollabV1(msgs) => msgs.iter().map(|msg| msg.size()).sum(), + RealtimeMessage::ClientCollabV2(msgs) => msgs .iter() .map(|(_, value)| value.iter().map(|v| v.size()).sum::()) .sum(), @@ -64,7 +66,14 @@ impl RealtimeMessage { let collab_message = ClientCollabMessage::try_from(collab_message)?; Ok([(object_id, vec![collab_message])].into()) }, - RealtimeMessage::ClientCollabV1(collab_messages) => Ok(collab_messages), + RealtimeMessage::ClientCollabV1(collab_messages) => { + let message_map: MessageByObjectId = collab_messages + .into_iter() + .map(|message| (message.object_id().to_string(), vec![message])) + .collect(); + Ok(message_map) + }, + RealtimeMessage::ClientCollabV2(collab_messages) => Ok(collab_messages), _ => Err(anyhow!( "Failed to convert RealtimeMessage:{} to ClientCollabMessage", self @@ -137,6 +146,7 @@ impl Display for RealtimeMessage { RealtimeMessage::User(_) => f.write_fmt(format_args!("User")), RealtimeMessage::System(_) => f.write_fmt(format_args!("System")), RealtimeMessage::ClientCollabV1(_) => f.write_fmt(format_args!("ClientCollabV1")), + RealtimeMessage::ClientCollabV2(_) => f.write_fmt(format_args!("ClientCollabV2")), RealtimeMessage::ServerCollabV1(_) => f.write_fmt(format_args!("ServerCollabV1")), } } @@ -151,13 +161,15 @@ pub enum SystemMessage { #[cfg(test)] mod tests { - - use crate::collab_msg::{CollabMessage, InitSync}; + use crate::collab_msg::{ClientCollabMessage, CollabMessage, InitSync}; use crate::message::{RealtimeMessage, SystemMessage}; use crate::user::UserMessage; + use bytes::Bytes; use collab::core::origin::CollabOrigin; use collab_entity::CollabType; use serde::{Deserialize, Serialize}; + use std::fs::File; + use std::io::{Read, Write}; #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr( @@ -172,32 +184,75 @@ mod tests { } #[test] - fn decode_version_1_with_version_2_struct_test() { - let version_1 = RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(InitSync::new( - CollabOrigin::Empty, - "1".to_string(), - CollabType::Document, - "w1".to_string(), - 1, - vec![0u8, 3], - ))); + fn decode_0149_realtime_message_test() { + let collab_init = read_message_from_file("migration/0149/client_init").unwrap(); + assert!(matches!(collab_init, RealtimeMessage::Collab(_))); + if let RealtimeMessage::Collab(CollabMessage::ClientInitSync(init)) = collab_init { + assert_eq!(init.object_id, "object id 1"); + assert_eq!(init.collab_type, CollabType::Document); + assert_eq!(init.workspace_id, "workspace id 1"); + assert_eq!(init.msg_id, 1); + assert_eq!(init.payload, vec![1, 2, 3, 4]); + } else { + panic!("Failed to decode RealtimeMessage from file"); + } - let version_1_bytes = bincode::serialize(&version_1).unwrap(); - let version_2 = RealtimeMessage::decode(&version_1_bytes).unwrap(); + let collab_update = read_message_from_file("migration/0149/collab_update").unwrap(); + assert!(matches!(collab_update, RealtimeMessage::Collab(_))); + if let RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update)) = collab_update { + assert_eq!(update.object_id, "object id 1"); + assert_eq!(update.msg_id, 10); + assert_eq!(update.payload, Bytes::from(vec![5, 6, 7, 8])); + } else { + panic!("Failed to decode RealtimeMessage from file"); + } - match (version_1, version_2) { - ( - RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(init_1)), - RealtimeMessage::Collab(CollabMessage::ClientInitSync(init_2)), - ) => { - assert_eq!(init_1, init_2); - }, - _ => panic!("Failed to convert RealtimeMessage to RealtimeMessage2"), + let client_collab_v1 = read_message_from_file("migration/0149/client_collab_v1").unwrap(); + assert!(matches!( + client_collab_v1, + RealtimeMessage::ClientCollabV1(_) + )); + if let RealtimeMessage::ClientCollabV1(messages) = client_collab_v1 { + assert_eq!(messages.len(), 1); + if let ClientCollabMessage::ClientUpdateSync { data } = &messages[0] { + assert_eq!(data.object_id, "object id 1"); + assert_eq!(data.msg_id, 10); + assert_eq!(data.payload, Bytes::from(vec![5, 6, 7, 8])); + } else { + panic!("Failed to decode RealtimeMessage from file"); + } + } else { + panic!("Failed to decode RealtimeMessage from file"); } } #[test] - fn decode_version_2_with_version_1_test_1() { + fn decode_0147_realtime_message_test() { + let collab_init = read_message_from_file("migration/0147/client_init").unwrap(); + assert!(matches!(collab_init, RealtimeMessage::Collab(_))); + if let RealtimeMessage::Collab(CollabMessage::ClientInitSync(init)) = collab_init { + assert_eq!(init.object_id, "object id 1"); + assert_eq!(init.collab_type, CollabType::Document); + assert_eq!(init.workspace_id, "workspace id 1"); + assert_eq!(init.msg_id, 1); + assert_eq!(init.payload, vec![1, 2, 3, 4]); + } else { + panic!("Failed to decode RealtimeMessage from file"); + } + + let collab_update = read_message_from_file("migration/0147/collab_update").unwrap(); + assert!(matches!(collab_update, RealtimeMessage::Collab(_))); + if let RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update)) = collab_update { + assert_eq!(update.object_id, "object id 1"); + assert_eq!(update.msg_id, 10); + assert_eq!(update.payload, Bytes::from(vec![5, 6, 7, 8])); + } else { + panic!("Failed to decode RealtimeMessage from file"); + } + } + + #[test] + fn decode_version_2_collab_message_with_version_1_test_1() { let version_2 = RealtimeMessage::Collab(CollabMessage::ClientInitSync(InitSync::new( CollabOrigin::Empty, "1".to_string(), @@ -209,7 +264,6 @@ mod tests { let version_2_bytes = version_2.encode().unwrap(); let version_1: RealtimeMessageV1 = bincode::deserialize(&version_2_bytes).unwrap(); - match (version_1, version_2) { ( RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(init_1)), @@ -220,4 +274,24 @@ mod tests { _ => panic!("Failed to convert RealtimeMessage2 to RealtimeMessage"), } } + + #[allow(dead_code)] + fn write_message_to_file( + message: &RealtimeMessage, + file_path: &str, + ) -> Result<(), Box> { + let data = message.encode().unwrap(); + let mut file = File::create(file_path)?; + file.write_all(&data)?; + Ok(()) + } + + #[allow(dead_code)] + fn read_message_from_file(file_path: &str) -> Result { + let mut file = File::open(file_path)?; + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer)?; + let message = RealtimeMessage::decode(&buffer)?; + Ok(message) + } } diff --git a/libs/realtime/src/server/rt_server.rs b/libs/realtime/src/server/rt_server.rs index 1d1e7602..c180a2fa 100644 --- a/libs/realtime/src/server/rt_server.rs +++ b/libs/realtime/src/server/rt_server.rs @@ -444,7 +444,7 @@ pub async fn broadcast_client_collab_message( let pair = (object_id, collab_messages); let err = client_stream .stream_tx - .send(RealtimeMessage::ClientCollabV1([pair].into())); + .send(RealtimeMessage::ClientCollabV2([pair].into())); if let Err(err) = err { warn!("Send user:{} message to group error: {}", user.uid(), err,); client_streams.remove(user); @@ -569,10 +569,9 @@ impl CollabClientStream { if messages.is_empty() { continue; } - if !messages.is_empty() { - if tx.send([(object_id, messages)].into()).await.is_err() { - break; - } + + if tx.send([(object_id, messages)].into()).await.is_err() { + break; } } },