chore: post merge fixes

This commit is contained in:
Bartosz Sypytkowski 2024-12-16 08:48:09 +01:00
parent 1b35326f0a
commit 73872a2b1d
5 changed files with 56 additions and 74 deletions

View File

@ -24,7 +24,6 @@ use crate::actix_ws::server::RealtimeServerActor;
use crate::api::{collab_scope, ws_scope};
use crate::collab::access_control::CollabStorageAccessControlImpl;
use access_control::casbin::access::AccessControl;
use appflowy_ai_client::client::AppFlowyAIClient;
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
use database::file::s3_client_impl::AwsS3BucketClientImpl;

View File

@ -149,7 +149,10 @@ where
},
GroupCommand::GenerateCollabEmbedding { object_id } => {
if let Some(group) = self.group_manager.get_group(&object_id).await {
group.generate_embeddings().await;
match group.generate_embeddings().await {
Ok(_) => trace!("successfully created embeddings for {}", object_id),
Err(err) => trace!("failed to create embeddings for {}: {}", object_id, err),
}
}
},
GroupCommand::CalculateMissingUpdate {

View File

@ -1,6 +1,5 @@
use crate::error::RealtimeError;
use crate::indexer::Indexer;
use crate::metrics::CollabRealtimeMetrics;
use crate::indexer::IndexedCollab;
use anyhow::anyhow;
use app_error::AppError;
use arc_swap::ArcSwap;
@ -19,13 +18,13 @@ use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage};
use collab_stream::client::CollabRedisStream;
use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
use crate::indexer::IndexerScheduler;
use crate::metrics::CollabRealtimeMetrics;
use collab_stream::error::StreamError;
use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags};
use dashmap::DashMap;
use database::collab::{CollabStorage, GetCollabOrigin};
use database_entity::dto::{
AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams,
};
use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams};
use futures::{pin_mut, Sink, Stream};
use futures_util::{SinkExt, StreamExt};
use std::sync::atomic::{AtomicU32, Ordering};
@ -38,16 +37,6 @@ use yrs::updates::decoder::{Decode, DecoderV1};
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};
use yrs::{ReadTxn, StateVector, Update};
use collab_stream::error::StreamError;
use database::collab::CollabStorage;
use crate::error::RealtimeError;
use crate::group::broadcast::{CollabBroadcast, Subscription};
use crate::group::persistence::GroupPersistence;
use crate::indexer::IndexerScheduler;
use crate::metrics::CollabRealtimeMetrics;
/// A group used to manage a single [Collab] object
pub struct CollabGroup {
state: Arc<CollabGroupState>,
@ -66,7 +55,6 @@ struct CollabGroupState {
metrics: Arc<CollabRealtimeMetrics>,
/// Cancellation token triggered when current collab group is about to be stopped.
/// This will also shut down all subsequent [Subscription]s.
indexer_scheduler: Arc<IndexerScheduler>,
shutdown: CancellationToken,
last_activity: ArcSwap<Instant>,
seq_no: AtomicU32,
@ -107,10 +95,9 @@ impl CollabGroup {
collab_type.clone(),
storage,
collab_redis_stream,
indexer,
indexer_scheduler,
metrics.clone(),
prune_grace_period,
indexer_scheduler
);
let state = Arc::new(CollabGroupState {
@ -346,35 +333,28 @@ impl CollabGroup {
/// Generate embedding for the current Collab immediately
///
pub async fn generate_embeddings(&self) {
let result = self
pub async fn generate_embeddings(&self) -> Result<(), AppError> {
let collab = self
.encode_collab()
.await
.map_err(|e| AppError::Internal(e.into()))?;
let collab = Collab::new_with_source(
CollabOrigin::Server,
self.object_id(),
DataSource::DocStateV1(collab.doc_state.into()),
vec![],
false,
)
.map_err(|e| AppError::Internal(e.into()))?;
let workspace_id = &self.state.workspace_id;
let object_id = &self.state.object_id;
let collab_type = &self.state.collab_type;
self
.state
.persister
.indexer_scheduler
.index_collab(
&self.workspace_id,
&self.object_id,
&self.collab,
&self.collab_type,
)
.await;
match result {
Ok(_) => {
trace!(
"successfully indexed embeddings for {} {}/{}",
self.collab_type,
self.workspace_id,
self.object_id
);
},
Err(err) => {
trace!(
"failed to index embeddings for document {} {}/{}: {}",
self.collab_type,
self.workspace_id,
self.object_id,
err
);
},
}
.index_collab(workspace_id, object_id, &collab, collab_type)
.await
}
pub async fn calculate_missing_update(
@ -917,7 +897,7 @@ struct CollabPersister {
collab_type: CollabType,
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>,
indexer_scheduler: Arc<IndexerScheduler>,
metrics: Arc<CollabRealtimeMetrics>,
update_sink: CollabUpdateSink,
awareness_sink: AwarenessUpdateSink,
@ -935,7 +915,7 @@ impl CollabPersister {
collab_type: CollabType,
storage: Arc<dyn CollabStorage>,
collab_redis_stream: Arc<CollabRedisStream>,
indexer: Option<Arc<dyn Indexer>>,
indexer_scheduler: Arc<IndexerScheduler>,
metrics: Arc<CollabRealtimeMetrics>,
prune_grace_period: Duration,
) -> Self {
@ -948,7 +928,7 @@ impl CollabPersister {
collab_type,
storage,
collab_redis_stream,
indexer,
indexer_scheduler,
metrics,
update_sink,
awareness_sink,
@ -1188,12 +1168,8 @@ impl CollabPersister {
.metrics
.collab_size
.observe(encoded_collab.len() as f64);
let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
match self.embeddings(collab).await {
Ok(embeddings) => params.embeddings = embeddings,
Err(err) => tracing::warn!("failed to fetch embeddings `{}`: {}", self.object_id, err),
}
let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab);
let encoded_collab = params.encoded_collab_v1.clone();
self
.storage
@ -1209,6 +1185,23 @@ impl CollabPersister {
light_len
);
let indexed_collab = IndexedCollab {
object_id: self.object_id.clone(),
collab_type: self.collab_type.clone(),
encoded_collab,
};
if let Err(err) = self
.indexer_scheduler
.index_encoded_collab_one(&self.workspace_id, indexed_collab)
{
tracing::warn!(
"failed to index collab `{}/{}`: {}",
self.workspace_id,
self.object_id,
err
);
}
// 3. finally we can drop Redis messages
let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis();
let msg_id = MessageId {
@ -1227,16 +1220,6 @@ impl CollabPersister {
Ok(())
}
async fn embeddings(&self, collab: &Collab) -> Result<Option<AFCollabEmbeddings>, AppError> {
if let Some(indexer) = self.indexer.clone() {
let params = indexer.embedding_params(collab).await?;
let embeddings = indexer.embeddings(params).await?;
Ok(embeddings)
} else {
Ok(None)
}
}
async fn load_collab_full(&self, keep_history: bool) -> Result<Option<Collab>, RealtimeError> {
let doc_state = if keep_history {
// if we want history-keeping variant, we need to get a snapshot

View File

@ -145,7 +145,7 @@ where
collab_type
);
let group = Arc::new(CollabGroup::new(
let group = CollabGroup::new(
user.uid,
workspace_id.to_string(),
object_id.to_string(),

View File

@ -13,7 +13,6 @@ use bytes::Bytes;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::CollabType;
use database::collab::{CollabStorage, GetCollabOrigin};
@ -216,7 +215,7 @@ impl IndexerScheduler {
&self,
workspace_id: &str,
object_id: &str,
collab: &Arc<RwLock<Collab>>,
collab: &Collab,
collab_type: &CollabType,
) -> Result<(), AppError> {
if !self.index_enabled() {
@ -235,9 +234,7 @@ impl IndexerScheduler {
))
})?;
let lock = collab.read().await;
let chunks = indexer.create_embedded_chunks(&lock, embedder.model())?;
drop(lock); // release the read lock ASAP
let chunks = indexer.create_embedded_chunks(collab, embedder.model())?;
let threads = self.threads.clone();
let tx = self.schedule_tx.clone();
@ -348,7 +345,7 @@ fn get_unindexed_collabs(
match &cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into())
.await?;
yield UnindexedCollab {