Merge branch 'main' into admin-password
This commit is contained in:
commit
cd73e1cb21
|
|
@ -310,7 +310,7 @@ impl WSClient {
|
|||
RealtimeMessage::ServerCollabV1(collab_messages) => {
|
||||
handle_collab_message(&weak_collab_channels, collab_messages);
|
||||
},
|
||||
RealtimeMessage::ClientCollabV1(_) => {
|
||||
RealtimeMessage::ClientCollabV1(_) | RealtimeMessage::ClientCollabV2(_) => {
|
||||
// The message from server should not be collab message.
|
||||
error!(
|
||||
"received unexpected collab message from websocket: {:?}",
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ impl AggregateMessageQueue {
|
|||
}
|
||||
|
||||
debug!("Aggregate messages len: {}", messages_map.len());
|
||||
let rt_message = RealtimeMessage::ClientCollabV1(messages_map);
|
||||
let rt_message = RealtimeMessage::ClientCollabV2(messages_map);
|
||||
match rt_message.encode() {
|
||||
Ok(data) => {
|
||||
if let Err(e) = sender.send(Message::Binary(data)).await {
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -683,7 +683,7 @@ impl From<ClientCollabMessage> for CollabMessage {
|
|||
impl From<ClientCollabMessage> for RealtimeMessage {
|
||||
fn from(msg: ClientCollabMessage) -> Self {
|
||||
let object_id = msg.object_id().to_string();
|
||||
Self::ClientCollabV1([(object_id, vec![msg])].into())
|
||||
Self::ClientCollabV2([(object_id, vec![msg])].into())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -35,7 +35,8 @@ pub enum RealtimeMessage {
|
|||
Collab(CollabMessage),
|
||||
User(UserMessage),
|
||||
System(SystemMessage),
|
||||
ClientCollabV1(MessageByObjectId),
|
||||
ClientCollabV1(Vec<ClientCollabMessage>),
|
||||
ClientCollabV2(MessageByObjectId),
|
||||
ServerCollabV1(Vec<ServerCollabMessage>),
|
||||
}
|
||||
|
||||
|
|
@ -45,7 +46,8 @@ impl RealtimeMessage {
|
|||
RealtimeMessage::Collab(msg) => msg.len(),
|
||||
RealtimeMessage::User(_) => 1,
|
||||
RealtimeMessage::System(_) => 1,
|
||||
RealtimeMessage::ClientCollabV1(msgs) => msgs
|
||||
RealtimeMessage::ClientCollabV1(msgs) => msgs.iter().map(|msg| msg.size()).sum(),
|
||||
RealtimeMessage::ClientCollabV2(msgs) => msgs
|
||||
.iter()
|
||||
.map(|(_, value)| value.iter().map(|v| v.size()).sum::<usize>())
|
||||
.sum(),
|
||||
|
|
@ -64,7 +66,14 @@ impl RealtimeMessage {
|
|||
let collab_message = ClientCollabMessage::try_from(collab_message)?;
|
||||
Ok([(object_id, vec![collab_message])].into())
|
||||
},
|
||||
RealtimeMessage::ClientCollabV1(collab_messages) => Ok(collab_messages),
|
||||
RealtimeMessage::ClientCollabV1(collab_messages) => {
|
||||
let message_map: MessageByObjectId = collab_messages
|
||||
.into_iter()
|
||||
.map(|message| (message.object_id().to_string(), vec![message]))
|
||||
.collect();
|
||||
Ok(message_map)
|
||||
},
|
||||
RealtimeMessage::ClientCollabV2(collab_messages) => Ok(collab_messages),
|
||||
_ => Err(anyhow!(
|
||||
"Failed to convert RealtimeMessage:{} to ClientCollabMessage",
|
||||
self
|
||||
|
|
@ -137,6 +146,7 @@ impl Display for RealtimeMessage {
|
|||
RealtimeMessage::User(_) => f.write_fmt(format_args!("User")),
|
||||
RealtimeMessage::System(_) => f.write_fmt(format_args!("System")),
|
||||
RealtimeMessage::ClientCollabV1(_) => f.write_fmt(format_args!("ClientCollabV1")),
|
||||
RealtimeMessage::ClientCollabV2(_) => f.write_fmt(format_args!("ClientCollabV2")),
|
||||
RealtimeMessage::ServerCollabV1(_) => f.write_fmt(format_args!("ServerCollabV1")),
|
||||
}
|
||||
}
|
||||
|
|
@ -151,13 +161,15 @@ pub enum SystemMessage {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use crate::collab_msg::{CollabMessage, InitSync};
|
||||
use crate::collab_msg::{ClientCollabMessage, CollabMessage, InitSync};
|
||||
use crate::message::{RealtimeMessage, SystemMessage};
|
||||
use crate::user::UserMessage;
|
||||
use bytes::Bytes;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab_entity::CollabType;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[cfg_attr(
|
||||
|
|
@ -172,32 +184,75 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn decode_version_1_with_version_2_struct_test() {
|
||||
let version_1 = RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(InitSync::new(
|
||||
CollabOrigin::Empty,
|
||||
"1".to_string(),
|
||||
CollabType::Document,
|
||||
"w1".to_string(),
|
||||
1,
|
||||
vec![0u8, 3],
|
||||
)));
|
||||
fn decode_0149_realtime_message_test() {
|
||||
let collab_init = read_message_from_file("migration/0149/client_init").unwrap();
|
||||
assert!(matches!(collab_init, RealtimeMessage::Collab(_)));
|
||||
if let RealtimeMessage::Collab(CollabMessage::ClientInitSync(init)) = collab_init {
|
||||
assert_eq!(init.object_id, "object id 1");
|
||||
assert_eq!(init.collab_type, CollabType::Document);
|
||||
assert_eq!(init.workspace_id, "workspace id 1");
|
||||
assert_eq!(init.msg_id, 1);
|
||||
assert_eq!(init.payload, vec![1, 2, 3, 4]);
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
|
||||
let version_1_bytes = bincode::serialize(&version_1).unwrap();
|
||||
let version_2 = RealtimeMessage::decode(&version_1_bytes).unwrap();
|
||||
let collab_update = read_message_from_file("migration/0149/collab_update").unwrap();
|
||||
assert!(matches!(collab_update, RealtimeMessage::Collab(_)));
|
||||
if let RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update)) = collab_update {
|
||||
assert_eq!(update.object_id, "object id 1");
|
||||
assert_eq!(update.msg_id, 10);
|
||||
assert_eq!(update.payload, Bytes::from(vec![5, 6, 7, 8]));
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
|
||||
match (version_1, version_2) {
|
||||
(
|
||||
RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(init_1)),
|
||||
RealtimeMessage::Collab(CollabMessage::ClientInitSync(init_2)),
|
||||
) => {
|
||||
assert_eq!(init_1, init_2);
|
||||
},
|
||||
_ => panic!("Failed to convert RealtimeMessage to RealtimeMessage2"),
|
||||
let client_collab_v1 = read_message_from_file("migration/0149/client_collab_v1").unwrap();
|
||||
assert!(matches!(
|
||||
client_collab_v1,
|
||||
RealtimeMessage::ClientCollabV1(_)
|
||||
));
|
||||
if let RealtimeMessage::ClientCollabV1(messages) = client_collab_v1 {
|
||||
assert_eq!(messages.len(), 1);
|
||||
if let ClientCollabMessage::ClientUpdateSync { data } = &messages[0] {
|
||||
assert_eq!(data.object_id, "object id 1");
|
||||
assert_eq!(data.msg_id, 10);
|
||||
assert_eq!(data.payload, Bytes::from(vec![5, 6, 7, 8]));
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_version_2_with_version_1_test_1() {
|
||||
fn decode_0147_realtime_message_test() {
|
||||
let collab_init = read_message_from_file("migration/0147/client_init").unwrap();
|
||||
assert!(matches!(collab_init, RealtimeMessage::Collab(_)));
|
||||
if let RealtimeMessage::Collab(CollabMessage::ClientInitSync(init)) = collab_init {
|
||||
assert_eq!(init.object_id, "object id 1");
|
||||
assert_eq!(init.collab_type, CollabType::Document);
|
||||
assert_eq!(init.workspace_id, "workspace id 1");
|
||||
assert_eq!(init.msg_id, 1);
|
||||
assert_eq!(init.payload, vec![1, 2, 3, 4]);
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
|
||||
let collab_update = read_message_from_file("migration/0147/collab_update").unwrap();
|
||||
assert!(matches!(collab_update, RealtimeMessage::Collab(_)));
|
||||
if let RealtimeMessage::Collab(CollabMessage::ClientUpdateSync(update)) = collab_update {
|
||||
assert_eq!(update.object_id, "object id 1");
|
||||
assert_eq!(update.msg_id, 10);
|
||||
assert_eq!(update.payload, Bytes::from(vec![5, 6, 7, 8]));
|
||||
} else {
|
||||
panic!("Failed to decode RealtimeMessage from file");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_version_2_collab_message_with_version_1_test_1() {
|
||||
let version_2 = RealtimeMessage::Collab(CollabMessage::ClientInitSync(InitSync::new(
|
||||
CollabOrigin::Empty,
|
||||
"1".to_string(),
|
||||
|
|
@ -209,7 +264,6 @@ mod tests {
|
|||
|
||||
let version_2_bytes = version_2.encode().unwrap();
|
||||
let version_1: RealtimeMessageV1 = bincode::deserialize(&version_2_bytes).unwrap();
|
||||
|
||||
match (version_1, version_2) {
|
||||
(
|
||||
RealtimeMessageV1::Collab(CollabMessage::ClientInitSync(init_1)),
|
||||
|
|
@ -220,4 +274,24 @@ mod tests {
|
|||
_ => panic!("Failed to convert RealtimeMessage2 to RealtimeMessage"),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn write_message_to_file(
|
||||
message: &RealtimeMessage,
|
||||
file_path: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let data = message.encode().unwrap();
|
||||
let mut file = File::create(file_path)?;
|
||||
file.write_all(&data)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn read_message_from_file(file_path: &str) -> Result<RealtimeMessage, anyhow::Error> {
|
||||
let mut file = File::open(file_path)?;
|
||||
let mut buffer = Vec::new();
|
||||
file.read_to_end(&mut buffer)?;
|
||||
let message = RealtimeMessage::decode(&buffer)?;
|
||||
Ok(message)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -444,7 +444,7 @@ pub async fn broadcast_client_collab_message<U>(
|
|||
let pair = (object_id, collab_messages);
|
||||
let err = client_stream
|
||||
.stream_tx
|
||||
.send(RealtimeMessage::ClientCollabV1([pair].into()));
|
||||
.send(RealtimeMessage::ClientCollabV2([pair].into()));
|
||||
if let Err(err) = err {
|
||||
warn!("Send user:{} message to group error: {}", user.uid(), err,);
|
||||
client_streams.remove(user);
|
||||
|
|
@ -569,10 +569,9 @@ impl CollabClientStream {
|
|||
if messages.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if !messages.is_empty() {
|
||||
if tx.send([(object_id, messages)].into()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
|
||||
if tx.send([(object_id, messages)].into()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in New Issue