From dfe1b5ad28a9e4e21c36ac776feec6edbfc888ad Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Thu, 10 Oct 2024 12:22:07 +0200 Subject: [PATCH] chore: attach indexer to collab persister --- .../src/group/group_init.rs | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index d93e5222..7abf99e6 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -1,8 +1,9 @@ use crate::error::RealtimeError; -use crate::indexer::Indexer; +use crate::indexer::{Indexer, IndexerProvider}; use crate::metrics::CollabRealtimeMetrics; use crate::state::RedisConnectionManager; use anyhow::anyhow; +use app_error::AppError; use arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption}; use bytes::Bytes; use collab::core::collab::DataSource; @@ -27,7 +28,9 @@ use collab_stream::model::{ use collab_stream::stream_group::StreamGroup; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; -use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams}; +use database_entity::dto::{ + AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, +}; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::collections::VecDeque; @@ -1028,7 +1031,13 @@ impl CollabPersister { let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light) .encode_to_bytes() .map_err(|err| RealtimeError::Internal(err.into()))?; - let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); + let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); + + match self.embeddings(collab).await { + Ok(embeddings) => params.embeddings = embeddings, + Err(err) => tracing::warn!("failed to fetch embeddings `{}`: {}", self.object_id, err), + } + let uid = 0; //FIXME: what UID should go there? self .storage @@ -1046,12 +1055,23 @@ impl CollabPersister { .collab_redis_stream .prune_stream(&stream_key, msg_id) .await?; + let _ = lease.release().await; } Ok(()) } + async fn embeddings(&self, collab: &Collab) -> Result, AppError> { + if let Some(indexer) = self.indexer.clone() { + let params = indexer.embedding_params(&collab)?; + let embeddings = indexer.embeddings(params).await?; + Ok(embeddings) + } else { + Ok(None) + } + } + async fn load_collab_full(&self, keep_history: bool) -> Result { let params = QueryCollabParams::new( self.object_id.clone(),