diff --git a/services/appflowy-collaborate/src/application.rs b/services/appflowy-collaborate/src/application.rs index 3f0e6809..8f31663e 100644 --- a/services/appflowy-collaborate/src/application.rs +++ b/services/appflowy-collaborate/src/application.rs @@ -24,7 +24,6 @@ use crate::actix_ws::server::RealtimeServerActor; use crate::api::{collab_scope, ws_scope}; use crate::collab::access_control::CollabStorageAccessControlImpl; use access_control::casbin::access::AccessControl; -use appflowy_ai_client::client::AppFlowyAIClient; use collab_stream::stream_router::{StreamRouter, StreamRouterOptions}; use database::file::s3_client_impl::AwsS3BucketClientImpl; diff --git a/services/appflowy-collaborate/src/group/cmd.rs b/services/appflowy-collaborate/src/group/cmd.rs index 699edd00..2a8538cd 100644 --- a/services/appflowy-collaborate/src/group/cmd.rs +++ b/services/appflowy-collaborate/src/group/cmd.rs @@ -149,7 +149,10 @@ where }, GroupCommand::GenerateCollabEmbedding { object_id } => { if let Some(group) = self.group_manager.get_group(&object_id).await { - group.generate_embeddings().await; + match group.generate_embeddings().await { + Ok(_) => trace!("successfully created embeddings for {}", object_id), + Err(err) => trace!("failed to create embeddings for {}: {}", object_id, err), + } } }, GroupCommand::CalculateMissingUpdate { diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index c5b22f72..67fb47a7 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -1,6 +1,5 @@ use crate::error::RealtimeError; -use crate::indexer::Indexer; -use crate::metrics::CollabRealtimeMetrics; +use crate::indexer::IndexedCollab; use anyhow::anyhow; use app_error::AppError; use arc_swap::ArcSwap; @@ -19,13 +18,13 @@ use collab_rt_protocol::{Message, MessageReader, RTProtocolError, SyncMessage}; use collab_stream::client::CollabRedisStream; use collab_stream::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink}; +use crate::indexer::IndexerScheduler; +use crate::metrics::CollabRealtimeMetrics; use collab_stream::error::StreamError; use collab_stream::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId, UpdateFlags}; use dashmap::DashMap; use database::collab::{CollabStorage, GetCollabOrigin}; -use database_entity::dto::{ - AFCollabEmbeddings, CollabParams, InsertSnapshotParams, QueryCollabParams, -}; +use database_entity::dto::{CollabParams, InsertSnapshotParams, QueryCollabParams}; use futures::{pin_mut, Sink, Stream}; use futures_util::{SinkExt, StreamExt}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -38,16 +37,6 @@ use yrs::updates::decoder::{Decode, DecoderV1}; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::{ReadTxn, StateVector, Update}; -use collab_stream::error::StreamError; - -use database::collab::CollabStorage; - -use crate::error::RealtimeError; -use crate::group::broadcast::{CollabBroadcast, Subscription}; -use crate::group::persistence::GroupPersistence; -use crate::indexer::IndexerScheduler; -use crate::metrics::CollabRealtimeMetrics; - /// A group used to manage a single [Collab] object pub struct CollabGroup { state: Arc, @@ -66,7 +55,6 @@ struct CollabGroupState { metrics: Arc, /// Cancellation token triggered when current collab group is about to be stopped. /// This will also shut down all subsequent [Subscription]s. - indexer_scheduler: Arc, shutdown: CancellationToken, last_activity: ArcSwap, seq_no: AtomicU32, @@ -107,10 +95,9 @@ impl CollabGroup { collab_type.clone(), storage, collab_redis_stream, - indexer, + indexer_scheduler, metrics.clone(), prune_grace_period, - indexer_scheduler ); let state = Arc::new(CollabGroupState { @@ -346,35 +333,28 @@ impl CollabGroup { /// Generate embedding for the current Collab immediately /// - pub async fn generate_embeddings(&self) { - let result = self + pub async fn generate_embeddings(&self) -> Result<(), AppError> { + let collab = self + .encode_collab() + .await + .map_err(|e| AppError::Internal(e.into()))?; + let collab = Collab::new_with_source( + CollabOrigin::Server, + self.object_id(), + DataSource::DocStateV1(collab.doc_state.into()), + vec![], + false, + ) + .map_err(|e| AppError::Internal(e.into()))?; + let workspace_id = &self.state.workspace_id; + let object_id = &self.state.object_id; + let collab_type = &self.state.collab_type; + self + .state + .persister .indexer_scheduler - .index_collab( - &self.workspace_id, - &self.object_id, - &self.collab, - &self.collab_type, - ) - .await; - match result { - Ok(_) => { - trace!( - "successfully indexed embeddings for {} {}/{}", - self.collab_type, - self.workspace_id, - self.object_id - ); - }, - Err(err) => { - trace!( - "failed to index embeddings for document {} {}/{}: {}", - self.collab_type, - self.workspace_id, - self.object_id, - err - ); - }, - } + .index_collab(workspace_id, object_id, &collab, collab_type) + .await } pub async fn calculate_missing_update( @@ -917,7 +897,7 @@ struct CollabPersister { collab_type: CollabType, storage: Arc, collab_redis_stream: Arc, - indexer: Option>, + indexer_scheduler: Arc, metrics: Arc, update_sink: CollabUpdateSink, awareness_sink: AwarenessUpdateSink, @@ -935,7 +915,7 @@ impl CollabPersister { collab_type: CollabType, storage: Arc, collab_redis_stream: Arc, - indexer: Option>, + indexer_scheduler: Arc, metrics: Arc, prune_grace_period: Duration, ) -> Self { @@ -948,7 +928,7 @@ impl CollabPersister { collab_type, storage, collab_redis_stream, - indexer, + indexer_scheduler, metrics, update_sink, awareness_sink, @@ -1188,12 +1168,8 @@ impl CollabPersister { .metrics .collab_size .observe(encoded_collab.len() as f64); - let mut params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); - - match self.embeddings(collab).await { - Ok(embeddings) => params.embeddings = embeddings, - Err(err) => tracing::warn!("failed to fetch embeddings `{}`: {}", self.object_id, err), - } + let params = CollabParams::new(&self.object_id, self.collab_type.clone(), encoded_collab); + let encoded_collab = params.encoded_collab_v1.clone(); self .storage @@ -1209,6 +1185,23 @@ impl CollabPersister { light_len ); + let indexed_collab = IndexedCollab { + object_id: self.object_id.clone(), + collab_type: self.collab_type.clone(), + encoded_collab, + }; + if let Err(err) = self + .indexer_scheduler + .index_encoded_collab_one(&self.workspace_id, indexed_collab) + { + tracing::warn!( + "failed to index collab `{}/{}`: {}", + self.workspace_id, + self.object_id, + err + ); + } + // 3. finally we can drop Redis messages let now = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis(); let msg_id = MessageId { @@ -1227,16 +1220,6 @@ impl CollabPersister { Ok(()) } - async fn embeddings(&self, collab: &Collab) -> Result, AppError> { - if let Some(indexer) = self.indexer.clone() { - let params = indexer.embedding_params(collab).await?; - let embeddings = indexer.embeddings(params).await?; - Ok(embeddings) - } else { - Ok(None) - } - } - async fn load_collab_full(&self, keep_history: bool) -> Result, RealtimeError> { let doc_state = if keep_history { // if we want history-keeping variant, we need to get a snapshot diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index 972bebcc..3ca70680 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -145,7 +145,7 @@ where collab_type ); - let group = Arc::new(CollabGroup::new( + let group = CollabGroup::new( user.uid, workspace_id.to_string(), object_id.to_string(), diff --git a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs index c007feb3..04f3c990 100644 --- a/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs +++ b/services/appflowy-collaborate/src/indexer/indexer_scheduler.rs @@ -13,7 +13,6 @@ use bytes::Bytes; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; -use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::CollabType; use database::collab::{CollabStorage, GetCollabOrigin}; @@ -216,7 +215,7 @@ impl IndexerScheduler { &self, workspace_id: &str, object_id: &str, - collab: &Arc>, + collab: &Collab, collab_type: &CollabType, ) -> Result<(), AppError> { if !self.index_enabled() { @@ -235,9 +234,7 @@ impl IndexerScheduler { )) })?; - let lock = collab.read().await; - let chunks = indexer.create_embedded_chunks(&lock, embedder.model())?; - drop(lock); // release the read lock ASAP + let chunks = indexer.create_embedded_chunks(collab, embedder.model())?; let threads = self.threads.clone(); let tx = self.schedule_tx.clone(); @@ -348,7 +345,7 @@ fn get_unindexed_collabs( match &cid.collab_type { CollabType::Document => { let collab = storage - .get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false) + .get_encode_collab(GetCollabOrigin::Server, cid.clone().into()) .await?; yield UnindexedCollab {