feat: deserialization for entities used in collab stream (#682)

This commit is contained in:
Khor Shu Heng 2024-07-30 17:08:48 +08:00 committed by GitHub
parent b5abaafb50
commit ca6490c1ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1093 additions and 813 deletions

View File

@ -25,6 +25,11 @@ jobs:
- name: Install wasm-pack
run: cargo install wasm-pack
- name: install prerequisites
run: |
sudo apt-get update
sudo apt-get install protobuf-compiler
- name: Build ClientAPI
working-directory: ./libs/client-api
run: cargo build --features "enable_brotli"

1647
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,11 @@ edition = "2021"
[dependencies]
actix.workspace = true
actix-web.workspace = true
actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] }
actix-http = { workspace = true, default-features = false, features = [
"openssl",
"compress-brotli",
"compress-gzip",
] }
actix-rt = "2.9.0"
actix-web-actors = { version = "4.3" }
actix-service = "2.0.2"
@ -23,24 +27,32 @@ serde_repr.workspace = true
serde.workspace = true
tokio = { workspace = true, features = [
"macros",
"rt-multi-thread",
"sync",
"fs",
"time",
"full",
"macros",
"rt-multi-thread",
"sync",
"fs",
"time",
"full",
] }
tokio-stream.workspace = true
tokio-util = { version = "0.7.10", features = ["io"] }
futures-util = { workspace = true, features = ["std", "io"] }
once_cell = "1.19.0"
chrono = { version = "0.4.37", features = ["serde", "clock"], default-features = false }
chrono = { version = "0.4.37", features = [
"serde",
"clock",
], default-features = false }
derive_more = { version = "0.99" }
secrecy.workspace = true
rand = { version = "0.8", features = ["std_rng"] }
anyhow = "1.0.79"
thiserror = "1.0.56"
reqwest = { workspace = true, features = ["json", "rustls-tls", "cookies", "stream"] }
reqwest = { workspace = true, features = [
"json",
"rustls-tls",
"cookies",
"stream",
] }
unicode-segmentation = "1.10"
lazy_static.workspace = true
fancy-regex = "0.11.0"
@ -48,13 +60,33 @@ validator = "0.16.1"
bytes = "1.5.0"
rcgen = { version = "0.10.0", features = ["pem", "x509-parser"] }
mime = "0.3.17"
aws-sdk-s3 = { version = "1.36.0", features = ["behavior-version-latest", "rt-tokio"] }
aws-sdk-s3 = { version = "1.36.0", features = [
"behavior-version-latest",
"rt-tokio",
] }
aws-config = { version = "1.5.1", features = ["behavior-version-latest"] }
redis = { workspace = true, features = ["json", "tokio-comp", "connection-manager"] }
redis = { workspace = true, features = [
"json",
"tokio-comp",
"connection-manager",
] }
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter", "ansi", "json", "tracing-log"] }
tracing-subscriber = { version = "0.3.18", features = [
"registry",
"env-filter",
"ansi",
"json",
"tracing-log",
] }
tracing-bunyan-formatter = "0.3.9"
sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono", "migrate"] }
sqlx = { workspace = true, default-features = false, features = [
"runtime-tokio-rustls",
"macros",
"postgres",
"uuid",
"chrono",
"migrate",
] }
async-trait.workspace = true
prometheus-client.workspace = true
itertools = "0.11"
@ -94,7 +126,12 @@ infra = { path = "libs/infra" }
authentication.workspace = true
access-control.workspace = true
workspace-access.workspace = true
app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "appflowy_ai_error"] }
app-error = { workspace = true, features = [
"sqlx_error",
"actix_web_error",
"tokio_error",
"appflowy_ai_error",
] }
shared-entity = { path = "libs/shared-entity", features = ["cloud"] }
workspace-template = { workspace = true }
collab-rt-entity.workspace = true
@ -114,7 +151,13 @@ tempfile = "3.9.0"
assert-json-diff = "2.0.2"
scraper = "0.17.1"
client-api-test = { path = "libs/client-api-test", features = ["collab-sync"] }
client-api = { path = "libs/client-api", features = ["collab-sync", "test_util", "sync_verbose_log", "test_fast_sync", "enable_brotli"] }
client-api = { path = "libs/client-api", features = [
"collab-sync",
"test_util",
"sync_verbose_log",
"test_fast_sync",
"enable_brotli",
] }
opener = "0.6.1"
image = "0.23.14"
collab-rt-entity.workspace = true
@ -132,37 +175,37 @@ path = "src/lib.rs"
[workspace]
members = [
# libs
"libs/snowflake",
"libs/collab-rt-entity",
"libs/database",
"libs/database-entity",
"libs/client-api",
"libs/infra",
"libs/shared-entity",
"libs/gotrue",
"libs/gotrue-entity",
"admin_frontend",
"libs/app-error",
"libs/workspace-access",
"libs/workspace-template",
"libs/encrypt",
"libs/authentication",
"libs/access-control",
"libs/collab-rt-protocol",
"libs/collab-stream",
"libs/client-websocket",
"libs/client-api-test",
"libs/wasm-test",
"libs/client-api-wasm",
"libs/appflowy-ai-client",
"libs/client-api-entity",
# services
"services/appflowy-history",
"services/appflowy-collaborate",
# xtask
"xtask",
"libs/tonic-proto",
# libs
"libs/snowflake",
"libs/collab-rt-entity",
"libs/database",
"libs/database-entity",
"libs/client-api",
"libs/infra",
"libs/shared-entity",
"libs/gotrue",
"libs/gotrue-entity",
"admin_frontend",
"libs/app-error",
"libs/workspace-access",
"libs/workspace-template",
"libs/encrypt",
"libs/authentication",
"libs/access-control",
"libs/collab-rt-protocol",
"libs/collab-stream",
"libs/client-websocket",
"libs/client-api-test",
"libs/wasm-test",
"libs/client-api-wasm",
"libs/appflowy-ai-client",
"libs/client-api-entity",
# services
"services/appflowy-history",
"services/appflowy-collaborate",
# xtask
"xtask",
"libs/tonic-proto",
]
[workspace.dependencies]
@ -190,7 +233,11 @@ workspace-template = { path = "libs/workspace-template" }
uuid = { version = "1.6.1", features = ["v4"] }
anyhow = "1.0.79"
actix = "0.13.3"
actix-web = { version = "4.5.1", default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] }
actix-web = { version = "4.5.1", default-features = false, features = [
"openssl",
"compress-brotli",
"compress-gzip",
] }
actix-http = { version = "3.6.0", default-features = false }
tokio = { version = "1.36.0", features = ["sync"] }
tokio-stream = "0.1.14"
@ -233,11 +280,11 @@ debug = true
[patch.crates-io]
# It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
# So using patch to workaround this issue.
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "43b1c98435d63c225229c9def79f2f5213d6eaf1" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "43b1c98435d63c225229c9def79f2f5213d6eaf1" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "43b1c98435d63c225229c9def79f2f5213d6eaf1" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "43b1c98435d63c225229c9def79f2f5213d6eaf1" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "a6fe08ccf3e7ad782083848d6771a870da3e5af9" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "a6fe08ccf3e7ad782083848d6771a870da3e5af9" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "a6fe08ccf3e7ad782083848d6771a870da3e5af9" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "a6fe08ccf3e7ad782083848d6771a870da3e5af9" }
[features]
history = []
ai-test-enabled = []
ai-test-enabled = []

View File

@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct HttpRealtimeMessage {

View File

@ -19,6 +19,7 @@ collab-entity.workspace = true
serde_json.workspace = true
chrono = "0.4"
tokio-util = { version = "0.7" }
prost.workspace = true
[dev-dependencies]

View File

@ -1,4 +1,5 @@
use collab_entity::CollabType;
use collab_entity::proto::collab::collab_update_event::Update;
use collab_entity::{proto, CollabType};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
@ -304,12 +305,40 @@ pub enum CollabUpdateEvent {
}
impl CollabUpdateEvent {
#[allow(dead_code)]
fn to_proto(&self) -> proto::collab::CollabUpdateEvent {
match self {
CollabUpdateEvent::UpdateV1 { encode_update } => proto::collab::CollabUpdateEvent {
update: Some(Update::UpdateV1(encode_update.clone())),
},
}
}
fn from_proto(proto: &proto::collab::CollabUpdateEvent) -> Result<Self, StreamError> {
match &proto.update {
None => Err(StreamError::UnexpectedValue(
"update not set for CollabUpdateEvent proto".to_string(),
)),
Some(update) => match update {
Update::UpdateV1(encode_update) => Ok(CollabUpdateEvent::UpdateV1 {
encode_update: encode_update.to_vec(),
}),
},
}
}
pub fn encode(&self) -> Result<Vec<u8>, bincode::Error> {
bincode::serialize(self)
}
pub fn decode(data: &[u8]) -> Result<Self, bincode::Error> {
bincode::deserialize(data)
pub fn decode(data: &[u8]) -> Result<Self, StreamError> {
match prost::Message::decode(data) {
Ok(proto) => CollabUpdateEvent::from_proto(&proto),
Err(_) => match bincode::deserialize(data) {
Ok(event) => Ok(event),
Err(e) => Err(StreamError::BinCodeSerde(e)),
},
}
}
}
@ -321,3 +350,22 @@ impl TryFrom<CollabUpdateEvent> for StreamBinary {
Ok(StreamBinary(raw_data))
}
}
#[cfg(test)]
mod test {
use prost::Message;
#[test]
fn test_collab_update_event_decoding() {
let encoded_update = vec![1, 2, 3, 4, 5];
let event = super::CollabUpdateEvent::UpdateV1 {
encode_update: encoded_update.clone(),
};
let bincode_encoded = event.encode().unwrap();
let protobuf_encoded = event.to_proto().encode_to_vec();
let decoded_from_bincode = super::CollabUpdateEvent::decode(&bincode_encoded).unwrap();
let decoded_from_protobuf = super::CollabUpdateEvent::decode(&protobuf_encoded).unwrap();
assert_eq!(event, decoded_from_bincode);
assert_eq!(event, decoded_from_protobuf);
}
}

View File

@ -1,6 +1,8 @@
use crate::error::StreamError;
use collab_entity::proto;
use futures::stream::BoxStream;
use futures::StreamExt;
use prost::Message;
#[allow(deprecated)]
use redis::aio::{Connection, ConnectionManager};
use redis::{AsyncCommands, RedisWrite, ToRedisArgs};
@ -53,16 +55,36 @@ impl CollabStreamPub {
}
}
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PubSubMessage {
pub workspace_id: String,
pub oid: String,
}
impl PubSubMessage {
#[allow(dead_code)]
fn to_proto(&self) -> proto::collab::ActiveCollabId {
proto::collab::ActiveCollabId {
workspace_id: self.workspace_id.clone(),
oid: self.oid.clone(),
}
}
fn from_proto(proto: &proto::collab::ActiveCollabId) -> Self {
Self {
workspace_id: proto.workspace_id.clone(),
oid: proto.oid.clone(),
}
}
pub fn from_vec(vec: &[u8]) -> Result<Self, StreamError> {
let message = bincode::deserialize(vec)?;
Ok(message)
match Message::decode(vec) {
Ok(proto) => Ok(Self::from_proto(&proto)),
Err(_) => match bincode::deserialize(vec) {
Ok(event) => Ok(event),
Err(e) => Err(StreamError::BinCodeSerde(e)),
},
}
}
}
@ -75,3 +97,22 @@ impl ToRedisArgs for PubSubMessage {
json.write_redis_args(out);
}
}
#[cfg(test)]
mod test {
use prost::Message;
#[test]
fn test_pubsub_message_decoding() {
let message = super::PubSubMessage {
workspace_id: "1".to_string(),
oid: "o1".to_string(),
};
let bincode_encoded = bincode::serialize(&message).unwrap();
let protobuf_encoded = message.to_proto().encode_to_vec();
let decoded_from_bincode = super::PubSubMessage::from_vec(&bincode_encoded).unwrap();
let decoded_from_protobuf = super::PubSubMessage::from_vec(&protobuf_encoded).unwrap();
assert_eq!(message, decoded_from_bincode);
assert_eq!(message, decoded_from_protobuf);
}
}