chore: add preliminary check for enabled indexing to batch collab insert

This commit is contained in:
Bartosz Sypytkowski 2024-12-19 06:49:32 +01:00
parent d5252f4273
commit c1d94e3d17
5 changed files with 23 additions and 11 deletions

View File

@ -113,7 +113,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}
event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;

View File

@ -199,6 +199,10 @@ impl IndexerScheduler {
Ok(())
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_provider.is_indexing_enabled(collab_type)
}
pub fn index_encoded_collabs(
&self,
workspace_id: &str,

View File

@ -52,4 +52,8 @@ impl IndexerProvider {
pub fn indexer_for(&self, collab_type: &CollabType) -> Option<Arc<dyn Indexer>> {
self.indexer_cache.get(collab_type).cloned()
}
pub fn is_indexing_enabled(&self, collab_type: &CollabType) -> bool {
self.indexer_cache.contains_key(collab_type)
}
}

View File

@ -816,8 +816,7 @@ async fn batch_create_collab_handler(
let total_size = collab_params_list
.iter()
.fold(0, |acc, x| acc + x.encoded_collab_v1.len());
event!(
tracing::Level::INFO,
tracing::info!(
"decompressed {} collab objects in {:?}",
collab_params_list.len(),
start.elapsed()
@ -828,10 +827,18 @@ async fn batch_create_collab_handler(
.can_index_workspace(&workspace_id)
.await?
{
state.indexer_scheduler.index_encoded_collabs(
&workspace_id,
collab_params_list.iter().map(IndexedCollab::from).collect(),
)?;
let indexed_collabs: Vec<_> = collab_params_list
.iter()
.filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
.map(IndexedCollab::from)
.collect();
let len = indexed_collabs.len();
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
tracing::info!("scheduled indexing for {} collabs", len);
}
let start = Instant::now();
@ -840,8 +847,7 @@ async fn batch_create_collab_handler(
.batch_insert_new_collab(&workspace_id, &uid, collab_params_list)
.await?;
event!(
tracing::Level::INFO,
tracing::info!(
"inserted collab objects to disk in {:?}, total size:{}",
start.elapsed(),
total_size
@ -1825,7 +1831,6 @@ async fn post_realtime_message_stream_handler(
bytes.extend_from_slice(&item?);
}
event!(tracing::Level::INFO, "message len: {}", bytes.len());
let device_id = device_id.to_string();
let message = parser_realtime_msg(bytes.freeze(), req.clone()).await?;

View File

@ -741,7 +741,7 @@ pub async fn broadcast_update(
oid: &str,
encoded_update: Vec<u8>,
) -> Result<(), AppError> {
tracing::info!("broadcasting update to group: {}", oid);
tracing::trace!("broadcasting update to group: {}", oid);
let payload = Message::Sync(SyncMessage::Update(encoded_update)).encode_v1();
let msg = ClientCollabMessage::ClientUpdateSync {
data: UpdateSync {