chore: client api init sync (#438)

* chore: implement init sync for sync plugin

* chore: bump collab

* chore: bump collab

* chore: fix test
This commit is contained in:
Nathan.fooo 2024-04-02 23:08:15 +08:00 committed by GitHub
parent 1101c9d72a
commit 12d72fa233
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 24 deletions

8
Cargo.lock generated
View File

@ -1423,7 +1423,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0#6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2#9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2"
dependencies = [
"anyhow",
"async-trait",
@ -1447,7 +1447,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0#6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2#9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2"
dependencies = [
"anyhow",
"collab",
@ -1466,7 +1466,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0#6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2#9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2"
dependencies = [
"anyhow",
"bytes",
@ -1481,7 +1481,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0#6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2#9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2"
dependencies = [
"anyhow",
"chrono",

View File

@ -199,10 +199,10 @@ debug = true
# will be removed when using yrs 0.18.2 that expose pendings
yrs = { git = "https://github.com/appflowy/y-crdt", rev = "3f25bb510ca5274e7657d3713fbed41fb46b4487" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6d42cd92bc5b8fe489230ea9d6e4ccc87892a8e0" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "9e519d46bb8c4c5097d8c9dbc8f77707f8041ee2" }
[features]
ai_enable = []

View File

@ -14,7 +14,7 @@ use collab_rt_protocol::{Message, SyncMessage};
use futures_util::SinkExt;
use tokio::time::sleep;
use tokio_stream::StreamExt;
use tracing::trace;
use tracing::{error, trace};
use crate::af_spawn;
use crate::ws::{ConnectState, WSConnectStateReceiver};
@ -125,7 +125,7 @@ where
}
}
fn start_init_sync(&self) {
fn try_init_sync(&self) {
if self.did_init_sync.load(Ordering::SeqCst) {
return;
}
@ -159,7 +159,7 @@ where
C: Send + Sync + 'static,
{
fn did_init(&self, _collab: &Collab, _object_id: &str, _last_sync_at: i64) {
self.start_init_sync();
self.try_init_sync();
}
fn receive_local_update(&self, origin: &CollabOrigin, _object_id: &str, update: &[u8]) {
@ -176,7 +176,7 @@ where
ClientCollabMessage::new_update_sync(update_sync)
});
} else {
self.start_init_sync();
self.try_init_sync();
}
}
@ -196,8 +196,20 @@ where
});
}
fn reset(&self, _object_id: &str) {
self.sync_queue.clear();
fn start_init_sync(&self) {
if let Some(collab) = self.collab.upgrade() {
if let Some(collab) = collab.try_lock() {
if !self.sync_queue.can_queue_init_sync() {
return;
}
if let Err(err) = self
.sync_queue
.init_sync(&collab, InitSyncReason::CollabDidInit)
{
error!("Failed to start init sync: {}", err);
}
}
}
}
}

View File

@ -110,21 +110,27 @@ async fn same_client_with_diff_devices_edit_same_collab_test() {
.collab
.lock()
.insert("name", "workspace2");
client_2.reconnect().await;
client_2
.wait_object_sync_complete(&object_id)
.await
.unwrap();
let expected_json = json!({
"name": "workspace2"
});
assert_client_collab_within_secs(&mut client_1, &object_id, "name", expected_json.clone(), 60)
.await;
assert_client_collab_within_secs(&mut client_2, &object_id, "name", expected_json.clone(), 60)
.await;
assert_client_collab_within_secs(
&mut client_1,
&object_id,
"name",
expected_json.clone(),
120,
)
.await;
assert_client_collab_within_secs(
&mut client_2,
&object_id,
"name",
expected_json.clone(),
120,
)
.await;
}
#[tokio::test]