chore: update logs (#147)

* chore: update logs

* chore: update logs

* chore: update logs

* chore: update logs
This commit is contained in:
Nathan.fooo 2023-11-05 13:25:47 +08:00 committed by GitHub
parent d088fd2ec8
commit 5559e8cbd7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 52 additions and 37 deletions

4
Cargo.lock generated
View File

@ -1169,7 +1169,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4d348f11#4d348f11ab6c9922017b45376cd58a217a671cd7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=bab20052#bab200529ef0306194fa8618cc8708878b01ce04"
dependencies = [
"anyhow",
"async-trait",
@ -1188,7 +1188,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=4d348f11#4d348f11ab6c9922017b45376cd58a217a671cd7"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=bab20052#bab200529ef0306194fa8618cc8708878b01ce04"
dependencies = [
"anyhow",
"bytes",

View File

@ -138,8 +138,8 @@ lto = false
opt-level = 3
[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4d348f11" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4d348f11" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "bab20052" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "bab20052" }
# Comment the above and uncomment the below to use local version of collab by cloning the repo and placing it in libs folder
#collab = { path = "libs/AppFlowy-Collab/collab" }

View File

@ -28,7 +28,7 @@ pub enum AppError {
#[error("Invalid password:{0}")]
InvalidPassword(String),
#[error("OAuth error:{0}")]
#[error("{0}")]
OAuthError(String),
#[error("Missing Payload:{0}")]

View File

@ -1,7 +1,6 @@
use anyhow::Error;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::fmt::Display;
use std::ops::{Deref, DerefMut};
use realtime_entity::collab_msg::{CollabSinkMessage, MsgId};
@ -15,7 +14,7 @@ pub(crate) struct PendingMsgQueue<Msg> {
impl<Msg> PendingMsgQueue<Msg>
where
Msg: Ord + Clone + Display,
Msg: CollabSinkMessage,
{
pub(crate) fn new(uid: i64) -> Self {
Self {
@ -32,7 +31,7 @@ where
impl<Msg> Deref for PendingMsgQueue<Msg>
where
Msg: Ord,
Msg: CollabSinkMessage,
{
type Target = BinaryHeap<PendingMessage<Msg>>;
@ -43,7 +42,7 @@ where
impl<Msg> DerefMut for PendingMsgQueue<Msg>
where
Msg: Ord,
Msg: CollabSinkMessage,
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.queue
@ -60,7 +59,7 @@ pub(crate) struct PendingMessage<Msg> {
impl<Msg> PendingMessage<Msg>
where
Msg: Clone + Display,
Msg: CollabSinkMessage,
{
pub fn new(msg: Msg, msg_id: MsgId) -> Self {
Self {
@ -71,6 +70,10 @@ where
}
}
pub fn object_id(&self) -> &str {
self.msg.collab_object_id()
}
pub fn get_msg(&self) -> &Msg {
&self.msg
}
@ -82,27 +85,29 @@ where
pub fn set_state(&mut self, uid: i64, new_state: MessageState) -> bool {
self.state = new_state;
trace!(
"[Client {}] : msg_id: {}, state: {:?}",
"[Client {}] : oid:{}|msg_id:{},state:{:?}",
uid,
self.msg.collab_object_id(),
self.msg_id,
self.state
);
if !self.state.is_done() {
return false;
}
match self.tx.take() {
None => false,
Some(tx) => {
// Notify that the message with given id was received
match tx.send(self.msg_id) {
Ok(_) => true,
Err(err) => {
warn!("Failed to send msg_id: {}, err: {}", self.msg_id, err);
false
},
}
},
if self.state.is_done() {
match self.tx.take() {
None => false,
Some(tx) => {
// Notify that the message with given id was received
match tx.send(self.msg_id) {
Ok(_) => true,
Err(err) => {
warn!("Failed to send msg_id: {}, err: {}", self.msg_id, err);
false
},
}
},
}
} else {
false
}
}

View File

@ -324,8 +324,12 @@ where
Ok(_) => match self.pending_msg_queue.try_lock() {
None => warn!("Failed to acquire the lock of the pending_msg_queue"),
Some(mut pending_msg_queue) => {
let _ = pending_msg_queue.pop();
trace!("Pending messages: {}", pending_msg_queue.len());
let msg = pending_msg_queue.pop();
trace!(
"{:?}: Pending messages: {}",
msg.map(|msg| msg.object_id().to_owned()),
pending_msg_queue.len()
);
if pending_msg_queue.is_empty() {
if let Err(e) = self.state_notifier.send(SinkState::Finished) {
error!("send sink state failed: {}", e);

View File

@ -266,11 +266,12 @@ where
P: CollabSyncProtocol + Send + Sync + 'static,
{
if match msg.msg_id() {
// The msg_id is None if the message is [ServerBroadcast] or [ServerAwareness]
None => true,
Some(msg_id) => sink.ack_msg(msg.origin(), msg.object_id(), msg_id).await,
} && !msg.payload().is_empty()
{
trace!("💬process messages");
trace!("start process message: {:?}", msg.msg_id());
SyncStream::<Sink, Stream>::process_payload(
origin,
msg.payload(),
@ -280,7 +281,7 @@ where
sink,
)
.await?;
trace!("💬");
trace!("end process message: {:?}", msg.msg_id());
}
Ok(())
}

View File

@ -11,6 +11,7 @@ use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
pub trait CollabSinkMessage: Clone + Send + Sync + 'static + Ord + Display {
fn collab_object_id(&self) -> &str;
/// Returns the length of the message in bytes.
fn length(&self) -> usize;
@ -38,6 +39,10 @@ pub enum CollabMessage {
}
impl CollabSinkMessage for CollabMessage {
fn collab_object_id(&self) -> &str {
self.object_id()
}
fn length(&self) -> usize {
self.payload().len()
}

View File

@ -10,8 +10,8 @@ async fn edit_workspace_without_permission() {
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.workspace_id().await;
client_1.edit_workspace_collab(&workspace_id).await;
client_2.edit_workspace_collab(&workspace_id).await;
client_1.open_workspace_collab(&workspace_id).await;
client_2.open_workspace_collab(&workspace_id).await;
client_1
.collab_by_object_id
@ -31,13 +31,13 @@ async fn init_sync_workspace_with_guest_permission() {
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.workspace_id().await;
client_1.edit_workspace_collab(&workspace_id).await;
client_1.open_workspace_collab(&workspace_id).await;
// add client 2 as the member of the workspace then the client 2 will receive the update.
client_1
.add_workspace_member(&workspace_id, &client_2, AFRole::Guest)
.await;
client_2.edit_workspace_collab(&workspace_id).await;
client_2.open_workspace_collab(&workspace_id).await;
client_1
.collab_by_object_id
@ -57,7 +57,7 @@ async fn edit_workspace_with_guest_permission() {
let mut client_1 = TestClient::new_user().await;
let mut client_2 = TestClient::new_user().await;
let workspace_id = client_1.workspace_id().await;
client_1.edit_workspace_collab(&workspace_id).await;
client_1.open_workspace_collab(&workspace_id).await;
// add client 2 as the member of the workspace then the client 2 will receive the update.
client_1
@ -73,7 +73,7 @@ async fn edit_workspace_with_guest_permission() {
.insert("name", "zack");
client_1.wait_object_sync_complete(&workspace_id).await;
client_2.edit_workspace_collab(&workspace_id).await;
client_2.open_workspace_collab(&workspace_id).await;
// make sure the client 2 has received the remote updates before the client 2 edits the collab
client_2
.wait_object_sync_complete_with_secs(&workspace_id, 10)

View File

@ -325,7 +325,7 @@ impl TestClient {
object_id
}
pub(crate) async fn edit_workspace_collab(&mut self, workspace_id: &str) {
pub(crate) async fn open_workspace_collab(&mut self, workspace_id: &str) {
self
.open_collab(workspace_id, workspace_id, CollabType::Folder)
.await;