chore: Remove last sync at (#735)
* chore: remove last sync at field * chore: post rebase fixes * chore: update test for missing update detection
This commit is contained in:
parent
ba666fac14
commit
ae3833ea91
|
|
@ -2041,7 +2041,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=fe0fb7e93033b017f594749aade756be8973157b#fe0fb7e93033b017f594749aade756be8973157b"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4777db#c4777db5d32ee418ee8c6782bf851fdcf2a2eb60"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
|
|
@ -2066,7 +2066,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-document"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=fe0fb7e93033b017f594749aade756be8973157b#fe0fb7e93033b017f594749aade756be8973157b"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4777db#c4777db5d32ee418ee8c6782bf851fdcf2a2eb60"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
|
|
@ -2086,7 +2086,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-entity"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=fe0fb7e93033b017f594749aade756be8973157b#fe0fb7e93033b017f594749aade756be8973157b"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4777db#c4777db5d32ee418ee8c6782bf851fdcf2a2eb60"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
|
|
@ -2105,7 +2105,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-folder"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=fe0fb7e93033b017f594749aade756be8973157b#fe0fb7e93033b017f594749aade756be8973157b"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4777db#c4777db5d32ee418ee8c6782bf851fdcf2a2eb60"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
|
|
@ -2189,7 +2189,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "collab-user"
|
||||
version = "0.2.0"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=fe0fb7e93033b017f594749aade756be8973157b#fe0fb7e93033b017f594749aade756be8973157b"
|
||||
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c4777db#c4777db5d32ee418ee8c6782bf851fdcf2a2eb60"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"collab",
|
||||
|
|
|
|||
10
Cargo.toml
10
Cargo.toml
|
|
@ -282,11 +282,11 @@ debug = true
|
|||
[patch.crates-io]
|
||||
# It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
|
||||
# So using patch to workaround this issue.
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "fe0fb7e93033b017f594749aade756be8973157b" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "fe0fb7e93033b017f594749aade756be8973157b" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "fe0fb7e93033b017f594749aade756be8973157b" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "fe0fb7e93033b017f594749aade756be8973157b" }
|
||||
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "fe0fb7e93033b017f594749aade756be8973157b" }
|
||||
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4777db" }
|
||||
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4777db" }
|
||||
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4777db" }
|
||||
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4777db" }
|
||||
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c4777db" }
|
||||
|
||||
[features]
|
||||
history = []
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ where
|
|||
Stream: StreamExt<Item = Result<ServerCollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
Channel: Send + Sync + 'static,
|
||||
{
|
||||
fn did_init(&self, _collab: &Collab, _object_id: &str, _last_sync_at: i64) {
|
||||
fn did_init(&self, _collab: &Collab, _object_id: &str) {
|
||||
// Most of the time, it should be successful to queue init sync by 1st time.
|
||||
let retry_strategy = FixedInterval::new(Duration::from_secs(1)).take(10);
|
||||
let action = InitSyncAction {
|
||||
|
|
|
|||
|
|
@ -155,10 +155,9 @@ impl Display for SyncReason {
|
|||
fn gen_sync_state<P: CollabSyncProtocol>(
|
||||
awareness: &Awareness,
|
||||
protocol: &P,
|
||||
sync_before: bool,
|
||||
) -> Result<Vec<u8>, SyncError> {
|
||||
let mut encoder = EncoderV1::new();
|
||||
protocol.start(awareness, &mut encoder, sync_before)?;
|
||||
protocol.start(awareness, &mut encoder)?;
|
||||
Ok(encoder.to_vec())
|
||||
}
|
||||
|
||||
|
|
@ -195,7 +194,6 @@ where
|
|||
return Err(SyncError::Internal(err));
|
||||
}
|
||||
|
||||
let sync_before = collab.get_last_sync_at() > 0;
|
||||
match reason {
|
||||
SyncReason::MissUpdates {
|
||||
state_vector_v1,
|
||||
|
|
@ -208,7 +206,7 @@ where
|
|||
reason
|
||||
);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol, sync_before)?;
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
let init_sync = InitSync::new(
|
||||
origin,
|
||||
|
|
@ -248,7 +246,7 @@ where
|
|||
reason
|
||||
);
|
||||
let awareness = collab.get_awareness();
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol, sync_before)?;
|
||||
let payload = gen_sync_state(awareness, &ClientSyncProtocol)?;
|
||||
|
||||
sink.queue_init_sync(|msg_id| {
|
||||
let init_sync = InitSync::new(
|
||||
|
|
|
|||
|
|
@ -88,7 +88,6 @@ pub trait CollabSyncProtocol {
|
|||
&self,
|
||||
awareness: &Awareness,
|
||||
encoder: &mut E,
|
||||
_sync_before: bool,
|
||||
) -> Result<(), RTProtocolError> {
|
||||
let (state_vector, awareness_update) = {
|
||||
let state_vector = awareness
|
||||
|
|
|
|||
|
|
@ -365,7 +365,7 @@ async fn handle_one_client_message(
|
|||
message_origin
|
||||
);
|
||||
|
||||
match handle_one_message_payload(
|
||||
handle_one_message_payload(
|
||||
object_id,
|
||||
message_origin.clone(),
|
||||
msg_id,
|
||||
|
|
@ -375,15 +375,6 @@ async fn handle_one_client_message(
|
|||
seq_num,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(ack) => {
|
||||
let mut lock = collab.write().await;
|
||||
let collab: &mut Collab = (*lock).borrow_mut();
|
||||
collab.set_last_sync_at(chrono::Utc::now().timestamp());
|
||||
Ok(ack)
|
||||
},
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the message sent from the client
|
||||
|
|
|
|||
|
|
@ -9,7 +9,8 @@ use database_entity::dto::AFAccessLevel;
|
|||
|
||||
#[tokio::test]
|
||||
async fn client_apply_update_find_missing_update_test() {
|
||||
let (_client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
|
||||
client_2.ws_client.enable_receive_message();
|
||||
{
|
||||
let mut lock = client_2
|
||||
|
|
@ -22,8 +23,23 @@ async fn client_apply_update_find_missing_update_test() {
|
|||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("content", "hello world");
|
||||
}
|
||||
{
|
||||
// in order to detect missing update, we need to make another edit on client_1 - when this
|
||||
// update is received by client_2 it will figure out that it was also missing
|
||||
// "title" => "hello world"
|
||||
let mut lock = client_1
|
||||
.collabs
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.write()
|
||||
.await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("ping", "pong");
|
||||
}
|
||||
|
||||
expected_json["content"] = Value::String("hello world".to_string());
|
||||
expected_json["ping"] = Value::String("pong".to_string());
|
||||
|
||||
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)
|
||||
|
|
@ -33,8 +49,24 @@ async fn client_apply_update_find_missing_update_test() {
|
|||
|
||||
#[tokio::test]
|
||||
async fn client_ping_find_missing_update_test() {
|
||||
let (_client_1, mut client_2, object_id, expected_json) = make_clients().await;
|
||||
let (mut client_1, mut client_2, object_id, mut expected_json) = make_clients().await;
|
||||
// "title" => "hello world" is not delivered to client_2 and is considered a missing update
|
||||
client_2.ws_client.enable_receive_message();
|
||||
{
|
||||
// in order to detect missing update, we need to make another edit on client_1 - when this
|
||||
// update is received by client_2 it will figure out that it was also missing
|
||||
// "title" => "hello world"
|
||||
let mut lock = client_1
|
||||
.collabs
|
||||
.get_mut(&object_id)
|
||||
.unwrap()
|
||||
.collab
|
||||
.write()
|
||||
.await;
|
||||
let collab = (*lock).borrow_mut();
|
||||
collab.insert("ping", "pong");
|
||||
}
|
||||
expected_json["ping"] = Value::String("pong".to_string());
|
||||
|
||||
// the collab ping will trigger a init sync with reason InitSyncReason::MissUpdates after a period of time
|
||||
assert_client_collab_include_value(&mut client_2, &object_id, expected_json)
|
||||
|
|
|
|||
Loading…
Reference in New Issue