diff --git a/deploy.env b/deploy.env index c1b6dae1..12ed54d6 100644 --- a/deploy.env +++ b/deploy.env @@ -125,6 +125,7 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379 APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres # AppFlowy Indexer +APPFLOWY_INDEXER_ENABLED=true APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 diff --git a/dev.env b/dev.env index 95e783dd..ee368318 100644 --- a/dev.env +++ b/dev.env @@ -111,6 +111,7 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379 APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres # AppFlowy Indexer +APPFLOWY_INDEXER_ENABLED=true APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379 diff --git a/libs/appflowy-ai-client/src/client.rs b/libs/appflowy-ai-client/src/client.rs index c042870b..4dc43650 100644 --- a/libs/appflowy-ai-client/src/client.rs +++ b/libs/appflowy-ai-client/src/client.rs @@ -6,7 +6,6 @@ use crate::dto::{ }; use crate::error::AIError; -use bytes::Bytes; use futures::{ready, Stream, StreamExt, TryStreamExt}; use reqwest; use reqwest::{Method, RequestBuilder, StatusCode}; @@ -21,7 +20,9 @@ use std::borrow::Cow; use std::marker::PhantomData; use std::pin::Pin; +use bytes::Bytes; use std::task::{Context, Poll}; +use std::time::Duration; use tracing::{info, trace}; const AI_MODEL_HEADER_KEY: &str = "ai-model"; @@ -235,6 +236,7 @@ impl AppFlowyAIClient { let resp = self .http_client(Method::POST, &url)? .header(AI_MODEL_HEADER_KEY, model.to_str()) + .timeout(Duration::from_secs(30)) .json(&json) .send() .await?; @@ -258,6 +260,7 @@ impl AppFlowyAIClient { .http_client(Method::POST, &url)? .header(AI_MODEL_HEADER_KEY, model.to_str()) .json(&json) + .timeout(Duration::from_secs(30)) .send() .await?; AIResponse::<()>::stream_response(resp).await @@ -273,6 +276,7 @@ impl AppFlowyAIClient { let resp = self .http_client(Method::GET, &url)? .header(AI_MODEL_HEADER_KEY, model.to_str()) + .timeout(Duration::from_secs(30)) .send() .await?; AIResponse::::from_response(resp) diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index 66a6822b..ecdff281 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -193,12 +193,13 @@ impl CollabGroup { // In debug mode, we set the timeout to 60 seconds if cfg!(debug_assertions) { trace!( - "Group:{} is inactive for {} seconds, subscribers: {}", + "Group:{}:{} is inactive for {} seconds, subscribers: {}", self.object_id, + self.collab_type, modified_at.elapsed().as_secs(), self.subscribers.len() ); - modified_at.elapsed().as_secs() > 60 * 60 + modified_at.elapsed().as_secs() > 60 * 2 } else { let elapsed_secs = modified_at.elapsed().as_secs(); if elapsed_secs > self.timeout_secs() { @@ -209,8 +210,9 @@ impl CollabGroup { const MAXIMUM_SECS: u64 = 3 * 60 * 60; if elapsed_secs > MAXIMUM_SECS { info!( - "Group:{} is inactive for {} seconds, subscribers: {}", + "Group:{}:{} is inactive for {} seconds, subscribers: {}", self.object_id, + self.collab_type, modified_at.elapsed().as_secs(), self.subscribers.len() ); diff --git a/services/appflowy-collaborate/src/group/state.rs b/services/appflowy-collaborate/src/group/state.rs index 59b1e8e6..86b82543 100644 --- a/services/appflowy-collaborate/src/group/state.rs +++ b/services/appflowy-collaborate/src/group/state.rs @@ -45,7 +45,12 @@ impl GroupManagementState { } } - info!("Remove inactive group ids: {:?}", inactive_group_ids); + info!( + "total groups:{}, inactive group:{:?}, inactive group ids:{:?}", + self.group_by_object_id.len() as i64, + inactive_group_ids.len(), + inactive_group_ids, + ); for object_id in &inactive_group_ids { self.remove_group(object_id).await; } diff --git a/services/appflowy-collaborate/src/indexer/provider.rs b/services/appflowy-collaborate/src/indexer/provider.rs index 4f02ca1e..dd064acc 100644 --- a/services/appflowy-collaborate/src/indexer/provider.rs +++ b/services/appflowy-collaborate/src/indexer/provider.rs @@ -12,8 +12,11 @@ use collab::preclude::Collab; use collab_entity::CollabType; use sqlx::PgPool; use tokio_stream::StreamExt; +use tracing::info; use uuid::Uuid; +use crate::config::get_env_var; +use crate::indexer::DocumentIndexer; use app_error::AppError; use appflowy_ai_client::client::AppFlowyAIClient; use database::collab::select_blob_from_af_collab; @@ -21,8 +24,6 @@ use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings}; use database::workspace::select_workspace_settings; use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabParams}; -use crate::indexer::DocumentIndexer; - #[async_trait] pub trait Indexer: Send + Sync { fn embedding_params(&self, collab: &Collab) -> Result, AppError>; @@ -60,7 +61,14 @@ pub struct IndexerProvider { impl IndexerProvider { pub fn new(db: PgPool, ai_client: AppFlowyAIClient) -> Arc { let mut cache: HashMap> = HashMap::new(); - cache.insert(CollabType::Document, DocumentIndexer::new(ai_client)); + let enabled = get_env_var("APPFLOWY_INDEXER_ENABLED", "true") + .parse::() + .unwrap_or(true); + + info!("Indexer is enabled: {}", enabled); + if enabled { + cache.insert(CollabType::Document, DocumentIndexer::new(ai_client)); + } Arc::new(Self { db, indexer_cache: cache, diff --git a/services/appflowy-collaborate/src/rt_server.rs b/services/appflowy-collaborate/src/rt_server.rs index d43eda26..eb3080ba 100644 --- a/services/appflowy-collaborate/src/rt_server.rs +++ b/services/appflowy-collaborate/src/rt_server.rs @@ -289,7 +289,6 @@ fn spawn_period_check_inactive_group( interval.tick().await; if let Some(groups) = weak_groups.upgrade() { let inactive_group_ids = groups.get_inactive_groups().await; - trace!("Inactive group ids: {:?}", inactive_group_ids); for id in inactive_group_ids { cloned_group_sender_by_object_id.remove(&id); } diff --git a/src/telemetry.rs b/src/telemetry.rs index 209eccc8..f1cfae2d 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -1,8 +1,9 @@ -use actix_web::rt::task::JoinHandle; -use tracing::subscriber::set_global_default; -use tracing_subscriber::{layer::SubscriberExt, EnvFilter}; - use crate::config::config::Environment; +use actix_web::rt::task::JoinHandle; +use chrono::Local; +use tracing::subscriber::set_global_default; +use tracing_subscriber::fmt::format::Writer; +use tracing_subscriber::{layer::SubscriberExt, EnvFilter}; /// Register a subscriber as global default to process span data. /// @@ -20,6 +21,7 @@ pub fn init_subscriber(app_env: &Environment, filters: Vec) { Environment::Local => { let subscriber = builder .with_ansi(true) + .with_timer(CustomTime) .with_target(false) .with_file(false) .pretty() @@ -39,6 +41,13 @@ pub fn init_subscriber(app_env: &Environment, filters: Vec) { } } +struct CustomTime; +impl tracing_subscriber::fmt::time::FormatTime for CustomTime { + fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result { + write!(w, "{}", Local::now().format("%Y-%m-%d %H:%M:%S")) + } +} + pub fn spawn_blocking_with_tracing(f: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static,