refactor: sync protocol (#203)
* refactor: sync protocol * chore: update collab rev
This commit is contained in:
parent
36ef0f13b0
commit
9589054f38
|
|
@ -1243,6 +1243,7 @@ dependencies = [
|
|||
"anyhow",
|
||||
"app-error",
|
||||
"async-trait",
|
||||
"bincode",
|
||||
"bytes",
|
||||
"collab",
|
||||
"collab-entity",
|
||||
|
|
@ -1256,6 +1257,7 @@ dependencies = [
|
|||
"parking_lot",
|
||||
"prost",
|
||||
"realtime-entity",
|
||||
"realtime-protocol",
|
||||
"reqwest",
|
||||
"scraper",
|
||||
"serde",
|
||||
|
|
@ -1277,7 +1279,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
|
@ -1296,7 +1298,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-derive"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
|
@ -1308,7 +1310,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-document"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"collab",
|
||||
|
|
@ -1327,7 +1329,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-entity"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
|
|
@ -1341,7 +1343,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-folder"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
|
|
@ -1361,7 +1363,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-persistence"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=82b3f74a716285ec595b8140c7255402433e7c8a#82b3f74a716285ec595b8140c7255402433e7c8a"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=237cc24eda1e52f6dad9126f4a4c160a449688e3#237cc24eda1e52f6dad9126f4a4c160a449688e3"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
|
@ -3927,6 +3929,7 @@ dependencies = [
|
|||
"once_cell",
|
||||
"parking_lot",
|
||||
"realtime-entity",
|
||||
"realtime-protocol",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde-aux",
|
||||
|
|
@ -3960,9 +3963,24 @@ dependencies = [
|
|||
"prost",
|
||||
"prost-build",
|
||||
"protoc-bin-vendored",
|
||||
"realtime-protocol",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio-tungstenite",
|
||||
"yrs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "realtime-protocol"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bincode",
|
||||
"collab",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"yrs",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
13
Cargo.toml
13
Cargo.toml
|
|
@ -123,11 +123,13 @@ members = [
|
|||
"admin_frontend",
|
||||
"libs/app_error",
|
||||
"libs/workspace-template",
|
||||
"libs/encrypt"
|
||||
"libs/encrypt",
|
||||
"libs/realtime-protocol"
|
||||
]
|
||||
|
||||
[workspace.dependencies]
|
||||
realtime-entity = { path = "libs/realtime-entity" }
|
||||
realtime-protocol = { path = "libs/realtime-protocol" }
|
||||
database-entity = { path = "libs/database-entity" }
|
||||
app-error = { path = "libs/app_error" }
|
||||
serde_json = "1.0.108"
|
||||
|
|
@ -139,6 +141,7 @@ uuid = { version = "1.4.1", features = ["v4"] }
|
|||
anyhow = "1.0.75"
|
||||
tokio = { version = "1.34", features = ["sync"] }
|
||||
yrs = "0.17.1"
|
||||
bincode = "1.3.3"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
|
|
@ -155,10 +158,10 @@ lto = false
|
|||
opt-level = 3
|
||||
|
||||
[patch.crates-io]
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "82b3f74a716285ec595b8140c7255402433e7c8a" }
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "237cc24eda1e52f6dad9126f4a4c160a449688e3" }
|
||||
|
||||
# Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder
|
||||
#collab = { path = "libs/AppFlowy-Collab/collab" }
|
||||
|
|
|
|||
|
|
@ -38,10 +38,12 @@ collab = { version = "0.1.0", optional = true }
|
|||
collab-entity = { version = "0.1.0" }
|
||||
yrs = { workspace = true, optional = true }
|
||||
realtime-entity = { workspace = true, features = ["tungstenite"] }
|
||||
realtime-protocol = { workspace = true }
|
||||
workspace-template = { workspace = true, optional = true }
|
||||
mime_guess = "2.0.4"
|
||||
async-trait = { version = "0.1.73" }
|
||||
prost = "0.12.1"
|
||||
bincode = "1.3.3"
|
||||
|
||||
|
||||
[features]
|
||||
|
|
|
|||
|
|
@ -1,10 +1,10 @@
|
|||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum SyncError {
|
||||
#[error(transparent)]
|
||||
YSync(#[from] collab::sync_protocol::message::Error),
|
||||
YSync(#[from] realtime_protocol::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
YAwareness(#[from] collab::sync_protocol::awareness::Error),
|
||||
YAwareness(#[from] collab::core::awareness::Error),
|
||||
|
||||
#[error("failed to deserialize message: {0}")]
|
||||
DecodingError(#[from] yrs::encoding::read::Error),
|
||||
|
|
|
|||
|
|
@ -1,14 +1,14 @@
|
|||
use std::sync::{Arc, Weak};
|
||||
|
||||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::collab_state::SyncState;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::preclude::CollabPlugin;
|
||||
use collab::sync_protocol::awareness::Awareness;
|
||||
use collab::sync_protocol::message::{Message, SyncMessage};
|
||||
use collab_entity::{CollabObject, CollabType};
|
||||
use futures_util::SinkExt;
|
||||
use realtime_entity::collab_msg::{CollabMessage, UpdateSync};
|
||||
use realtime_protocol::{Message, SyncMessage};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::collab_sync::{SinkConfig, SyncQueue};
|
||||
|
|
|
|||
|
|
@ -2,14 +2,14 @@ use crate::collab_sync::{
|
|||
CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::collab_state::SyncState;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::sync_protocol::awareness::Awareness;
|
||||
use collab::sync_protocol::message::{Message, MessageReader, SyncMessage};
|
||||
use collab::sync_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use realtime_entity::collab_msg::{CollabMessage, InitSync, ServerInit, UpdateSync};
|
||||
use realtime_protocol::{handle_msg, ClientSyncProtocol, CollabSyncProtocol};
|
||||
use realtime_protocol::{Message, MessageReader, SyncMessage};
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::{Arc, Weak};
|
||||
|
|
|
|||
|
|
@ -38,12 +38,11 @@ impl InsertCollabParams {
|
|||
}
|
||||
}
|
||||
pub fn from_raw_data(
|
||||
object_id: &str,
|
||||
object_id: String,
|
||||
collab_type: CollabType,
|
||||
encoded_collab_v1: Vec<u8>,
|
||||
workspace_id: &str,
|
||||
) -> Self {
|
||||
let object_id = object_id.to_string();
|
||||
let workspace_id = workspace_id.to_string();
|
||||
Self {
|
||||
object_id,
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ base64 = "0.21.5"
|
|||
hkdf = { version = "0.12.3" }
|
||||
sha2 = "0.10.8"
|
||||
serde = { version = "1.0.188", features = ["derive"] }
|
||||
bincode = "1.3.3"
|
||||
bincode.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -13,10 +13,13 @@ serde_json.workspace = true
|
|||
bytes = { version = "1.0", features = ["serde"] }
|
||||
anyhow = "1.0.75"
|
||||
actix = { version = "0.13", optional = true }
|
||||
bincode = "1.3.3"
|
||||
bincode.workspace = true
|
||||
tokio-tungstenite = { version = "0.20.1", optional = true }
|
||||
prost = "0.12.1"
|
||||
database-entity.workspace = true
|
||||
yrs.workspace = true
|
||||
thiserror = "1.0.48"
|
||||
realtime-protocol.workspace = true
|
||||
|
||||
[features]
|
||||
actix_message = ["actix"]
|
||||
|
|
|
|||
|
|
@ -8,8 +8,8 @@ use collab::core::origin::CollabOrigin;
|
|||
use collab::preclude::merge_updates_v1;
|
||||
use collab::preclude::updates::decoder::DecoderV1;
|
||||
use collab::preclude::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use collab::sync_protocol::message::{Message, MessageReader, SyncMessage};
|
||||
use collab_entity::CollabType;
|
||||
use realtime_protocol::{Message, MessageReader, SyncMessage};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub trait CollabSinkMessage: Clone + Send + Sync + 'static + Ord + Display {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
pub mod collab_msg;
|
||||
|
||||
pub mod message;
|
||||
pub mod user;
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
[package]
|
||||
name = "realtime-protocol"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
yrs.workspace = true
|
||||
thiserror = "1.0.48"
|
||||
serde.workspace = true
|
||||
collab = { version = "0.1.0" }
|
||||
bincode.workspace = true
|
||||
anyhow.workspace = true
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
mod message;
|
||||
mod protocol;
|
||||
|
||||
pub use message::*;
|
||||
pub use protocol::*;
|
||||
|
|
@ -0,0 +1,205 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use collab::core::awareness::{Awareness, AwarenessUpdate};
|
||||
use collab::core::collab::{MutexCollab, TransactionMutExt};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use yrs::updates::decoder::Decode;
|
||||
use yrs::updates::encoder::{Encode, Encoder};
|
||||
use yrs::{ReadTxn, StateVector, Transact, Update};
|
||||
|
||||
use crate::message::{CustomMessage, Error, Message, SyncMessage, SyncMeta};
|
||||
|
||||
// ***************************
|
||||
// Client A Client B Server
|
||||
// | | |
|
||||
// |---(1)--Sync Step1----->|
|
||||
// | | |
|
||||
// |<--(2)--Sync Step2------|
|
||||
// |<-------Sync Step1------|
|
||||
// | | |
|
||||
// |---(3)--Sync Step2----->|
|
||||
// | | |
|
||||
// **************************
|
||||
// |---(1)-- Update-------->|
|
||||
// | | |
|
||||
// | | (2) Apply->|
|
||||
// | | |
|
||||
// | |<-(3) Broadcast
|
||||
// | | |
|
||||
// | |< (4) Apply |
|
||||
/// A implementation of [CollabSyncProtocol].
|
||||
#[derive(Clone)]
|
||||
pub struct ClientSyncProtocol;
|
||||
impl CollabSyncProtocol for ClientSyncProtocol {
|
||||
fn check<E: Encoder>(&self, encoder: &mut E, last_sync_at: i64) -> Result<(), Error> {
|
||||
let meta = SyncMeta { last_sync_at };
|
||||
Message::Custom(CustomMessage::SyncCheck(meta)).encode(encoder);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub trait CollabSyncProtocol {
|
||||
fn check<E: Encoder>(&self, _encoder: &mut E, _last_sync_at: i64) -> Result<(), Error> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn start<E: Encoder>(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error> {
|
||||
let (sv, update) = {
|
||||
let sv = awareness.doc().transact().state_vector();
|
||||
let update = awareness.update()?;
|
||||
(sv, update)
|
||||
};
|
||||
|
||||
Message::Sync(SyncMessage::SyncStep1(sv)).encode(encoder);
|
||||
Message::Awareness(update).encode(encoder);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Given a [StateVector] of a remote side, calculate missing
|
||||
/// updates. Returns a sync-step-2 message containing a calculated update.
|
||||
fn handle_sync_step1(
|
||||
&self,
|
||||
awareness: &Awareness,
|
||||
sv: StateVector,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
let update = awareness
|
||||
.doc()
|
||||
.try_transact()
|
||||
.map_err(|err| Error::YrsTransaction(format!("fail to handle sync step1. error: {}", err)))?
|
||||
.encode_state_as_update_v1(&sv);
|
||||
Ok(Some(
|
||||
Message::Sync(SyncMessage::SyncStep2(update)).encode_v1(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Handle reply for a sync-step-1 send from this replica previously. By default just apply
|
||||
/// an update to current `awareness` document instance.
|
||||
fn handle_sync_step2(
|
||||
&self,
|
||||
origin: &Option<&CollabOrigin>,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
let mut txn = match origin {
|
||||
Some(origin) => awareness.doc().try_transact_mut_with((*origin).clone()),
|
||||
None => awareness.doc().try_transact_mut(),
|
||||
}
|
||||
.map_err(|err| Error::YrsTransaction(format!("fail to handle sync step2. error: {}", err)))?;
|
||||
txn.try_apply_update(update).map_err(|err| {
|
||||
Error::YrsTransaction(format!("fail to apply sync step2 update. error: {}", err))
|
||||
})?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Handle continuous update send from the client. By default just apply an update to a current
|
||||
/// `awareness` document instance.
|
||||
fn handle_update(
|
||||
&self,
|
||||
origin: &Option<&CollabOrigin>,
|
||||
awareness: &mut Awareness,
|
||||
update: Update,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
self.handle_sync_step2(origin, awareness, update)
|
||||
}
|
||||
|
||||
fn handle_auth(
|
||||
&self,
|
||||
_awareness: &Awareness,
|
||||
deny_reason: Option<String>,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
if let Some(reason) = deny_reason {
|
||||
Err(Error::PermissionDenied { reason })
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Reply to awareness query or just incoming [AwarenessUpdate], where current `awareness`
|
||||
/// instance is being updated with incoming data.
|
||||
fn handle_awareness_update(
|
||||
&self,
|
||||
awareness: &mut Awareness,
|
||||
update: AwarenessUpdate,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
awareness.apply_update(update)?;
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn handle_custom_message(
|
||||
&self,
|
||||
_awareness: &mut Awareness,
|
||||
_msg: CustomMessage,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles incoming messages from the client/server
|
||||
pub fn handle_msg<P: CollabSyncProtocol>(
|
||||
origin: &Option<&CollabOrigin>,
|
||||
protocol: &P,
|
||||
collab: &MutexCollab,
|
||||
msg: Message,
|
||||
) -> Result<Option<Vec<u8>>, Error> {
|
||||
match msg {
|
||||
Message::Sync(msg) => match msg {
|
||||
SyncMessage::SyncStep1(sv) => {
|
||||
let collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_sync_step1(collab.get_awareness(), sv)
|
||||
},
|
||||
SyncMessage::SyncStep2(update) => {
|
||||
let mut collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_sync_step2(
|
||||
origin,
|
||||
collab.get_mut_awareness(),
|
||||
Update::decode_v1(&update)?,
|
||||
)
|
||||
},
|
||||
SyncMessage::Update(update) => {
|
||||
let mut collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_update(
|
||||
origin,
|
||||
collab.get_mut_awareness(),
|
||||
Update::decode_v1(&update)?,
|
||||
)
|
||||
},
|
||||
},
|
||||
Message::Auth(reason) => {
|
||||
let collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_auth(collab.get_awareness(), reason)
|
||||
},
|
||||
Message::Awareness(update) => {
|
||||
let mut collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_awareness_update(collab.get_mut_awareness(), update)
|
||||
},
|
||||
Message::Custom(msg) => {
|
||||
let mut collab = collab
|
||||
.try_lock_for(Duration::from_millis(400))
|
||||
.ok_or(Error::Internal(anyhow!(
|
||||
"Timeout while trying to acquire lock"
|
||||
)))?;
|
||||
protocol.handle_custom_message(collab.get_mut_awareness(), msg)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -32,6 +32,7 @@ database-entity.workspace = true
|
|||
yrs.workspace = true
|
||||
chrono = "0.4.30"
|
||||
realtime-entity = { workspace = true, features = ["actix_message"] }
|
||||
realtime-protocol.workspace = true
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
use collab::core::awareness;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::collaborate::sync_protocol::ServerSyncProtocol;
|
||||
use collab::core::awareness::{Awareness, AwarenessUpdate};
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::sync_protocol::awareness::{Awareness, AwarenessUpdate};
|
||||
use collab::sync_protocol::message::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
|
||||
use collab::sync_protocol::{awareness, handle_msg};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use realtime_protocol::handle_msg;
|
||||
use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
|
||||
use tokio::select;
|
||||
use tokio::sync::broadcast::error::SendError;
|
||||
use tokio::sync::broadcast::{channel, Sender};
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ use collab_entity::CollabType;
|
|||
use database::collab::CollabStorage;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use collab::core::collab_plugin::EncodedCollabV1;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::spawn_blocking;
|
||||
|
|
@ -177,6 +178,10 @@ where
|
|||
f(&collab);
|
||||
}
|
||||
|
||||
pub fn encode_v1(&self) -> EncodedCollabV1 {
|
||||
self.collab.lock().encode_collab_v1()
|
||||
}
|
||||
|
||||
pub async fn is_empty(&self) -> bool {
|
||||
self.subscribers.read().await.is_empty()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,11 +5,11 @@ use app_error::AppError;
|
|||
use async_trait::async_trait;
|
||||
|
||||
use crate::collaborate::{CollabAccessControl, CollabUserId};
|
||||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::TransactionMutExt;
|
||||
use collab::core::collab_plugin::EncodedCollabV1;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::preclude::{CollabPlugin, Doc, TransactionMut};
|
||||
use collab::sync_protocol::awareness::Awareness;
|
||||
use collab_entity::CollabType;
|
||||
use database::collab::CollabStorage;
|
||||
use database_entity::dto::{AFAccessLevel, InsertCollabParams, QueryCollabParams};
|
||||
|
|
@ -160,7 +160,7 @@ where
|
|||
match result {
|
||||
Ok(encoded_collab_v1) => {
|
||||
let params = InsertCollabParams::from_raw_data(
|
||||
object_id,
|
||||
object_id.to_string(),
|
||||
self.collab_type.clone(),
|
||||
encoded_collab_v1,
|
||||
&self.workspace_id,
|
||||
|
|
@ -199,36 +199,27 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn flush(&self, object_id: &str, data: &EncodedCollabV1) {
|
||||
fn flush(&self, object_id: &str, doc: &Doc) {
|
||||
let storage = self.storage.clone();
|
||||
match data.encode_to_bytes() {
|
||||
Ok(encoded_collab_v1) => {
|
||||
let params = InsertCollabParams::from_raw_data(
|
||||
object_id,
|
||||
self.collab_type.clone(),
|
||||
encoded_collab_v1,
|
||||
&self.workspace_id,
|
||||
);
|
||||
|
||||
info!(
|
||||
"[realtime] start flushing {}:{} with len: {}",
|
||||
object_id,
|
||||
params.collab_type,
|
||||
params.encoded_collab_v1.len()
|
||||
);
|
||||
|
||||
let uid = self.uid;
|
||||
tokio::spawn(async move {
|
||||
let object_id = params.object_id.clone();
|
||||
match storage.insert_collab(&uid, params).await {
|
||||
Ok(_) => info!("[realtime] end flushing collab: {}", object_id),
|
||||
Err(err) => error!("save collab failed: {:?}", err),
|
||||
}
|
||||
});
|
||||
},
|
||||
Err(err) => {
|
||||
error!("fail to encode EncodedDocV1 to bytes: {:?}", err);
|
||||
},
|
||||
let uid = self.uid;
|
||||
let workspace_id = self.workspace_id.clone();
|
||||
let collab_type = self.collab_type.clone();
|
||||
let object_id = object_id.to_string();
|
||||
if let Ok(encoded_collab_v1) = {
|
||||
let txn = doc.transact();
|
||||
let doc_state = txn.encode_state_as_update_v1(&StateVector::default());
|
||||
let state_vector = txn.state_vector().encode_v1();
|
||||
EncodedCollabV1::new(doc_state, state_vector).encode_to_bytes()
|
||||
} {
|
||||
let params =
|
||||
InsertCollabParams::from_raw_data(object_id, collab_type, encoded_collab_v1, &workspace_id);
|
||||
let object_id = params.object_id.clone();
|
||||
tokio::spawn(async move {
|
||||
match storage.insert_collab(&uid, params).await {
|
||||
Ok(_) => info!("[realtime] flushing collab: {}", object_id),
|
||||
Err(err) => error!("save collab failed: {:?}", err),
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -238,7 +238,7 @@ where
|
|||
.run()
|
||||
.await?;
|
||||
|
||||
broadcast_message(&user, &collab_message, &client_stream_by_user).await;
|
||||
broadcast_message(&user, collab_message, &client_stream_by_user).await;
|
||||
Ok(())
|
||||
})
|
||||
},
|
||||
|
|
@ -253,7 +253,7 @@ where
|
|||
#[inline]
|
||||
async fn broadcast_message<U>(
|
||||
user: &U,
|
||||
collab_message: &CollabMessage,
|
||||
collab_message: CollabMessage,
|
||||
client_streams: &Arc<RwLock<HashMap<U, CollabClientStream>>>,
|
||||
) where
|
||||
U: RealtimeUser,
|
||||
|
|
@ -263,7 +263,7 @@ async fn broadcast_message<U>(
|
|||
trace!("[realtime]: receives collab message: {}", collab_message);
|
||||
match client_stream
|
||||
.stream_tx
|
||||
.send(Ok(RealtimeMessage::Collab(collab_message.clone())))
|
||||
.send(Ok(RealtimeMessage::Collab(collab_message)))
|
||||
{
|
||||
Ok(_) => {},
|
||||
Err(e) => error!("send error: {}", e),
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
use collab::sync_protocol::awareness::Awareness;
|
||||
use collab::sync_protocol::message::{CustomMessage, Error, Message, SyncMessage};
|
||||
use collab::sync_protocol::CollabSyncProtocol;
|
||||
use collab::core::awareness::Awareness;
|
||||
use realtime_protocol::CollabSyncProtocol;
|
||||
use realtime_protocol::{CustomMessage, Error, Message, SyncMessage};
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Transact};
|
||||
|
||||
|
|
|
|||
|
|
@ -3,10 +3,10 @@ use collab::error::CollabError;
|
|||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum RealtimeError {
|
||||
#[error(transparent)]
|
||||
YSync(#[from] collab::sync_protocol::message::Error),
|
||||
YSync(#[from] realtime_protocol::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
YAwareness(#[from] collab::sync_protocol::awareness::Error),
|
||||
YAwareness(#[from] collab::core::awareness::Error),
|
||||
|
||||
#[error("failed to deserialize message: {0}")]
|
||||
DecodingError(#[from] yrs::encoding::read::Error),
|
||||
|
|
|
|||
Loading…
Reference in New Issue