feat: support fetching encoded collab in batch from memory (#837)
This commit is contained in:
parent
03fdcf4621
commit
11732324b5
|
|
@ -723,6 +723,7 @@ dependencies = [
|
|||
"prometheus-client",
|
||||
"prost",
|
||||
"rand 0.8.5",
|
||||
"rayon",
|
||||
"redis 0.25.4",
|
||||
"secrecy",
|
||||
"semver",
|
||||
|
|
|
|||
|
|
@ -147,7 +147,7 @@ lettre = { version = "0.11.7", features = ["tokio1", "tokio1-native-tls"] }
|
|||
handlebars = "5.1.2"
|
||||
pin-project = "1.1.5"
|
||||
byteorder = "1.5.0"
|
||||
rayon = "1.10.0"
|
||||
rayon.workspace = true
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
@ -247,6 +247,7 @@ actix-web = { version = "4.5.1", default-features = false, features = [
|
|||
actix-http = { version = "3.6.0", default-features = false }
|
||||
tokio = { version = "1.36.0", features = ["sync"] }
|
||||
tokio-stream = "0.1.14"
|
||||
rayon = "1.10.0"
|
||||
futures-util = "0.3.30"
|
||||
bincode = "1.3.3"
|
||||
client-websocket = { path = "libs/client-websocket" }
|
||||
|
|
|
|||
|
|
@ -138,6 +138,7 @@ pub trait CollabStorage: Send + Sync + 'static {
|
|||
&self,
|
||||
uid: &i64,
|
||||
queries: Vec<QueryCollab>,
|
||||
from_editing_collab: bool,
|
||||
) -> HashMap<String, QueryCollabResult>;
|
||||
|
||||
/// Deletes a collaboration from the storage.
|
||||
|
|
@ -249,8 +250,12 @@ where
|
|||
&self,
|
||||
uid: &i64,
|
||||
queries: Vec<QueryCollab>,
|
||||
from_editing_collab: bool,
|
||||
) -> HashMap<String, QueryCollabResult> {
|
||||
self.as_ref().batch_get_collab(uid, queries).await
|
||||
self
|
||||
.as_ref()
|
||||
.batch_get_collab(uid, queries, from_editing_collab)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete_collab(&self, workspace_id: &str, uid: &i64, object_id: &str) -> AppResult<()> {
|
||||
|
|
|
|||
|
|
@ -15,9 +15,19 @@ path = "src/lib.rs"
|
|||
access-control.workspace = true
|
||||
actix.workspace = true
|
||||
actix-web.workspace = true
|
||||
actix-http = { workspace = true, default-features = false, features = ["openssl", "compress-brotli", "compress-gzip"] }
|
||||
actix-http = { workspace = true, default-features = false, features = [
|
||||
"openssl",
|
||||
"compress-brotli",
|
||||
"compress-gzip",
|
||||
] }
|
||||
actix-web-actors = { version = "4.3" }
|
||||
app-error = { workspace = true, features = ["sqlx_error", "actix_web_error", "tokio_error", "bincode_error", "appflowy_ai_error"] }
|
||||
app-error = { workspace = true, features = [
|
||||
"sqlx_error",
|
||||
"actix_web_error",
|
||||
"tokio_error",
|
||||
"bincode_error",
|
||||
"appflowy_ai_error",
|
||||
] }
|
||||
authentication.workspace = true
|
||||
brotli.workspace = true
|
||||
dashmap.workspace = true
|
||||
|
|
@ -29,13 +39,24 @@ tracing = "0.1.40"
|
|||
futures-util = "0.3.30"
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
tokio-stream = { version = "0.1.14", features = ["sync"] }
|
||||
tokio = { workspace = true, features = ["net", "sync", "macros", "rt-multi-thread"] }
|
||||
tokio = { workspace = true, features = [
|
||||
"net",
|
||||
"sync",
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
] }
|
||||
async-trait = "0.1.77"
|
||||
prost.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_repr.workspace = true
|
||||
sqlx = { workspace = true, default-features = false, features = ["runtime-tokio-rustls", "macros", "postgres", "uuid", "chrono"] }
|
||||
sqlx = { workspace = true, default-features = false, features = [
|
||||
"runtime-tokio-rustls",
|
||||
"macros",
|
||||
"postgres",
|
||||
"uuid",
|
||||
"chrono",
|
||||
] }
|
||||
thiserror = "1.0.56"
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
|
||||
anyhow = "1"
|
||||
|
|
@ -66,6 +87,7 @@ lazy_static = "1.4.0"
|
|||
itertools = "0.12.0"
|
||||
validator = "0.16.1"
|
||||
workspace-access.workspace = true
|
||||
rayon.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
rand = "0.8.5"
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ use collab::entity::EncodedCollab;
|
|||
use collab_entity::CollabType;
|
||||
use collab_rt_entity::ClientCollabMessage;
|
||||
use itertools::{Either, Itertools};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use sqlx::Transaction;
|
||||
use tokio::time::timeout;
|
||||
use tracing::warn;
|
||||
|
|
@ -158,6 +159,40 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
async fn batch_get_encode_collab_from_editing(
|
||||
&self,
|
||||
object_ids: Vec<String>,
|
||||
) -> HashMap<String, EncodedCollab> {
|
||||
let (ret, rx) = tokio::sync::oneshot::channel();
|
||||
let timeout_duration = Duration::from_secs(10);
|
||||
|
||||
// Attempt to send the command to the realtime server
|
||||
if let Err(err) = self
|
||||
.rt_cmd_sender
|
||||
.send(CollaborationCommand::BatchGetEncodeCollab { object_ids, ret })
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
"Failed to send get encode collab command to realtime server: {}",
|
||||
err
|
||||
);
|
||||
return HashMap::new();
|
||||
}
|
||||
|
||||
// Await the response from the realtime server with a timeout
|
||||
match timeout(timeout_duration, rx).await {
|
||||
Ok(Ok(batch_encoded_collab)) => batch_encoded_collab,
|
||||
Ok(Err(err)) => {
|
||||
error!("Failed to get encode collab from realtime server: {}", err);
|
||||
HashMap::new()
|
||||
},
|
||||
Err(_) => {
|
||||
error!("Timeout waiting for encode collab from realtime server");
|
||||
HashMap::new()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn queue_insert_collab(
|
||||
&self,
|
||||
workspace_id: &str,
|
||||
|
|
@ -326,6 +361,7 @@ where
|
|||
&self,
|
||||
_uid: &i64,
|
||||
queries: Vec<QueryCollab>,
|
||||
from_editing_collab: bool,
|
||||
) -> HashMap<String, QueryCollabResult> {
|
||||
// Partition queries based on validation into valid queries and errors (with associated error messages).
|
||||
let (valid_queries, mut results): (Vec<_>, HashMap<_, _>) =
|
||||
|
|
@ -340,8 +376,48 @@ where
|
|||
},
|
||||
)),
|
||||
});
|
||||
let cache_queries = if from_editing_collab {
|
||||
let editing_queries = valid_queries.clone();
|
||||
let editing_results = self
|
||||
.batch_get_encode_collab_from_editing(
|
||||
editing_queries
|
||||
.iter()
|
||||
.map(|q| q.object_id.clone())
|
||||
.collect(),
|
||||
)
|
||||
.await;
|
||||
let editing_query_collab_results: HashMap<String, QueryCollabResult> =
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let par_iter = editing_results.into_par_iter();
|
||||
par_iter
|
||||
.map(|(object_id, encoded_collab)| {
|
||||
let encoding_result = encoded_collab.encode_to_bytes();
|
||||
let query_collab_result = match encoding_result {
|
||||
Ok(encoded_collab_bytes) => QueryCollabResult::Success {
|
||||
encode_collab_v1: encoded_collab_bytes,
|
||||
},
|
||||
Err(err) => QueryCollabResult::Failed {
|
||||
error: err.to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
results.extend(self.cache.batch_get_encode_collab(valid_queries).await);
|
||||
(object_id.clone(), query_collab_result)
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
let editing_object_ids: Vec<String> = editing_query_collab_results.keys().cloned().collect();
|
||||
results.extend(editing_query_collab_results);
|
||||
valid_queries
|
||||
.into_iter()
|
||||
.filter(|q| !editing_object_ids.contains(&q.object_id))
|
||||
.collect()
|
||||
} else {
|
||||
valid_queries
|
||||
};
|
||||
|
||||
results.extend(self.cache.batch_get_encode_collab(cache_queries).await);
|
||||
results
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,22 +1,36 @@
|
|||
use crate::{
|
||||
error::RealtimeError,
|
||||
group::cmd::{GroupCommand, GroupCommandSender},
|
||||
group::{
|
||||
cmd::{GroupCommand, GroupCommandSender},
|
||||
manager::GroupManager,
|
||||
},
|
||||
};
|
||||
use access_control::collab::RealtimeAccessControl;
|
||||
use collab::entity::EncodedCollab;
|
||||
use collab_rt_entity::ClientCollabMessage;
|
||||
use dashmap::DashMap;
|
||||
use std::sync::Arc;
|
||||
use database::collab::CollabStorage;
|
||||
use futures::StreamExt;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
use tracing::error;
|
||||
|
||||
pub type CLCommandSender = tokio::sync::mpsc::Sender<CollaborationCommand>;
|
||||
pub type CLCommandReceiver = tokio::sync::mpsc::Receiver<CollaborationCommand>;
|
||||
|
||||
pub type EncodeCollabSender = tokio::sync::oneshot::Sender<Option<EncodedCollab>>;
|
||||
pub type BatchEncodeCollabSender = tokio::sync::oneshot::Sender<HashMap<String, EncodedCollab>>;
|
||||
pub enum CollaborationCommand {
|
||||
GetEncodeCollab {
|
||||
object_id: String,
|
||||
ret: EncodeCollabSender,
|
||||
},
|
||||
BatchGetEncodeCollab {
|
||||
object_ids: Vec<String>,
|
||||
ret: BatchEncodeCollabSender,
|
||||
},
|
||||
ServerSendCollabMessage {
|
||||
object_id: String,
|
||||
collab_messages: Vec<ClientCollabMessage>,
|
||||
|
|
@ -24,10 +38,14 @@ pub enum CollaborationCommand {
|
|||
},
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_collaboration_command(
|
||||
pub(crate) fn spawn_collaboration_command<S, AC>(
|
||||
mut command_recv: CLCommandReceiver,
|
||||
group_sender_by_object_id: &Arc<DashMap<String, GroupCommandSender>>,
|
||||
) {
|
||||
weak_groups: Weak<GroupManager<S, AC>>,
|
||||
) where
|
||||
S: CollabStorage,
|
||||
AC: RealtimeAccessControl,
|
||||
{
|
||||
let group_sender_by_object_id = group_sender_by_object_id.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(cmd) = command_recv.recv().await {
|
||||
|
|
@ -50,6 +68,35 @@ pub(crate) fn spawn_collaboration_command(
|
|||
},
|
||||
}
|
||||
},
|
||||
CollaborationCommand::BatchGetEncodeCollab { object_ids, ret } => {
|
||||
if let Some(group_manager) = weak_groups.upgrade() {
|
||||
let tasks = futures::stream::iter(object_ids)
|
||||
.map(|object_id| {
|
||||
let cloned_group_manager = group_manager.clone();
|
||||
tokio::task::spawn(async move {
|
||||
let group = cloned_group_manager.get_group(&object_id).await;
|
||||
if let Some(group) = group {
|
||||
(object_id, group.encode_collab().await.ok())
|
||||
} else {
|
||||
(object_id, None)
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
let mut outputs: HashMap<String, EncodedCollab> = HashMap::new();
|
||||
for task in tasks {
|
||||
let result = task.await;
|
||||
if let Ok((object_id, Some(encoded_collab))) = result {
|
||||
outputs.insert(object_id, encoded_collab);
|
||||
}
|
||||
}
|
||||
let _ = ret.send(outputs);
|
||||
} else {
|
||||
let _ = ret.send(HashMap::new());
|
||||
}
|
||||
},
|
||||
CollaborationCommand::ServerSendCollabMessage {
|
||||
object_id,
|
||||
collab_messages,
|
||||
|
|
|
|||
|
|
@ -89,7 +89,11 @@ where
|
|||
|
||||
spawn_period_check_inactive_group(Arc::downgrade(&group_manager), &group_sender_by_object_id);
|
||||
|
||||
spawn_collaboration_command(command_recv, &group_sender_by_object_id);
|
||||
spawn_collaboration_command(
|
||||
command_recv,
|
||||
&group_sender_by_object_id,
|
||||
Arc::downgrade(&group_manager),
|
||||
);
|
||||
|
||||
spawn_metrics(metrics.clone(), storage.clone());
|
||||
|
||||
|
|
|
|||
|
|
@ -885,7 +885,7 @@ async fn batch_get_collab_handler(
|
|||
let result = BatchQueryCollabResult(
|
||||
state
|
||||
.collab_access_control_storage
|
||||
.batch_get_collab(&uid, payload.into_inner().0)
|
||||
.batch_get_collab(&uid, payload.into_inner().0, false)
|
||||
.await,
|
||||
);
|
||||
Ok(Json(AppResponse::Ok().with_data(result)))
|
||||
|
|
|
|||
|
|
@ -189,7 +189,7 @@ async fn get_page_collab_data_for_database(
|
|||
})
|
||||
.collect();
|
||||
let row_query_collab_results = collab_access_control_storage
|
||||
.batch_get_collab(&uid, queries)
|
||||
.batch_get_collab(&uid, queries, true)
|
||||
.await;
|
||||
let row_data = tokio::task::spawn_blocking(move || {
|
||||
let row_collabs: HashMap<String, Vec<u8>> = row_query_collab_results
|
||||
|
|
|
|||
Loading…
Reference in New Issue