feat: Optimize sync (#341)
* chore: optimize sync * chore: optimize sync * chore: optimize sync * chore: update collab rev
This commit is contained in:
parent
0dd9e3f12c
commit
1590e948c6
|
|
@ -1299,6 +1299,7 @@ dependencies = [
|
|||
"gotrue",
|
||||
"gotrue-entity",
|
||||
"governor",
|
||||
"log",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"parking_lot 0.12.1",
|
||||
|
|
@ -1360,7 +1361,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
|
@ -1382,7 +1383,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-document"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"collab",
|
||||
|
|
@ -1401,7 +1402,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-entity"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
|
|
@ -1416,7 +1417,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-folder"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=01be7a981515dd02a8bd37e3e79f1d24eade0f47#01be7a981515dd02a8bd37e3e79f1d24eade0f47"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=35c649ea201e12bf40f5352a8bf9c46141e013a5#35c649ea201e12bf40f5352a8bf9c46141e013a5"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
|
|
|
|||
|
|
@ -172,10 +172,10 @@ inherits = "release"
|
|||
debug = true
|
||||
|
||||
[patch.crates-io]
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "01be7a981515dd02a8bd37e3e79f1d24eade0f47" }
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "35c649ea201e12bf40f5352a8bf9c46141e013a5" }
|
||||
|
||||
[features]
|
||||
custom_env= []
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
# Generate the current dependency list
|
||||
cargo tree > current_deps.txt
|
||||
|
||||
BASELINE_COUNT=1684
|
||||
BASELINE_COUNT=1685
|
||||
CURRENT_COUNT=$(cat current_deps.txt | wc -l)
|
||||
|
||||
echo "Expected dependency count (baseline): $BASELINE_COUNT"
|
||||
|
|
|
|||
|
|
@ -534,6 +534,16 @@ impl TestClient {
|
|||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub async fn get_edit_collab_json(&self, object_id: &str) -> Value {
|
||||
self
|
||||
.collab_by_object_id
|
||||
.get(object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.to_json_value()
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn assert_server_snapshot(
|
||||
|
|
@ -638,12 +648,11 @@ pub async fn assert_server_collab(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn assert_client_collab(
|
||||
pub async fn assert_client_collab_within_30_secs(
|
||||
client: &mut TestClient,
|
||||
object_id: &str,
|
||||
key: &str,
|
||||
expected: Value,
|
||||
_retry_duration: u64,
|
||||
) {
|
||||
let secs = 30;
|
||||
let object_id = object_id.to_string();
|
||||
|
|
@ -676,7 +685,7 @@ pub async fn assert_client_collab(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn assert_client_collab_include_value(
|
||||
pub async fn assert_client_collab_include_value_within_30_secs(
|
||||
client: &mut TestClient,
|
||||
object_id: &str,
|
||||
expected: Value,
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ database-entity.workspace = true
|
|||
app-error = { workspace = true, features = ["tokio_error", "bincode_error"] }
|
||||
scraper = { version = "0.17.1", optional = true }
|
||||
governor = { version = "0.6.0" }
|
||||
log = "0.4.20"
|
||||
|
||||
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
|
||||
tokio-retry = "0.3"
|
||||
|
|
|
|||
|
|
@ -1,14 +1,16 @@
|
|||
mod channel;
|
||||
mod error;
|
||||
mod pending_msg;
|
||||
mod plugin;
|
||||
mod sink;
|
||||
mod sync;
|
||||
mod sink_config;
|
||||
mod sink_pending_queue;
|
||||
mod sync_control;
|
||||
|
||||
pub use channel::*;
|
||||
pub use error::*;
|
||||
pub use plugin::*;
|
||||
pub use sink::*;
|
||||
pub use sync::*;
|
||||
pub use sink_config::*;
|
||||
pub use sync_control::*;
|
||||
|
||||
pub use realtime_entity::collab_msg;
|
||||
|
|
|
|||
|
|
@ -1,27 +1,27 @@
|
|||
use std::sync::{Arc, Weak};
|
||||
|
||||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::collab_state::SyncState;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::preclude::CollabPlugin;
|
||||
use collab::preclude::{Collab, CollabPlugin};
|
||||
use collab_entity::{CollabObject, CollabType};
|
||||
use futures_util::SinkExt;
|
||||
use realtime_entity::collab_msg::{CollabMessage, UpdateSync};
|
||||
use realtime_protocol::{Message, SyncMessage};
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
use crate::collab_sync::{SinkConfig, SyncQueue};
|
||||
use crate::collab_sync::SyncControl;
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
use tracing::trace;
|
||||
|
||||
use crate::collab_sync::sink_config::SinkConfig;
|
||||
use crate::platform_spawn;
|
||||
use crate::ws::{ConnectState, WSConnectStateReceiver};
|
||||
use yrs::updates::encoder::Encode;
|
||||
|
||||
pub struct SyncPlugin<Sink, Stream, C> {
|
||||
object: SyncObject,
|
||||
sync_queue: Arc<SyncQueue<Sink, Stream>>,
|
||||
sync_queue: Arc<SyncControl<Sink, Stream>>,
|
||||
// Used to keep the lifetime of the channel
|
||||
#[allow(dead_code)]
|
||||
channel: Option<Arc<C>>,
|
||||
|
|
@ -53,13 +53,13 @@ where
|
|||
mut ws_connect_state: WSConnectStateReceiver,
|
||||
) -> Self {
|
||||
let weak_local_collab = collab.clone();
|
||||
let sync_queue = SyncQueue::new(
|
||||
let sync_queue = SyncControl::new(
|
||||
object.clone(),
|
||||
origin,
|
||||
sink,
|
||||
sink_config,
|
||||
stream,
|
||||
collab.clone(),
|
||||
sink_config,
|
||||
pause,
|
||||
);
|
||||
|
||||
|
|
@ -86,16 +86,19 @@ where
|
|||
(weak_local_collab.upgrade(), weak_sync_queue.upgrade())
|
||||
{
|
||||
if let Some(local_collab) = local_collab.try_lock() {
|
||||
let last_sync_at = local_collab.get_last_sync_at();
|
||||
sync_queue.resume();
|
||||
sync_queue.init_sync(local_collab.get_awareness(), last_sync_at);
|
||||
sync_queue.init_sync(&local_collab);
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
ConnectState::Unauthorized | ConnectState::Closed => {
|
||||
if let Some(sync_queue) = weak_sync_queue.upgrade() {
|
||||
// Stop sync if the websocket is unauthorized or disconnected
|
||||
sync_queue.pause();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
},
|
||||
_ => {},
|
||||
|
|
@ -123,8 +126,8 @@ where
|
|||
Stream: StreamExt<Item = Result<CollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
C: Send + Sync + 'static,
|
||||
{
|
||||
fn did_init(&self, _awareness: &Awareness, _object_id: &str, last_sync_at: i64) {
|
||||
self.sync_queue.init_sync(_awareness, last_sync_at);
|
||||
fn did_init(&self, collab: &Collab, _object_id: &str, _last_sync_at: i64) {
|
||||
self.sync_queue.init_sync(collab);
|
||||
}
|
||||
|
||||
fn receive_local_update(&self, origin: &CollabOrigin, _object_id: &str, update: &[u8]) {
|
||||
|
|
|
|||
|
|
@ -3,10 +3,11 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::collab_sync::pending_msg::{MessageState, PendingMsgQueue};
|
||||
use crate::collab_sync::{SyncError, SyncObject, DEFAULT_SYNC_TIMEOUT};
|
||||
use crate::collab_sync::sink_pending_queue::{MessageState, SinkPendingQueue};
|
||||
use crate::collab_sync::{SyncError, SyncObject};
|
||||
use futures_util::SinkExt;
|
||||
|
||||
use crate::collab_sync::sink_config::{SinkConfig, SinkStrategy};
|
||||
use crate::platform_spawn;
|
||||
use realtime_entity::collab_msg::{CollabMessage, CollabSinkMessage, MsgId};
|
||||
use tokio::sync::{mpsc, oneshot, watch, Mutex};
|
||||
|
|
@ -35,21 +36,17 @@ pub struct CollabSink<Sink, Msg> {
|
|||
/// The [Sink] is used to send the messages to the remote. It might be a websocket sink or
|
||||
/// other sink that implements the [SinkExt] trait.
|
||||
sender: Arc<Mutex<Sink>>,
|
||||
|
||||
/// The [PendingMsgQueue] is used to queue the messages that are waiting to be sent to the
|
||||
/// The [SinkPendingQueue] is used to queue the messages that are waiting to be sent to the
|
||||
/// remote. It will merge the messages if possible.
|
||||
pending_msg_queue: Arc<parking_lot::Mutex<PendingMsgQueue<Msg>>>,
|
||||
pending_msg_queue: Arc<parking_lot::Mutex<SinkPendingQueue<Msg>>>,
|
||||
msg_id_counter: Arc<DefaultMsgIdCounter>,
|
||||
|
||||
/// The [watch::Sender] is used to notify the [CollabSinkRunner] to process the pending messages.
|
||||
/// Sending `false` will stop the [CollabSinkRunner].
|
||||
notifier: Arc<watch::Sender<bool>>,
|
||||
config: SinkConfig,
|
||||
|
||||
/// Stop the [IntervalRunner] if the sink strategy is [SinkStrategy::FixInterval].
|
||||
#[allow(dead_code)]
|
||||
interval_runner_stop_tx: Option<mpsc::Sender<()>>,
|
||||
|
||||
/// Used to calculate the time interval between two messages. Only used when the sink strategy
|
||||
/// is [SinkStrategy::FixInterval].
|
||||
instant: Mutex<Instant>,
|
||||
|
|
@ -84,7 +81,7 @@ where
|
|||
let notifier = Arc::new(notifier);
|
||||
let state_notifier = Arc::new(sync_state_tx);
|
||||
let sender = Arc::new(Mutex::new(sink));
|
||||
let pending_msg_queue = PendingMsgQueue::new(uid);
|
||||
let pending_msg_queue = SinkPendingQueue::new(uid);
|
||||
let pending_msg_queue = Arc::new(parking_lot::Mutex::new(pending_msg_queue));
|
||||
let msg_id_counter = Arc::new(msg_id_counter);
|
||||
//
|
||||
|
|
@ -112,7 +109,7 @@ where
|
|||
}
|
||||
|
||||
/// Put the message into the queue and notify the sink to process the next message.
|
||||
/// After the [Msg] was pushed into the [PendingMsgQueue]. The queue will pop the next msg base on
|
||||
/// After the [Msg] was pushed into the [SinkPendingQueue]. The queue will pop the next msg base on
|
||||
/// its priority. And the message priority is determined by the [Msg] that implement the [Ord] and
|
||||
/// [PartialOrd] trait. Check out the [CollabMessage] for more details.
|
||||
///
|
||||
|
|
@ -134,8 +131,13 @@ where
|
|||
// When the client is connected, remove all pending messages and send the init message.
|
||||
{
|
||||
let mut pending_msg_queue = self.pending_msg_queue.lock();
|
||||
// if there is an init message in the queue, return;
|
||||
if let Some(msg) = pending_msg_queue.peek() {
|
||||
if msg.get_msg().is_init_msg() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
pending_msg_queue.clear();
|
||||
|
||||
let msg_id = self.msg_id_counter.next();
|
||||
let msg = f(msg_id);
|
||||
pending_msg_queue.push_msg(msg_id, msg);
|
||||
|
|
@ -145,6 +147,16 @@ where
|
|||
self.notify();
|
||||
}
|
||||
|
||||
pub fn can_queue_init_sync(&self) -> bool {
|
||||
let pending_msg_queue = self.pending_msg_queue.lock();
|
||||
if let Some(msg) = pending_msg_queue.peek() {
|
||||
if msg.get_msg().is_init_msg() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
pub fn clear(&self) {
|
||||
self.pending_msg_queue.lock().clear();
|
||||
}
|
||||
|
|
@ -341,7 +353,10 @@ where
|
|||
}
|
||||
},
|
||||
},
|
||||
Err(err) => error!("Send message failed error: {}", err),
|
||||
Err(err) => {
|
||||
// the error might be caused by the sending message was removed from the queue.
|
||||
trace!("pending message oneshot channel error: {}", err)
|
||||
},
|
||||
}
|
||||
|
||||
self.notify()
|
||||
|
|
@ -406,16 +421,6 @@ impl<Msg> CollabSinkRunner<Msg> {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct SinkConfig {
|
||||
/// `timeout` is the time to wait for the remote to ack the message. If the remote
|
||||
/// does not ack the message in time, the message will be sent again.
|
||||
pub send_timeout: Duration,
|
||||
/// `maximum_payload_size` is the maximum size of the messages to be merged.
|
||||
pub maximum_payload_size: usize,
|
||||
/// `strategy` is the strategy to send the messages.
|
||||
pub strategy: SinkStrategy,
|
||||
}
|
||||
|
||||
fn calculate_timeout(payload_len: usize, default: Duration) -> Duration {
|
||||
match payload_len {
|
||||
0..=40959 => default,
|
||||
|
|
@ -426,61 +431,6 @@ fn calculate_timeout(payload_len: usize, default: Duration) -> Duration {
|
|||
}
|
||||
}
|
||||
|
||||
impl SinkConfig {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub fn send_timeout(mut self, secs: u64) -> Self {
|
||||
let timeout_duration = Duration::from_secs(secs);
|
||||
if let SinkStrategy::FixInterval(duration) = self.strategy {
|
||||
if timeout_duration < duration {
|
||||
warn!("The timeout duration should greater than the fix interval duration");
|
||||
}
|
||||
}
|
||||
self.send_timeout = timeout_duration;
|
||||
self
|
||||
}
|
||||
|
||||
/// `max_zip_size` is the maximum size of the messages to be merged.
|
||||
pub fn with_max_payload_size(mut self, max_size: usize) -> Self {
|
||||
self.maximum_payload_size = max_size;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_strategy(mut self, strategy: SinkStrategy) -> Self {
|
||||
if let SinkStrategy::FixInterval(duration) = strategy {
|
||||
if self.send_timeout < duration {
|
||||
warn!("The timeout duration should greater than the fix interval duration");
|
||||
}
|
||||
}
|
||||
self.strategy = strategy;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SinkConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
send_timeout: Duration::from_secs(DEFAULT_SYNC_TIMEOUT),
|
||||
maximum_payload_size: 1024 * 64,
|
||||
strategy: SinkStrategy::ASAP,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SinkStrategy {
|
||||
/// Send the message as soon as possible.
|
||||
ASAP,
|
||||
/// Send the message in a fixed interval.
|
||||
FixInterval(Duration),
|
||||
}
|
||||
|
||||
impl SinkStrategy {
|
||||
pub fn is_fix_interval(&self) -> bool {
|
||||
matches!(self, SinkStrategy::FixInterval(_))
|
||||
}
|
||||
}
|
||||
|
||||
pub trait MsgIdCounter: Send + Sync + 'static {
|
||||
/// Get the next message id. The message id should be unique.
|
||||
fn next(&self) -> MsgId;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
use crate::collab_sync::DEFAULT_SYNC_TIMEOUT;
|
||||
use std::time::Duration;
|
||||
use tracing::warn;
|
||||
|
||||
pub struct SinkConfig {
|
||||
/// `timeout` is the time to wait for the remote to ack the message. If the remote
|
||||
/// does not ack the message in time, the message will be sent again.
|
||||
pub send_timeout: Duration,
|
||||
/// `maximum_payload_size` is the maximum size of the messages to be merged.
|
||||
pub maximum_payload_size: usize,
|
||||
/// `strategy` is the strategy to send the messages.
|
||||
pub strategy: SinkStrategy,
|
||||
}
|
||||
|
||||
impl SinkConfig {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
pub fn send_timeout(mut self, secs: u64) -> Self {
|
||||
let timeout_duration = Duration::from_secs(secs);
|
||||
if let SinkStrategy::FixInterval(duration) = self.strategy {
|
||||
if timeout_duration < duration {
|
||||
warn!("The timeout duration should greater than the fix interval duration");
|
||||
}
|
||||
}
|
||||
self.send_timeout = timeout_duration;
|
||||
self
|
||||
}
|
||||
|
||||
/// `max_zip_size` is the maximum size of the messages to be merged.
|
||||
pub fn with_max_payload_size(mut self, max_size: usize) -> Self {
|
||||
self.maximum_payload_size = max_size;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_strategy(mut self, strategy: SinkStrategy) -> Self {
|
||||
if let SinkStrategy::FixInterval(duration) = strategy {
|
||||
if self.send_timeout < duration {
|
||||
warn!("The timeout duration should greater than the fix interval duration");
|
||||
}
|
||||
}
|
||||
self.strategy = strategy;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for SinkConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
send_timeout: Duration::from_secs(DEFAULT_SYNC_TIMEOUT),
|
||||
maximum_payload_size: 1024 * 64,
|
||||
strategy: SinkStrategy::ASAP,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum SinkStrategy {
|
||||
/// Send the message as soon as possible.
|
||||
ASAP,
|
||||
/// Send the message in a fixed interval.
|
||||
FixInterval(Duration),
|
||||
}
|
||||
|
||||
impl SinkStrategy {
|
||||
pub fn is_fix_interval(&self) -> bool {
|
||||
matches!(self, SinkStrategy::FixInterval(_))
|
||||
}
|
||||
}
|
||||
|
|
@ -7,13 +7,13 @@ use realtime_entity::collab_msg::{CollabSinkMessage, MsgId};
|
|||
use tokio::sync::oneshot;
|
||||
use tracing::{trace, warn};
|
||||
|
||||
pub(crate) struct PendingMsgQueue<Msg> {
|
||||
pub(crate) struct SinkPendingQueue<Msg> {
|
||||
#[allow(dead_code)]
|
||||
uid: i64,
|
||||
queue: BinaryHeap<PendingMessage<Msg>>,
|
||||
}
|
||||
|
||||
impl<Msg> PendingMsgQueue<Msg>
|
||||
impl<Msg> SinkPendingQueue<Msg>
|
||||
where
|
||||
Msg: CollabSinkMessage,
|
||||
{
|
||||
|
|
@ -29,7 +29,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<Msg> Deref for PendingMsgQueue<Msg>
|
||||
impl<Msg> Deref for SinkPendingQueue<Msg>
|
||||
where
|
||||
Msg: CollabSinkMessage,
|
||||
{
|
||||
|
|
@ -40,7 +40,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<Msg> DerefMut for PendingMsgQueue<Msg>
|
||||
impl<Msg> DerefMut for SinkPendingQueue<Msg>
|
||||
where
|
||||
Msg: CollabSinkMessage,
|
||||
{
|
||||
|
|
@ -1,62 +1,68 @@
|
|||
use crate::collab_sync::{
|
||||
CollabSink, CollabSinkRunner, SinkConfig, SinkState, SyncError, SyncObject,
|
||||
};
|
||||
use crate::collab_sync::sink_config::SinkConfig;
|
||||
use crate::collab_sync::{CollabSink, CollabSinkRunner, SinkState, SyncError, SyncObject};
|
||||
use crate::platform_spawn;
|
||||
use bytes::Bytes;
|
||||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::collab_state::SyncState;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::preclude::Collab;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use log::trace;
|
||||
use realtime_entity::collab_msg::{AckCode, CollabMessage, InitSync, ServerInit, UpdateSync};
|
||||
use realtime_protocol::{handle_collab_message, ClientSyncProtocol, CollabSyncProtocol};
|
||||
use realtime_protocol::{Message, MessageReader, SyncMessage};
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use tokio::sync::watch;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::{watch, Mutex};
|
||||
use tokio_stream::wrappers::WatchStream;
|
||||
use tracing::{error, span, trace, warn, Level};
|
||||
use tracing::{error, info, warn};
|
||||
use yrs::encoding::read::Cursor;
|
||||
use yrs::updates::decoder::DecoderV1;
|
||||
use yrs::updates::encoder::{Encoder, EncoderV1};
|
||||
|
||||
pub const DEFAULT_SYNC_TIMEOUT: u64 = 4;
|
||||
pub const NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC: u32 = 5;
|
||||
|
||||
pub struct SyncQueue<Sink, Stream> {
|
||||
const DEBOUNCE_DURATION: Duration = Duration::from_secs(10);
|
||||
|
||||
pub struct SyncControl<Sink, Stream> {
|
||||
object: SyncObject,
|
||||
origin: CollabOrigin,
|
||||
/// The [CollabSink] is used to send the updates to the remote. It will send the current
|
||||
/// update periodically if the timeout is reached or it will send the next update if
|
||||
/// it receive previous ack from the remote.
|
||||
sink: Arc<CollabSink<Sink, CollabMessage>>,
|
||||
/// The [SyncStream] will be spawned in a separate task It continuously receive
|
||||
/// The [ObserveCollab] will be spawned in a separate task It continuously receive
|
||||
/// the updates from the remote.
|
||||
#[allow(dead_code)]
|
||||
stream: SyncStream<Sink, Stream>,
|
||||
protocol: ClientSyncProtocol,
|
||||
observe_collab: ObserveCollab<Sink, Stream>,
|
||||
sync_state: Arc<watch::Sender<SyncState>>,
|
||||
}
|
||||
|
||||
impl<Sink, Stream> Drop for SyncQueue<Sink, Stream> {
|
||||
impl<Sink, Stream> Drop for SyncControl<Sink, Stream> {
|
||||
fn drop(&mut self) {
|
||||
trace!("Drop SyncQueue {}", self.object.object_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, Sink, Stream> SyncQueue<Sink, Stream>
|
||||
impl<E, Sink, Stream> SyncControl<Sink, Stream>
|
||||
where
|
||||
E: Into<anyhow::Error> + Send + Sync + 'static,
|
||||
Sink: SinkExt<CollabMessage, Error = E> + Send + Sync + Unpin + 'static,
|
||||
Stream: StreamExt<Item = Result<CollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
object: SyncObject,
|
||||
origin: CollabOrigin,
|
||||
sink: Sink,
|
||||
sink_config: SinkConfig,
|
||||
stream: Stream,
|
||||
collab: Weak<MutexCollab>,
|
||||
config: SinkConfig,
|
||||
pause: bool,
|
||||
) -> Self {
|
||||
let protocol = ClientSyncProtocol;
|
||||
|
|
@ -65,25 +71,26 @@ where
|
|||
let (sync_state_tx, sink_state_rx) = watch::channel(SinkState::Init);
|
||||
debug_assert!(origin.client_user_id().is_some());
|
||||
|
||||
// Create the sink and start the sink runner.
|
||||
let sink = Arc::new(CollabSink::new(
|
||||
origin.client_user_id().unwrap_or(0),
|
||||
object.clone(),
|
||||
sink,
|
||||
notifier,
|
||||
sync_state_tx,
|
||||
config,
|
||||
sink_config,
|
||||
pause,
|
||||
));
|
||||
|
||||
platform_spawn(CollabSinkRunner::run(Arc::downgrade(&sink), notifier_rx));
|
||||
let cloned_protocol = protocol.clone();
|
||||
let object_id = object.object_id.clone();
|
||||
let stream = SyncStream::new(
|
||||
|
||||
// Create the observe collab stream.
|
||||
let _cloned_protocol = protocol.clone();
|
||||
let _object_id = object.object_id.clone();
|
||||
let stream = ObserveCollab::new(
|
||||
origin.clone(),
|
||||
object_id,
|
||||
object.clone(),
|
||||
stream,
|
||||
protocol,
|
||||
collab,
|
||||
collab.clone(),
|
||||
Arc::downgrade(&sink),
|
||||
);
|
||||
|
||||
|
|
@ -113,8 +120,7 @@ where
|
|||
object,
|
||||
origin,
|
||||
sink,
|
||||
stream,
|
||||
protocol: cloned_protocol,
|
||||
observe_collab: stream,
|
||||
sync_state,
|
||||
}
|
||||
}
|
||||
|
|
@ -131,22 +137,8 @@ where
|
|||
self.sync_state.subscribe()
|
||||
}
|
||||
|
||||
pub fn init_sync(&self, awareness: &Awareness, _last_sync_at: i64) {
|
||||
if let Some(payload) = doc_init_state(awareness, &self.protocol) {
|
||||
self.sink.queue_init_sync(|msg_id| {
|
||||
InitSync::new(
|
||||
self.origin.clone(),
|
||||
self.object.object_id.clone(),
|
||||
self.object.collab_type.clone(),
|
||||
self.object.workspace_id.clone(),
|
||||
msg_id,
|
||||
payload,
|
||||
)
|
||||
.into()
|
||||
});
|
||||
} else {
|
||||
self.sink.notify();
|
||||
}
|
||||
pub fn init_sync(&self, collab: &Collab) {
|
||||
_init_sync(self.origin.clone(), &self.object, collab, &self.sink);
|
||||
}
|
||||
|
||||
/// Remove all the messages in the sink queue
|
||||
|
|
@ -168,7 +160,34 @@ fn doc_init_state<P: CollabSyncProtocol>(awareness: &Awareness, protocol: &P) ->
|
|||
}
|
||||
}
|
||||
|
||||
impl<Sink, Stream> Deref for SyncQueue<Sink, Stream> {
|
||||
pub fn _init_sync<E, Sink>(
|
||||
origin: CollabOrigin,
|
||||
sync_object: &SyncObject,
|
||||
collab: &Collab,
|
||||
sink: &Arc<CollabSink<Sink, CollabMessage>>,
|
||||
) where
|
||||
E: Into<anyhow::Error> + Send + Sync + 'static,
|
||||
Sink: SinkExt<CollabMessage, Error = E> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
let awareness = collab.get_awareness();
|
||||
if let Some(payload) = doc_init_state(awareness, &ClientSyncProtocol) {
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
InitSync::new(
|
||||
origin,
|
||||
sync_object.object_id.clone(),
|
||||
sync_object.collab_type.clone(),
|
||||
sync_object.workspace_id.clone(),
|
||||
msg_id,
|
||||
payload,
|
||||
)
|
||||
.into()
|
||||
})
|
||||
} else {
|
||||
sink.notify();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Sink, Stream> Deref for SyncControl<Sink, Stream> {
|
||||
type Target = Arc<CollabSink<Sink, CollabMessage>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
|
|
@ -177,7 +196,7 @@ impl<Sink, Stream> Deref for SyncQueue<Sink, Stream> {
|
|||
}
|
||||
|
||||
/// Use to continuously receive updates from remote.
|
||||
struct SyncStream<Sink, Stream> {
|
||||
struct ObserveCollab<Sink, Stream> {
|
||||
object_id: String,
|
||||
#[allow(dead_code)]
|
||||
weak_collab: Weak<MutexCollab>,
|
||||
|
|
@ -185,37 +204,37 @@ struct SyncStream<Sink, Stream> {
|
|||
phantom_stream: PhantomData<Stream>,
|
||||
}
|
||||
|
||||
impl<Sink, Stream> Drop for SyncStream<Sink, Stream> {
|
||||
impl<Sink, Stream> Drop for ObserveCollab<Sink, Stream> {
|
||||
fn drop(&mut self) {
|
||||
trace!("Drop SyncStream {}", self.object_id);
|
||||
}
|
||||
}
|
||||
|
||||
impl<E, Sink, Stream> SyncStream<Sink, Stream>
|
||||
impl<E, Sink, Stream> ObserveCollab<Sink, Stream>
|
||||
where
|
||||
E: Into<anyhow::Error> + Send + Sync + 'static,
|
||||
Sink: SinkExt<CollabMessage, Error = E> + Send + Sync + Unpin + 'static,
|
||||
Stream: StreamExt<Item = Result<CollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
{
|
||||
pub fn new<P>(
|
||||
pub fn new(
|
||||
origin: CollabOrigin,
|
||||
object_id: String,
|
||||
object: SyncObject,
|
||||
stream: Stream,
|
||||
protocol: P,
|
||||
weak_collab: Weak<MutexCollab>,
|
||||
sink: Weak<CollabSink<Sink, CollabMessage>>,
|
||||
) -> Self
|
||||
where
|
||||
P: CollabSyncProtocol + Send + Sync + 'static,
|
||||
{
|
||||
) -> Self {
|
||||
let seq_num = Arc::new(AtomicU32::new(0));
|
||||
let last_init_sync = LastSyncTime::new();
|
||||
let object_id = object.object_id.clone();
|
||||
let cloned_weak_collab = weak_collab.clone();
|
||||
platform_spawn(SyncStream::<Sink, Stream>::spawn_doc_stream::<P>(
|
||||
platform_spawn(ObserveCollab::<Sink, Stream>::observer_collab_message(
|
||||
origin,
|
||||
object_id.clone(),
|
||||
object,
|
||||
stream,
|
||||
cloned_weak_collab,
|
||||
sink,
|
||||
protocol,
|
||||
seq_num,
|
||||
last_init_sync,
|
||||
));
|
||||
Self {
|
||||
object_id,
|
||||
|
|
@ -226,16 +245,15 @@ where
|
|||
}
|
||||
|
||||
// Spawn the stream that continuously reads the doc's updates from remote.
|
||||
async fn spawn_doc_stream<P>(
|
||||
async fn observer_collab_message(
|
||||
origin: CollabOrigin,
|
||||
object_id: String,
|
||||
object: SyncObject,
|
||||
mut stream: Stream,
|
||||
weak_collab: Weak<MutexCollab>,
|
||||
weak_sink: Weak<CollabSink<Sink, CollabMessage>>,
|
||||
protocol: P,
|
||||
) where
|
||||
P: CollabSyncProtocol + Send + Sync + 'static,
|
||||
{
|
||||
seq_num: Arc<AtomicU32>,
|
||||
last_init_sync: LastSyncTime,
|
||||
) {
|
||||
while let Some(collab_message_result) = stream.next().await {
|
||||
let collab = match weak_collab.upgrade() {
|
||||
Some(collab) => collab,
|
||||
|
|
@ -258,10 +276,14 @@ where
|
|||
},
|
||||
};
|
||||
|
||||
let span = span!(Level::TRACE, "doc_stream", object_id = %msg.object_id());
|
||||
let _enter = span.enter();
|
||||
if let Err(error) = SyncStream::<Sink, Stream>::process_message::<P>(
|
||||
&origin, &object_id, &protocol, &collab, &sink, msg,
|
||||
if let Err(error) = ObserveCollab::<Sink, Stream>::process_message(
|
||||
&origin,
|
||||
&object,
|
||||
&collab,
|
||||
&sink,
|
||||
msg,
|
||||
&seq_num,
|
||||
&last_init_sync,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
|
@ -269,7 +291,7 @@ where
|
|||
// TODO(nathan): ask the client to resolve the conflict.
|
||||
error!(
|
||||
"collab:{} can not be synced because of error: {}",
|
||||
object_id, error
|
||||
object.object_id, error
|
||||
);
|
||||
break;
|
||||
} else {
|
||||
|
|
@ -280,22 +302,38 @@ where
|
|||
}
|
||||
|
||||
/// Continuously handle messages from the remote doc
|
||||
async fn process_message<P>(
|
||||
async fn process_message(
|
||||
origin: &CollabOrigin,
|
||||
object_id: &str,
|
||||
protocol: &P,
|
||||
object: &SyncObject,
|
||||
collab: &Arc<MutexCollab>,
|
||||
sink: &Arc<CollabSink<Sink, CollabMessage>>,
|
||||
msg: CollabMessage,
|
||||
) -> Result<(), SyncError>
|
||||
where
|
||||
P: CollabSyncProtocol + Send + Sync + 'static,
|
||||
{
|
||||
broadcast_seq_num: &Arc<AtomicU32>,
|
||||
last_sync_time: &LastSyncTime,
|
||||
) -> Result<(), SyncError> {
|
||||
// If server return the AckCode::ApplyInternalError, which means the server can not apply the
|
||||
// update
|
||||
if let CollabMessage::ClientAck(ack) = &msg {
|
||||
if ack.code == AckCode::CannotApplyUpdate {
|
||||
return Err(SyncError::CannotApplyUpdate(object_id.to_string()));
|
||||
if matches!(msg, CollabMessage::ClientAck(ref ack) if ack.code == AckCode::CannotApplyUpdate) {
|
||||
return Err(SyncError::CannotApplyUpdate(object.object_id.clone()));
|
||||
}
|
||||
|
||||
if let Some(msg_seq_num) = msg.broadcase_seq_num() {
|
||||
let prev_seq_num = broadcast_seq_num.load(Ordering::SeqCst);
|
||||
broadcast_seq_num.store(msg_seq_num, Ordering::SeqCst);
|
||||
|
||||
// Check if the received seq_num indicates missing updates.
|
||||
if msg_seq_num > prev_seq_num + NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC
|
||||
&& sink.can_queue_init_sync()
|
||||
&& last_sync_time.should_sync(DEBOUNCE_DURATION).await
|
||||
{
|
||||
if let Some(lock_guard) = collab.try_lock() {
|
||||
info!(
|
||||
"collab:{} missing updates, start init sync",
|
||||
object.object_id
|
||||
);
|
||||
_init_sync(origin.clone(), object, &lock_guard, sink);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -310,31 +348,39 @@ where
|
|||
_ => return Ok(()),
|
||||
};
|
||||
|
||||
trace!("start process message:{:?}", msg.msg_id());
|
||||
SyncStream::<Sink, Stream>::process_payload(origin, payload, object_id, protocol, collab, sink)
|
||||
.await?;
|
||||
trace!(
|
||||
"start process message:{:?}, len:{}",
|
||||
msg.msg_id(),
|
||||
msg.len()
|
||||
);
|
||||
ObserveCollab::<Sink, Stream>::process_payload(
|
||||
origin,
|
||||
payload,
|
||||
&object.object_id,
|
||||
collab,
|
||||
sink,
|
||||
broadcast_seq_num,
|
||||
)
|
||||
.await?;
|
||||
trace!("end process message: {:?}", msg.msg_id());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_payload<P>(
|
||||
async fn process_payload(
|
||||
origin: &CollabOrigin,
|
||||
payload: &Bytes,
|
||||
object_id: &str,
|
||||
protocol: &P,
|
||||
collab: &Arc<MutexCollab>,
|
||||
sink: &Arc<CollabSink<Sink, CollabMessage>>,
|
||||
) -> Result<(), SyncError>
|
||||
where
|
||||
P: CollabSyncProtocol + Send + Sync + 'static,
|
||||
{
|
||||
_broadcast_seq_num: &Arc<AtomicU32>,
|
||||
) -> Result<(), SyncError> {
|
||||
let mut decoder = DecoderV1::new(Cursor::new(payload));
|
||||
let reader = MessageReader::new(&mut decoder);
|
||||
for msg in reader {
|
||||
let msg = msg?;
|
||||
trace!(" {}", msg);
|
||||
let is_sync_step_1 = matches!(msg, Message::Sync(SyncMessage::SyncStep1(_)));
|
||||
if let Some(payload) = handle_collab_message(origin, protocol, collab, msg)? {
|
||||
if let Some(payload) = handle_collab_message(origin, &ClientSyncProtocol, collab, msg)? {
|
||||
if is_sync_step_1 {
|
||||
// flush
|
||||
match collab.try_lock() {
|
||||
|
|
@ -356,3 +402,26 @@ where
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct LastSyncTime {
|
||||
last_sync: Mutex<Instant>,
|
||||
}
|
||||
|
||||
impl LastSyncTime {
|
||||
fn new() -> Self {
|
||||
LastSyncTime {
|
||||
last_sync: Mutex::new(Instant::now() - Duration::from_secs(3600)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn should_sync(&self, debounce_duration: Duration) -> bool {
|
||||
let now = Instant::now();
|
||||
let mut last_sync_locked = self.last_sync.lock().await;
|
||||
if now.duration_since(*last_sync_locked) > debounce_duration {
|
||||
*last_sync_locked = now;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -51,6 +51,7 @@ pub trait WSClientHttpSender: Send + Sync {
|
|||
|
||||
type WeakChannel = Weak<WebSocketChannel<CollabMessage>>;
|
||||
type ChannelByObjectId = HashMap<String, Vec<WeakChannel>>;
|
||||
type AFRateLimiter = RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>;
|
||||
pub type WSConnectStateReceiver = Receiver<ConnectState>;
|
||||
|
||||
pub(crate) type StateNotify = parking_lot::Mutex<ConnectStateNotify>;
|
||||
|
|
@ -66,8 +67,10 @@ pub struct WSClient {
|
|||
collab_channels: Arc<RwLock<ChannelByObjectId>>,
|
||||
ping: Arc<Mutex<Option<ServerFixIntervalPing>>>,
|
||||
stop_tx: Mutex<Option<oneshot::Sender<()>>>,
|
||||
rate_limiter:
|
||||
Arc<tokio::sync::RwLock<RateLimiter<NotKeyed, InMemoryState, DefaultClock, NoOpMiddleware>>>,
|
||||
rate_limiter: Arc<tokio::sync::RwLock<AFRateLimiter>>,
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
skip_realtime_message: Arc<std::sync::atomic::AtomicBool>,
|
||||
}
|
||||
impl WSClient {
|
||||
pub fn new<H>(config: WSClientConfig, http_sender: H) -> Self
|
||||
|
|
@ -92,6 +95,9 @@ impl WSClient {
|
|||
ping,
|
||||
stop_tx: Mutex::new(None),
|
||||
rate_limiter: Arc::new(tokio::sync::RwLock::new(rate_limiter)),
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
skip_realtime_message: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -156,11 +162,22 @@ impl WSClient {
|
|||
|
||||
let user_message_tx = self.user_channel.as_ref().clone();
|
||||
let rate_limiter = self.rate_limiter.clone();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
let cloned_skip_realtime_message = self.skip_realtime_message.clone();
|
||||
|
||||
// Receive messages from the websocket, and send them to the channels.
|
||||
platform_spawn(async move {
|
||||
while let Some(Ok(ws_msg)) = stream.next().await {
|
||||
match ws_msg {
|
||||
Message::Binary(_) => {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
if cloned_skip_realtime_message.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match RealtimeMessage::try_from(&ws_msg) {
|
||||
Ok(msg) => {
|
||||
match msg {
|
||||
|
|
@ -341,3 +358,18 @@ fn gen_rate_limiter(
|
|||
let quota = Quota::per_second(NonZeroU32::new(times_per_sec).unwrap());
|
||||
RateLimiter::direct(quota)
|
||||
}
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
impl WSClient {
|
||||
pub fn disable_receive_message(&self) {
|
||||
self
|
||||
.skip_realtime_message
|
||||
.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
}
|
||||
|
||||
pub fn enable_receive_message(&self) {
|
||||
self
|
||||
.skip_realtime_message
|
||||
.store(false, std::sync::atomic::Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -106,6 +106,13 @@ impl CollabMessage {
|
|||
matches!(self, CollabMessage::ServerInitSync(_))
|
||||
}
|
||||
|
||||
pub fn broadcase_seq_num(&self) -> Option<u32> {
|
||||
match self {
|
||||
CollabMessage::ServerBroadcast(data) => Some(data.seq_num),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn type_str(&self) -> String {
|
||||
match self {
|
||||
CollabMessage::ClientInitSync(_) => "ClientInitSync".to_string(),
|
||||
|
|
@ -468,14 +475,16 @@ pub struct CollabBroadcastData {
|
|||
/// "The payload is encoded using the `EncoderV1` with the `Message` struct.
|
||||
/// It can be parsed into: Message::Sync::(SyncMessage::Update(update))
|
||||
payload: Bytes,
|
||||
seq_num: u32,
|
||||
}
|
||||
|
||||
impl CollabBroadcastData {
|
||||
pub fn new(origin: CollabOrigin, object_id: String, payload: Vec<u8>) -> Self {
|
||||
pub fn new(origin: CollabOrigin, object_id: String, payload: Vec<u8>, seq_num: u32) -> Self {
|
||||
Self {
|
||||
origin,
|
||||
object_id,
|
||||
payload: Bytes::from(payload),
|
||||
seq_num,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,11 @@ pub trait CollabSyncProtocol {
|
|||
|
||||
fn start<E: Encoder>(&self, awareness: &Awareness, encoder: &mut E) -> Result<(), Error> {
|
||||
let (sv, update) = {
|
||||
let sv = awareness.doc().transact().state_vector();
|
||||
let sv = awareness
|
||||
.doc()
|
||||
.try_transact()
|
||||
.map_err(|e| Error::YrsTransaction(e.to_string()))?
|
||||
.state_vector();
|
||||
let update = awareness.update()?;
|
||||
(sv, update)
|
||||
};
|
||||
|
|
|
|||
|
|
@ -1,25 +1,19 @@
|
|||
use anyhow::anyhow;
|
||||
use collab::core::awareness;
|
||||
use std::future::Future;
|
||||
use std::iter::Take;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::collaborate::sync_protocol::ServerSyncProtocol;
|
||||
use collab::core::awareness;
|
||||
use collab::core::awareness::{Awareness, AwarenessUpdate};
|
||||
use collab::core::collab::MutexCollab;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use realtime_protocol::{handle_collab_message, Error};
|
||||
use realtime_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE};
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::Arc;
|
||||
use tokio::select;
|
||||
use tokio::sync::broadcast::error::SendError;
|
||||
use tokio::sync::broadcast::{channel, Sender};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::Instant;
|
||||
use tokio_retry::strategy::FixedInterval;
|
||||
use tokio_retry::{Action, Retry};
|
||||
|
||||
use yrs::updates::decoder::DecoderV1;
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::UpdateSubscription;
|
||||
|
|
@ -34,12 +28,18 @@ use yrs::encoding::write::Write;
|
|||
/// A broadcast can be used to propagate updates produced by yrs [yrs::Doc] and [Awareness]
|
||||
/// to subscribes. One broadcast can be used to propagate updates for a single document with
|
||||
/// object_id.
|
||||
///
|
||||
pub struct CollabBroadcast {
|
||||
object_id: String,
|
||||
collab: MutexCollab,
|
||||
sender: Sender<CollabMessage>,
|
||||
awareness_sub: Mutex<Option<awareness::UpdateSubscription>>,
|
||||
/// Keep the lifetime of the document observer subscription. The subscription will be stopped
|
||||
/// when the broadcast is dropped.
|
||||
doc_subscription: Mutex<Option<UpdateSubscription>>,
|
||||
broadcast_seq_num_counter: Arc<AtomicU32>,
|
||||
/// The last modified time of the document.
|
||||
pub modified_at: Arc<Mutex<Instant>>,
|
||||
}
|
||||
|
||||
impl Drop for CollabBroadcast {
|
||||
|
|
@ -65,6 +65,8 @@ impl CollabBroadcast {
|
|||
sender,
|
||||
awareness_sub: Default::default(),
|
||||
doc_subscription: Default::default(),
|
||||
broadcast_seq_num_counter: Arc::new(Default::default()),
|
||||
modified_at: Arc::new(Mutex::new(Instant::now())),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -75,14 +77,23 @@ impl CollabBroadcast {
|
|||
// Observer the document's update and broadcast it to all subscribers.
|
||||
let cloned_oid = self.object_id.clone();
|
||||
let broadcast_sink = self.sender.clone();
|
||||
let seq_num_counter = self.broadcast_seq_num_counter.clone();
|
||||
let modified_at = self.modified_at.clone();
|
||||
|
||||
// Observer the document's update and broadcast it to all subscribers. When one of the clients
|
||||
// sends an update to the document that alters its state, the document observer will trigger
|
||||
// an update event. This event is then broadcast to all connected clients. After broadcasting, all
|
||||
// connected clients will receive the update and apply it to their local document state.
|
||||
let doc_sub = mutex_collab
|
||||
.get_mut_awareness()
|
||||
.doc_mut()
|
||||
.observe_update_v1(move |txn, event| {
|
||||
let value = seq_num_counter.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let update_len = event.update.len();
|
||||
let origin = CollabOrigin::from(txn);
|
||||
let payload = gen_update_message(&event.update);
|
||||
let msg = CollabBroadcastData::new(origin, cloned_oid.clone(), payload);
|
||||
let msg = CollabBroadcastData::new(origin, cloned_oid.clone(), payload, value + 1);
|
||||
|
||||
match broadcast_sink.send(msg.into()) {
|
||||
Ok(_) => trace!("observe doc update with len:{}", update_len),
|
||||
|
|
@ -91,6 +102,10 @@ impl CollabBroadcast {
|
|||
update_len, e
|
||||
),
|
||||
}
|
||||
|
||||
if let Ok(mut modified_at) = modified_at.try_lock() {
|
||||
*modified_at = Instant::now();
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
|
|
@ -138,7 +153,6 @@ impl CollabBroadcast {
|
|||
/// - `subscriber_origin`: Identifies the subscriber's origin to avoid echoing messages back.
|
||||
/// - `sink`: A `Sink` implementation for sending messages to the subscriber(Each connected client).
|
||||
/// - `stream`: A `Stream` implementation for receiving messages from the subscriber((Each connected client)).
|
||||
/// - `modified_at`: A shared, mutable reference to track the last modification time of the document.
|
||||
///
|
||||
/// # Behavior
|
||||
/// - [Sink] Forwards updates received from the document observer to all subscribers through 'sink', excluding the originator
|
||||
|
|
@ -163,7 +177,6 @@ impl CollabBroadcast {
|
|||
subscriber_origin: CollabOrigin,
|
||||
mut sink: Sink,
|
||||
mut stream: Stream,
|
||||
modified_at: Arc<Mutex<Instant>>,
|
||||
) -> Subscription
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
|
||||
|
|
@ -173,11 +186,12 @@ impl CollabBroadcast {
|
|||
{
|
||||
let cloned_origin = subscriber_origin.clone();
|
||||
trace!("[realtime]: new subscriber: {}", subscriber_origin);
|
||||
// Receive a update from the document observer and forward the update to all
|
||||
// connected subscribers using its Sink.
|
||||
let sink_stop_tx = {
|
||||
let sink = sink.clone();
|
||||
let mut sink = sink.clone();
|
||||
let (stop_tx, mut stop_rx) = tokio::sync::mpsc::channel::<()>(1);
|
||||
|
||||
// the receiver will continue to receive updates from the document observer and forward the update to
|
||||
// connected subscriber using its Sink. The loop will break if the stop_rx receives a message.
|
||||
let mut receiver = self.sender.subscribe();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
|
|
@ -190,13 +204,8 @@ impl CollabBroadcast {
|
|||
continue;
|
||||
}
|
||||
|
||||
trace!("[realtime]: broadcast collab message: {}", message);
|
||||
let action = SinkCollabMessageAction {
|
||||
sink: &sink,
|
||||
message,
|
||||
};
|
||||
|
||||
if let Err(err) = action.run().await {
|
||||
trace!("[realtime]: broadcast message to client: {}", message);
|
||||
if let Err(err) = sink.send(message).await {
|
||||
error!("fail to broadcast message:{}", err);
|
||||
}
|
||||
}
|
||||
|
|
@ -209,16 +218,14 @@ impl CollabBroadcast {
|
|||
stop_tx
|
||||
};
|
||||
|
||||
// Receive messages from clients and reply with the response. The message may alter the
|
||||
// document that the current broadcast group is associated with. If the message alter
|
||||
// the document state then the document observer will be triggered and the update will be
|
||||
// broadcast to all connected subscribers. Check out the [observe_update_v1] and [sink_task]
|
||||
// above.
|
||||
let stream_stop_tx = {
|
||||
let (stream_stop_tx, mut stop_rx) = tokio::sync::mpsc::channel::<()>(1);
|
||||
let collab = self.collab().clone();
|
||||
let object_id = self.object_id.clone();
|
||||
|
||||
// the stream will continue to receive messages from the client and it will stop if the stop_rx
|
||||
// receives a message. If the client's message alter the document state, it will trigger the
|
||||
// document observer and broadcast the update to all connected subscribers. Check out the [observe_update_v1] and [sink_task] above.
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
select! {
|
||||
|
|
@ -226,11 +233,9 @@ impl CollabBroadcast {
|
|||
result = stream.next() => {
|
||||
match result {
|
||||
Some(Ok(collab_msg)) => {
|
||||
// The message is valid if it has a payload and the object_id matches the broadcast's object_id.
|
||||
if object_id == collab_msg.object_id() && collab_msg.payload().is_some() {
|
||||
handle_client_collab_message(&object_id, &mut sink, &collab_msg, &collab).await;
|
||||
if let Ok(mut modified_at) = modified_at.try_lock() {
|
||||
*modified_at = Instant::now();
|
||||
}
|
||||
} else {
|
||||
warn!("Invalid collab message: {:?}", collab_msg);
|
||||
}
|
||||
|
|
@ -267,7 +272,6 @@ async fn handle_client_collab_message<Sink>(
|
|||
match collab_msg.payload() {
|
||||
None => {},
|
||||
Some(payload) => {
|
||||
let object_id = object_id.to_string();
|
||||
let mut decoder = DecoderV1::from(payload.as_ref());
|
||||
let origin = collab_msg.origin().clone();
|
||||
let reader = MessageReader::new(&mut decoder);
|
||||
|
|
@ -279,7 +283,7 @@ async fn handle_client_collab_message<Sink>(
|
|||
if let Some(msg_id) = collab_msg.msg_id() {
|
||||
match result {
|
||||
Ok(payload) => {
|
||||
let resp = CollabAck::new(origin.clone(), object_id.clone(), msg_id)
|
||||
let resp = CollabAck::new(origin.clone(), object_id.to_string(), msg_id)
|
||||
.with_payload(payload.unwrap_or_default());
|
||||
|
||||
trace!("Send response to client: {}", resp);
|
||||
|
|
@ -289,7 +293,7 @@ async fn handle_client_collab_message<Sink>(
|
|||
},
|
||||
Err(err) => {
|
||||
error!("handle collab:{} message error:{}", object_id, err);
|
||||
let resp = CollabAck::new(origin.clone(), object_id.clone(), msg_id)
|
||||
let resp = CollabAck::new(origin.clone(), object_id.to_string(), msg_id)
|
||||
.with_code(ack_code_from_error(&err));
|
||||
|
||||
if let Err(err) = sink.send(resp.into()).await {
|
||||
|
|
@ -382,39 +386,3 @@ fn gen_awareness_update_message(
|
|||
let update = awareness.update_with_clients(changed)?;
|
||||
Ok(update)
|
||||
}
|
||||
|
||||
pub struct SinkCollabMessageAction<'a, Sink: Clone> {
|
||||
pub sink: &'a Sink,
|
||||
pub message: CollabMessage,
|
||||
}
|
||||
|
||||
impl<'a, Sink> SinkCollabMessageAction<'a, Sink>
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'a,
|
||||
{
|
||||
pub fn run(self) -> Retry<Take<FixedInterval>, SinkCollabMessageAction<'a, Sink>> {
|
||||
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(5);
|
||||
Retry::spawn(retry_strategy, self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, Sink> Action for SinkCollabMessageAction<'a, Sink>
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'a,
|
||||
{
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync + 'a>>;
|
||||
type Item = ();
|
||||
type Error = RealtimeError;
|
||||
|
||||
fn run(&mut self) -> Self::Future {
|
||||
let mut sink = self.sink.clone();
|
||||
let message = self.message.clone();
|
||||
Box::pin(async move {
|
||||
sink
|
||||
.send(message)
|
||||
.await
|
||||
.map_err(|_err| RealtimeError::Internal(anyhow!("Sink message fail")))?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,9 +11,9 @@ use database::collab::CollabStorage;
|
|||
use futures_util::{SinkExt, StreamExt};
|
||||
use realtime_entity::collab_msg::CollabMessage;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use tokio::task::spawn_blocking;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use tracing::{debug, error, event, instrument, trace};
|
||||
|
||||
pub struct CollabGroupControl<S, U, AC> {
|
||||
|
|
@ -171,7 +171,6 @@ pub struct CollabGroup<U> {
|
|||
/// broadcast.
|
||||
subscribers: DashMap<U, Subscription>,
|
||||
user_by_user_device: DashMap<String, U>,
|
||||
pub modified_at: Arc<Mutex<Instant>>,
|
||||
}
|
||||
|
||||
impl<U> Drop for CollabGroup<U> {
|
||||
|
|
@ -189,14 +188,12 @@ where
|
|||
collab: Arc<MutexCollab>,
|
||||
broadcast: CollabBroadcast,
|
||||
) -> Self {
|
||||
let modified_at = Arc::new(Mutex::new(Instant::now()));
|
||||
Self {
|
||||
collab_type,
|
||||
collab,
|
||||
broadcast,
|
||||
subscribers: Default::default(),
|
||||
user_by_user_device: Default::default(),
|
||||
modified_at,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -294,9 +291,7 @@ where
|
|||
<Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
|
||||
E: Into<Error> + Send + Sync + 'static,
|
||||
{
|
||||
let sub = self
|
||||
.broadcast
|
||||
.subscribe(subscriber_origin, sink, stream, self.modified_at.clone());
|
||||
let sub = self.broadcast.subscribe(subscriber_origin, sink, stream);
|
||||
|
||||
// Remove the old user if it exists
|
||||
let user_device = user.user_device();
|
||||
|
|
@ -332,7 +327,7 @@ where
|
|||
/// Check if the group is active. A group is considered active if it has at least one
|
||||
/// subscriber or has been modified within the last 10 minutes.
|
||||
pub async fn is_inactive(&self) -> bool {
|
||||
let modified_at = self.modified_at.lock().await;
|
||||
let modified_at = self.broadcast.modified_at.lock().await;
|
||||
if cfg!(debug_assertions) {
|
||||
modified_at.elapsed().as_secs() > 60
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -6,12 +6,12 @@ use std::fmt::Display;
|
|||
|
||||
use crate::collaborate::CollabAccessControl;
|
||||
use anyhow::anyhow;
|
||||
use collab::core::awareness::Awareness;
|
||||
|
||||
use collab::core::collab::TransactionMutExt;
|
||||
use collab::core::collab_plugin::EncodedCollab;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use collab::core::transaction::DocTransactionExtension;
|
||||
use collab::preclude::{CollabPlugin, Doc, TransactionMut};
|
||||
use collab::preclude::{Collab, CollabPlugin, Doc, TransactionMut};
|
||||
use collab_entity::CollabType;
|
||||
use database::collab::CollabStorage;
|
||||
use database_entity::dto::{
|
||||
|
|
@ -208,7 +208,7 @@ where
|
|||
},
|
||||
}
|
||||
}
|
||||
fn did_init(&self, _awareness: &Awareness, _object_id: &str, _last_sync_at: i64) {
|
||||
fn did_init(&self, _collab: &Collab, _object_id: &str, _last_sync_at: i64) {
|
||||
self.edit_state.set_did_load()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
use crate::collab::util::generate_random_string;
|
||||
use assert_json_diff::{assert_json_eq, assert_json_include};
|
||||
use client_api_test_util::{
|
||||
assert_client_collab, assert_client_collab_include_value, assert_server_collab, TestClient,
|
||||
assert_client_collab_include_value_within_30_secs, assert_client_collab_within_30_secs,
|
||||
assert_server_collab, TestClient,
|
||||
};
|
||||
use collab_entity::CollabType;
|
||||
use database_entity::dto::{AFAccessLevel, AFRole};
|
||||
|
|
@ -37,7 +38,7 @@ async fn recv_updates_without_permission_test() {
|
|||
.lock()
|
||||
.insert("name", "AppFlowy");
|
||||
client_1.wait_object_sync_complete(&object_id).await;
|
||||
assert_client_collab(&mut client_2, &object_id, "name", json!({}), 3).await;
|
||||
assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", json!({})).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -78,7 +79,7 @@ async fn recv_remote_updates_with_readonly_permission_test() {
|
|||
let expected = json!({
|
||||
"name": "AppFlowy"
|
||||
});
|
||||
assert_client_collab(&mut client_2, &object_id, "name", expected.clone(), 10).await;
|
||||
assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected.clone()).await;
|
||||
assert_server_collab(
|
||||
&workspace_id,
|
||||
&mut client_1.api_client,
|
||||
|
|
@ -136,7 +137,7 @@ async fn init_sync_with_readonly_permission_test() {
|
|||
client_2
|
||||
.open_collab(&workspace_id, &object_id, collab_type.clone())
|
||||
.await;
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected).await;
|
||||
assert_client_collab_include_value_within_30_secs(&mut client_2, &object_id, expected).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -173,7 +174,7 @@ async fn edit_collab_with_readonly_permission_test() {
|
|||
.collab
|
||||
.lock()
|
||||
.insert("name", "AppFlowy");
|
||||
assert_client_collab_include_value(
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&object_id,
|
||||
json!({
|
||||
|
|
@ -230,7 +231,8 @@ async fn edit_collab_with_read_and_write_permission_test() {
|
|||
let expected = json!({
|
||||
"name": "AppFlowy"
|
||||
});
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected.clone()).await;
|
||||
assert_client_collab_include_value_within_30_secs(&mut client_2, &object_id, expected.clone())
|
||||
.await;
|
||||
|
||||
assert_server_collab(
|
||||
&workspace_id,
|
||||
|
|
@ -280,7 +282,7 @@ async fn edit_collab_with_full_access_permission_test() {
|
|||
let expected = json!({
|
||||
"name": "AppFlowy"
|
||||
});
|
||||
assert_client_collab(&mut client_2, &object_id, "name", expected.clone(), 5).await;
|
||||
assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected.clone()).await;
|
||||
|
||||
assert_server_collab(
|
||||
&workspace_id,
|
||||
|
|
@ -349,7 +351,7 @@ async fn edit_collab_with_full_access_then_readonly_permission() {
|
|||
.insert("subtitle", "Writing Rust, fun");
|
||||
}
|
||||
|
||||
assert_client_collab_include_value(
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&object_id,
|
||||
json!({
|
||||
|
|
|
|||
|
|
@ -23,11 +23,15 @@ async fn edit_workspace_without_permission() {
|
|||
.insert("name", "AppFlowy");
|
||||
client_1.wait_object_sync_complete(&workspace_id).await;
|
||||
|
||||
assert_client_collab_include_value(&mut client_1, &workspace_id, json!({"name": "AppFlowy"}))
|
||||
.await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_1,
|
||||
&workspace_id,
|
||||
json!({"name": "AppFlowy"}),
|
||||
)
|
||||
.await;
|
||||
|
||||
// client 2 has not permission to read/edit the workspace
|
||||
assert_client_collab_include_value(&mut client_2, &workspace_id, json!({})).await;
|
||||
assert_client_collab_include_value_within_30_secs(&mut client_2, &workspace_id, json!({})).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -52,20 +56,18 @@ async fn init_sync_workspace_with_guest_permission() {
|
|||
.insert("name", "AppFlowy");
|
||||
client_1.wait_object_sync_complete(&workspace_id).await;
|
||||
|
||||
assert_client_collab(
|
||||
assert_client_collab_within_30_secs(
|
||||
&mut client_1,
|
||||
&workspace_id,
|
||||
"name",
|
||||
json!({"name": "AppFlowy"}),
|
||||
3,
|
||||
)
|
||||
.await;
|
||||
assert_client_collab(
|
||||
assert_client_collab_within_30_secs(
|
||||
&mut client_2,
|
||||
&workspace_id,
|
||||
"name",
|
||||
json!({"name": "AppFlowy"}),
|
||||
3,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
|
@ -104,8 +106,18 @@ async fn edit_workspace_with_guest_permission() {
|
|||
.lock()
|
||||
.insert("name", "nathan");
|
||||
|
||||
assert_client_collab_include_value(&mut client_1, &workspace_id, json!({"name": "zack"})).await;
|
||||
assert_client_collab_include_value(&mut client_2, &workspace_id, json!({"name": "nathan"})).await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_1,
|
||||
&workspace_id,
|
||||
json!({"name": "zack"}),
|
||||
)
|
||||
.await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&workspace_id,
|
||||
json!({"name": "nathan"}),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_server_collab(
|
||||
&workspace_id,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
use crate::collab::util::generate_random_string;
|
||||
use client_api::collab_sync::NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC;
|
||||
use client_api_test_util::*;
|
||||
use collab_entity::CollabType;
|
||||
use database_entity::dto::{AFAccessLevel, QueryCollabParams};
|
||||
|
|
@ -8,7 +10,7 @@ use tokio::time::sleep;
|
|||
use tracing::trace;
|
||||
|
||||
#[tokio::test]
|
||||
async fn edit_collab_with_ws_reconnect_sync_test() {
|
||||
async fn sync_collab_content_after_reconnect_test() {
|
||||
let object_id = uuid::Uuid::new_v4().to_string();
|
||||
let collab_type = CollabType::Document;
|
||||
|
||||
|
|
@ -66,7 +68,7 @@ async fn edit_collab_with_ws_reconnect_sync_test() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn edit_collab_with_different_devices_test() {
|
||||
async fn same_client_with_diff_devices_edit_same_collab_test() {
|
||||
let collab_type = CollabType::Document;
|
||||
let registered_user = generate_unique_registered_user().await;
|
||||
let mut client_1 = TestClient::user_with_new_device(registered_user.clone()).await;
|
||||
|
|
@ -110,13 +112,80 @@ async fn edit_collab_with_different_devices_test() {
|
|||
let expected_json = json!({
|
||||
"name": "workspace"
|
||||
});
|
||||
assert_client_collab(&mut client_1, &object_id, "name", expected_json.clone(), 10).await;
|
||||
assert_client_collab(&mut client_2, &object_id, "name", expected_json.clone(), 10).await;
|
||||
assert_client_collab_within_30_secs(&mut client_1, &object_id, "name", expected_json.clone())
|
||||
.await;
|
||||
assert_client_collab_within_30_secs(&mut client_2, &object_id, "name", expected_json.clone())
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn same_client_with_diff_devices_edit_diff_collab_test() {
|
||||
let registered_user = generate_unique_registered_user().await;
|
||||
let collab_type = CollabType::Document;
|
||||
let mut device_1 = TestClient::user_with_new_device(registered_user.clone()).await;
|
||||
let mut device_2 = TestClient::user_with_new_device(registered_user.clone()).await;
|
||||
|
||||
let workspace_id = device_1.workspace_id().await;
|
||||
|
||||
// different devices create different collabs. the collab will be synced between devices
|
||||
let object_id_1 = device_1
|
||||
.create_and_edit_collab(&workspace_id, collab_type.clone())
|
||||
.await;
|
||||
let object_id_2 = device_2
|
||||
.create_and_edit_collab(&workspace_id, collab_type.clone())
|
||||
.await;
|
||||
|
||||
// client 1 edit the collab with object_id_1
|
||||
device_1
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id_1)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("name", "object 1");
|
||||
device_1.wait_object_sync_complete(&object_id_1).await;
|
||||
|
||||
// client 2 edit the collab with object_id_2
|
||||
device_2
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id_2)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("name", "object 2");
|
||||
device_2.wait_object_sync_complete(&object_id_2).await;
|
||||
|
||||
// client1 open the collab with object_id_2
|
||||
device_1
|
||||
.open_collab(&workspace_id, &object_id_2, collab_type.clone())
|
||||
.await;
|
||||
assert_client_collab_within_30_secs(
|
||||
&mut device_1,
|
||||
&object_id_2,
|
||||
"name",
|
||||
json!({
|
||||
"name": "object 2"
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
// client2 open the collab with object_id_1
|
||||
device_2
|
||||
.open_collab(&workspace_id, &object_id_1, collab_type.clone())
|
||||
.await;
|
||||
assert_client_collab_within_30_secs(
|
||||
&mut device_2,
|
||||
&object_id_1,
|
||||
"name",
|
||||
json!({
|
||||
"name": "object 1"
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn edit_document_with_both_clients_offline_then_online_sync_test() {
|
||||
let _object_id = uuid::Uuid::new_v4().to_string();
|
||||
let collab_type = CollabType::Document;
|
||||
let mut client_1 = TestClient::new_user().await;
|
||||
let mut client_2 = TestClient::new_user().await;
|
||||
|
|
@ -180,6 +249,121 @@ async fn edit_document_with_both_clients_offline_then_online_sync_test() {
|
|||
"8": "Task 8",
|
||||
"9": "Task 9"
|
||||
});
|
||||
assert_client_collab_include_value(&mut client_1, &object_id, expected_json.clone()).await;
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json.clone()).await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_1,
|
||||
&object_id,
|
||||
expected_json.clone(),
|
||||
)
|
||||
.await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&object_id,
|
||||
expected_json.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_sync_when_missing_updates_test() {
|
||||
let text = generate_random_string(1024);
|
||||
let collab_type = CollabType::Document;
|
||||
let mut client_1 = TestClient::new_user().await;
|
||||
let mut client_2 = TestClient::new_user().await;
|
||||
|
||||
// Create a collaborative document with client_1 and invite client_2 to collaborate.
|
||||
let workspace_id = client_1.workspace_id().await;
|
||||
let object_id = client_1
|
||||
.create_and_edit_collab(&workspace_id, collab_type.clone())
|
||||
.await;
|
||||
client_1
|
||||
.add_client_as_collab_member(
|
||||
&workspace_id,
|
||||
&object_id,
|
||||
&client_2,
|
||||
AFAccessLevel::ReadAndWrite,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Client_1 makes the first edit by inserting "task 1".
|
||||
client_1
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("1", "task 1");
|
||||
client_1.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
// Client_2 opens the collaboration, triggering an initial sync to receive "task 1".
|
||||
client_2
|
||||
.open_collab(&workspace_id, &object_id, collab_type.clone())
|
||||
.await;
|
||||
client_2.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
// Validate both clients have "task 1" after the initial sync.
|
||||
assert_eq!(
|
||||
client_1.get_edit_collab_json(&object_id).await,
|
||||
json!({ "1": "task 1" })
|
||||
);
|
||||
assert_eq!(
|
||||
client_2.get_edit_collab_json(&object_id).await,
|
||||
json!({ "1": "task 1" })
|
||||
);
|
||||
|
||||
// Simulate client_2 missing updates by enabling skip_realtime_message.
|
||||
client_2.ws_client.disable_receive_message();
|
||||
client_1.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
// Client_1 inserts "task 2", which client_2 misses due to skipping realtime messages.
|
||||
for _ in 0..NUMBER_OF_UPDATE_TRIGGER_INIT_SYNC {
|
||||
client_1
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("2", text.clone());
|
||||
sleep(Duration::from_millis(300)).await;
|
||||
}
|
||||
client_1.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
client_2
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("3", "task 3");
|
||||
client_2.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
// Validate client_1's view includes "task 2", and "task 3", while client_2 missed key2 and key3.
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_1,
|
||||
&object_id,
|
||||
json!({ "1": "task 1", "2": text.clone(), "3": "task 3" }),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(
|
||||
client_2.get_edit_collab_json(&object_id).await,
|
||||
json!({ "1": "task 1", "3": "task 3" })
|
||||
);
|
||||
|
||||
// client_2 resumes receiving messages
|
||||
// client_1 triggers a sync that will trigger broadcast message to client_2
|
||||
client_2.ws_client.enable_receive_message();
|
||||
client_1
|
||||
.collab_by_object_id
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.lock()
|
||||
.insert("4", "task 4");
|
||||
client_1.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&object_id,
|
||||
json!({ "1": "task 1", "2": text.clone(), "3": "task 3", "4": "task 4" }),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -258,7 +258,7 @@ async fn user_with_duplicate_devices_connect_edit_test() {
|
|||
new_client.wait_object_sync_complete(&object_id).await;
|
||||
|
||||
// Old client shouldn't receive the new client's edit
|
||||
assert_client_collab_include_value(
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut old_client,
|
||||
&object_id,
|
||||
json!({
|
||||
|
|
@ -268,7 +268,7 @@ async fn user_with_duplicate_devices_connect_edit_test() {
|
|||
)
|
||||
.await;
|
||||
|
||||
assert_client_collab_include_value(
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut new_client,
|
||||
&object_id,
|
||||
json!({
|
||||
|
|
@ -342,8 +342,18 @@ async fn two_direction_peer_sync_test() {
|
|||
"name": "AppFlowy",
|
||||
"support platform": "macOS, Windows, Linux, iOS, Android"
|
||||
});
|
||||
assert_client_collab_include_value(&mut client_1, &object_id, expected_json.clone()).await;
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json.clone()).await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_1,
|
||||
&object_id,
|
||||
expected_json.clone(),
|
||||
)
|
||||
.await;
|
||||
assert_client_collab_include_value_within_30_secs(
|
||||
&mut client_2,
|
||||
&object_id,
|
||||
expected_json.clone(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
|||
Loading…
Reference in New Issue