chore: attach indexer to collab persister
This commit is contained in:
parent
97335a94ec
commit
bee7caedac
|
|
@ -1,8 +1,9 @@
|
|||
use crate::error::RealtimeError;
|
||||
use crate::indexer::Indexer;
|
||||
use crate::indexer::{Indexer, IndexerProvider};
|
||||
use crate::metrics::CollabRealtimeMetrics;
|
||||
use crate::state::RedisConnectionManager;
|
||||
use anyhow::anyhow;
|
||||
use app_error::AppError;
|
||||
use arc_swap::{ArcSwap, ArcSwapAny, ArcSwapOption};
|
||||
use bytes::Bytes;
|
||||
use collab::core::collab::DataSource;
|
||||
|
|
@ -27,7 +28,9 @@ use collab_stream::model::{
|
|||
use collab_stream::stream_group::StreamGroup;
|
||||
use dashmap::DashMap;
|
||||
use database::collab::{CollabStorage, GetCollabOrigin};
|
||||
use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams};
|
||||
use database_entity::dto::{
|
||||
AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams,
|
||||
};
|
||||
use futures::{pin_mut, Sink, Stream};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use std::collections::VecDeque;
|
||||
|
|
@ -1028,7 +1031,13 @@ impl CollabPersister {
|
|||
let encoded_collab = EncodedCollab::new_v1(sv, doc_state_light)
|
||||
.encode_to_bytes()
|
||||
.map_err(|err| RealtimeError::Internal(err.into()))?;
|
||||
let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
|
||||
let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
|
||||
|
||||
match self.embeddings(collab).await {
|
||||
Ok(embeddings) => params.embeddings = embeddings,
|
||||
Err(err) => tracing::warn!("failed to fetch embeddings `{}`: {}", self.object_id, err),
|
||||
}
|
||||
|
||||
let uid = 0; //FIXME: what UID should go there?
|
||||
self
|
||||
.storage
|
||||
|
|
@ -1046,12 +1055,23 @@ impl CollabPersister {
|
|||
.collab_redis_stream
|
||||
.prune_stream(&stream_key, msg_id)
|
||||
.await?;
|
||||
|
||||
let _ = lease.release().await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn embeddings(&self, collab: &Collab) -> Result<Option<AFCollabEmbeddings>, AppError> {
|
||||
if let Some(indexer) = self.indexer.clone() {
|
||||
let params = indexer.embedding_params(&collab)?;
|
||||
let embeddings = indexer.embeddings(params).await?;
|
||||
Ok(embeddings)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_collab_full(&self, keep_history: bool) -> Result<Collab, RealtimeError> {
|
||||
let params = QueryCollabParams::new(
|
||||
self.object_id.clone(),
|
||||
|
|
|
|||
Loading…
Reference in New Issue