Merge branch 'main' into gotrue-2.159.0
This commit is contained in:
commit
66f74f2e4b
|
|
@ -600,6 +600,7 @@ impl TestClient {
|
|||
stream,
|
||||
Some(handler),
|
||||
ws_connect_state,
|
||||
Some(Duration::from_secs(10)),
|
||||
);
|
||||
let lock = collab.read().await;
|
||||
let collab = (*lock).borrow();
|
||||
|
|
@ -672,6 +673,7 @@ impl TestClient {
|
|||
stream,
|
||||
Some(handler),
|
||||
ws_connect_state,
|
||||
Some(Duration::from_secs(10)),
|
||||
);
|
||||
|
||||
let lock = collab.read().await;
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::borrow::BorrowMut;
|
|||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use collab::core::origin::CollabOrigin;
|
||||
|
|
@ -13,6 +14,8 @@ use tokio_util::sync::CancellationToken;
|
|||
use tracing::{error, instrument, trace, warn};
|
||||
use yrs::encoding::read::Cursor;
|
||||
use yrs::updates::decoder::DecoderV1;
|
||||
use yrs::updates::encoder::Encode;
|
||||
use yrs::ReadTxn;
|
||||
|
||||
use client_api_entity::{validate_data_for_folder, CollabType};
|
||||
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, ServerInit, UpdateSync};
|
||||
|
|
@ -58,6 +61,7 @@ where
|
|||
stream: Stream,
|
||||
weak_collab: CollabRef,
|
||||
sink: Weak<CollabSink<Sink>>,
|
||||
periodic_sync_interval: Option<Duration>,
|
||||
) -> Self {
|
||||
let object_id = object.object_id.clone();
|
||||
let cloned_weak_collab = weak_collab.clone() as CollabRef;
|
||||
|
|
@ -65,6 +69,17 @@ where
|
|||
let cloned_seq_num_counter = seq_num_counter.clone();
|
||||
let init_sync_cancel_token = ArcSwap::new(Arc::new(CancellationToken::new()));
|
||||
let arc_object = Arc::new(object);
|
||||
|
||||
if let Some(interval) = periodic_sync_interval {
|
||||
tracing::trace!("setting periodic sync step 1 for {}", object_id);
|
||||
af_spawn(ObserveCollab::<Sink, Stream>::periodic_sync_step_1(
|
||||
origin.clone(),
|
||||
sink.clone(),
|
||||
cloned_weak_collab.clone(),
|
||||
interval,
|
||||
object_id.clone(),
|
||||
));
|
||||
}
|
||||
af_spawn(ObserveCollab::<Sink, Stream>::observer_collab_message(
|
||||
origin,
|
||||
arc_object,
|
||||
|
|
@ -83,6 +98,43 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Periodically run sync step 1 to make sure that there are no missing updates from other clients.
|
||||
async fn periodic_sync_step_1(
|
||||
origin: CollabOrigin,
|
||||
weak_sink: Weak<CollabSink<Sink>>,
|
||||
weak_collab: CollabRef,
|
||||
interval: Duration,
|
||||
object_id: String,
|
||||
) {
|
||||
loop {
|
||||
tokio::time::sleep(interval).await;
|
||||
let sink = match weak_sink.upgrade() {
|
||||
Some(sink) => sink,
|
||||
None => break,
|
||||
};
|
||||
|
||||
let collab = match weak_collab.upgrade() {
|
||||
Some(collab) => collab,
|
||||
None => break,
|
||||
};
|
||||
|
||||
let sv = {
|
||||
let lock = collab.read().await;
|
||||
let sv = (*lock).borrow().transact().state_vector();
|
||||
sv
|
||||
};
|
||||
let msg = Message::Sync(SyncMessage::SyncStep1(sv)).encode_v1();
|
||||
sink.queue_msg(|msg_id| {
|
||||
ClientCollabMessage::new_update_sync(UpdateSync::new(
|
||||
origin.clone(),
|
||||
object_id.clone(),
|
||||
msg,
|
||||
msg_id,
|
||||
))
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the stream that continuously reads the doc's updates from remote.
|
||||
async fn observer_collab_message(
|
||||
origin: CollabOrigin,
|
||||
|
|
|
|||
|
|
@ -64,6 +64,7 @@ where
|
|||
stream: Stream,
|
||||
channel: Option<Arc<Channel>>,
|
||||
mut ws_connect_state: WSConnectStateReceiver,
|
||||
periodic_sync: Option<Duration>,
|
||||
) -> Self {
|
||||
let sync_queue = SyncControl::new(
|
||||
object.clone(),
|
||||
|
|
@ -72,6 +73,7 @@ where
|
|||
sink_config,
|
||||
stream,
|
||||
collab.clone(),
|
||||
periodic_sync,
|
||||
);
|
||||
|
||||
let mut sync_state_stream = sync_queue.subscribe_sync_state();
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ where
|
|||
sink_config: SinkConfig,
|
||||
stream: Stream,
|
||||
collab: CollabRef,
|
||||
periodic_sync: Option<Duration>,
|
||||
) -> Self {
|
||||
let protocol = ClientSyncProtocol;
|
||||
let (notifier, notifier_rx) = watch::channel(SinkSignal::Proceed);
|
||||
|
|
@ -86,6 +87,7 @@ where
|
|||
stream,
|
||||
collab.clone(),
|
||||
Arc::downgrade(&sink),
|
||||
periodic_sync,
|
||||
);
|
||||
|
||||
Self {
|
||||
|
|
|
|||
|
|
@ -1,13 +1,12 @@
|
|||
use collab::core::awareness::Awareness;
|
||||
use collab::core::collab::{TransactionExt, TransactionMutExt};
|
||||
use collab::core::origin::CollabOrigin;
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Transact, Update};
|
||||
|
||||
use collab_rt_protocol::CollabSyncProtocol;
|
||||
use collab_rt_protocol::{CustomMessage, Message, RTProtocolError, SyncMessage};
|
||||
|
||||
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
|
||||
use yrs::{ReadTxn, StateVector, Transact, Update};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ServerSyncProtocol;
|
||||
impl CollabSyncProtocol for ServerSyncProtocol {
|
||||
|
|
@ -30,11 +29,12 @@ impl CollabSyncProtocol for ServerSyncProtocol {
|
|||
})?;
|
||||
|
||||
// Retrieve the latest document state from the client after they return online from offline editing.
|
||||
let server_step1_update = txn.state_vector();
|
||||
|
||||
let mut encoder = EncoderV1::new();
|
||||
Message::Sync(SyncMessage::SyncStep2(client_step2_update)).encode(&mut encoder);
|
||||
Message::Sync(SyncMessage::SyncStep1(server_step1_update)).encode(&mut encoder);
|
||||
|
||||
//FIXME: this should never happen as response to sync step 1 from the client, but rather be
|
||||
// send when a connection is established
|
||||
Message::Sync(SyncMessage::SyncStep1(txn.state_vector())).encode(&mut encoder);
|
||||
Ok(Some(encoder.to_vec()))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -135,16 +135,29 @@ impl GroupManagementState {
|
|||
object_id: object_id.to_string(),
|
||||
};
|
||||
|
||||
self
|
||||
.editing_by_user
|
||||
.entry(user.clone())
|
||||
.or_default()
|
||||
.insert(editing);
|
||||
let entry = self.editing_by_user.entry(user.clone());
|
||||
match entry {
|
||||
dashmap::mapref::entry::Entry::Occupied(_) => {},
|
||||
dashmap::mapref::entry::Entry::Vacant(_) => {
|
||||
self
|
||||
.metrics_calculate
|
||||
.num_of_editing_users
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
},
|
||||
}
|
||||
|
||||
entry.or_default().insert(editing);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_user(&self, user: &RealtimeUser) {
|
||||
let entry = self.editing_by_user.remove(user);
|
||||
if entry.is_some() {
|
||||
self
|
||||
.metrics_calculate
|
||||
.num_of_editing_users
|
||||
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
if let Some(editing_objects) = entry.map(|(_, e)| e) {
|
||||
for editing in editing_objects {
|
||||
match self.group_by_object_id.try_get(&editing.object_id) {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ pub struct CollabRealtimeMetrics {
|
|||
total_success_get_encode_collab_from_redis: Gauge,
|
||||
total_attempt_get_encode_collab_from_redis: Gauge,
|
||||
opening_collab_count: Gauge,
|
||||
num_of_editing_users: Gauge,
|
||||
/// The number of apply update
|
||||
apply_update_count: Gauge,
|
||||
/// The number of apply update failed
|
||||
|
|
@ -27,6 +28,7 @@ impl CollabRealtimeMetrics {
|
|||
total_success_get_encode_collab_from_redis: Gauge::default(),
|
||||
total_attempt_get_encode_collab_from_redis: Gauge::default(),
|
||||
opening_collab_count: Gauge::default(),
|
||||
num_of_editing_users: Gauge::default(),
|
||||
apply_update_count: Default::default(),
|
||||
apply_update_failed_count: Default::default(),
|
||||
acquire_collab_lock_count: Default::default(),
|
||||
|
|
@ -57,6 +59,11 @@ impl CollabRealtimeMetrics {
|
|||
"number of opening collabs",
|
||||
metrics.opening_collab_count.clone(),
|
||||
);
|
||||
realtime_registry.register(
|
||||
"editing_users_count",
|
||||
"number of editing users",
|
||||
metrics.num_of_editing_users.clone(),
|
||||
);
|
||||
realtime_registry.register(
|
||||
"apply_update_count",
|
||||
"number of apply update",
|
||||
|
|
@ -91,6 +98,7 @@ pub(crate) struct CollabMetricsCalculate {
|
|||
pub(crate) apply_update_count: Arc<AtomicI64>,
|
||||
pub(crate) apply_update_failed_count: Arc<AtomicI64>,
|
||||
pub(crate) num_of_active_collab: Arc<AtomicI64>,
|
||||
pub(crate) num_of_editing_users: Arc<AtomicI64>,
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_metrics<S>(
|
||||
|
|
@ -115,6 +123,13 @@ pub(crate) fn spawn_metrics<S>(
|
|||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
);
|
||||
|
||||
// editing users
|
||||
metrics.num_of_editing_users.set(
|
||||
metrics_calculation
|
||||
.num_of_editing_users
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
);
|
||||
|
||||
// connect user
|
||||
metrics.connected_users.set(
|
||||
metrics_calculation
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ use crate::group::cmd::{GroupCommand, GroupCommandRunner, GroupCommandSender};
|
|||
use crate::group::manager::GroupManager;
|
||||
use crate::indexer::IndexerProvider;
|
||||
use crate::metrics::CollabMetricsCalculate;
|
||||
use crate::rt_server::collaboration_runtime::COLLAB_RUNTIME;
|
||||
use crate::state::RedisConnectionManager;
|
||||
use crate::{spawn_metrics, CollabRealtimeMetrics, RealtimeClientWebsocketSink};
|
||||
|
||||
|
|
@ -199,7 +200,7 @@ where
|
|||
|
||||
let object_id = entry.key().clone();
|
||||
let clone_notify = notify.clone();
|
||||
tokio::spawn(runner.run(object_id, clone_notify));
|
||||
COLLAB_RUNTIME.spawn(runner.run(object_id, clone_notify));
|
||||
entry.insert(new_sender.clone());
|
||||
|
||||
// wait for the runner to be ready to handle the message.
|
||||
|
|
@ -287,9 +288,7 @@ fn spawn_period_check_inactive_group<S, AC>(
|
|||
/// multiple threads, we've incorporated a multi-thread feature.
|
||||
///
|
||||
/// When appflowy-collaborate is deployed as a standalone service, we can use tokio multi-thread.
|
||||
#[cfg(feature = "collab-rt-multi-thread")]
|
||||
mod collaboration_runtime {
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
|
|
@ -307,12 +306,4 @@ mod collaboration_runtime {
|
|||
.enable_time()
|
||||
.build()
|
||||
}
|
||||
|
||||
pub(crate) fn spawn<T>(future: T) -> tokio::task::JoinHandle<T::Output>
|
||||
where
|
||||
T: Future + Send + 'static,
|
||||
T::Output: Send + 'static,
|
||||
{
|
||||
COLLAB_RUNTIME.spawn(future)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use database_entity::dto::AFAccessLevel;
|
|||
|
||||
#[tokio::test]
|
||||
async fn client_apply_update_find_missing_update_test() {
|
||||
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
let (_client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
|
||||
client_2.ws_client.enable_receive_message();
|
||||
{
|
||||
|
|
@ -23,23 +23,8 @@ async fn client_apply_update_find_missing_update_test() {
|
|||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("content", "hello world");
|
||||
}
|
||||
{
|
||||
// in order to detect missing update, we need to make another edit on client_1 - when this
|
||||
// update is received by client_2 it will figure out that it was also missing
|
||||
// "title" => "hello world"
|
||||
let mut lock = client_1
|
||||
.collabs
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.write()
|
||||
.await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("ping", "pong");
|
||||
}
|
||||
|
||||
expected_json["content"] = Value::String("hello world".to_string());
|
||||
expected_json["ping"] = Value::String("pong".to_string());
|
||||
|
||||
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)
|
||||
|
|
@ -49,24 +34,9 @@ async fn client_apply_update_find_missing_update_test() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn client_ping_find_missing_update_test() {
|
||||
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
let (_client_1, mut client_2, object_id, expected_json) = make_clients().await;
|
||||
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
|
||||
client_2.ws_client.enable_receive_message();
|
||||
{
|
||||
// in order to detect missing update, we need to make another edit on client_1 - when this
|
||||
// update is received by client_2 it will figure out that it was also missing
|
||||
// "title" => "hello world"
|
||||
let mut lock = client_1
|
||||
.collabs
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.write()
|
||||
.await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("ping", "pong");
|
||||
}
|
||||
expected_json["ping"] = Value::String("pong".to_string());
|
||||
|
||||
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)
|
||||
|
|
|
|||
Loading…
Reference in New Issue