Merge branch 'main' into feat/delete-user

This commit is contained in:
Zack Fu Zi Xiang 2024-09-02 01:00:28 +08:00
commit a2af9c3b63
No known key found for this signature in database
8 changed files with 42 additions and 13 deletions

View File

@ -125,6 +125,7 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
# AppFlowy Indexer
APPFLOWY_INDEXER_ENABLED=true
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

View File

@ -111,6 +111,7 @@ APPFLOWY_HISTORY_REDIS_URL=redis://redis:6379
APPFLOWY_HISTORY_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
# AppFlowy Indexer
APPFLOWY_INDEXER_ENABLED=true
APPFLOWY_INDEXER_DATABASE_URL=postgres://postgres:password@postgres:5432/postgres
APPFLOWY_INDEXER_REDIS_URL=redis://redis:6379

View File

@ -6,7 +6,6 @@ use crate::dto::{
};
use crate::error::AIError;
use bytes::Bytes;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use reqwest;
use reqwest::{Method, RequestBuilder, StatusCode};
@ -21,7 +20,9 @@ use std::borrow::Cow;
use std::marker::PhantomData;
use std::pin::Pin;
use bytes::Bytes;
use std::task::{Context, Poll};
use std::time::Duration;
use tracing::{info, trace};
const AI_MODEL_HEADER_KEY: &str = "ai-model";
@ -235,6 +236,7 @@ impl AppFlowyAIClient {
let resp = self
.http_client(Method::POST, &url)?
.header(AI_MODEL_HEADER_KEY, model.to_str())
.timeout(Duration::from_secs(30))
.json(&json)
.send()
.await?;
@ -258,6 +260,7 @@ impl AppFlowyAIClient {
.http_client(Method::POST, &url)?
.header(AI_MODEL_HEADER_KEY, model.to_str())
.json(&json)
.timeout(Duration::from_secs(30))
.send()
.await?;
AIResponse::<()>::stream_response(resp).await
@ -273,6 +276,7 @@ impl AppFlowyAIClient {
let resp = self
.http_client(Method::GET, &url)?
.header(AI_MODEL_HEADER_KEY, model.to_str())
.timeout(Duration::from_secs(30))
.send()
.await?;
AIResponse::<RepeatedRelatedQuestion>::from_response(resp)

View File

@ -193,12 +193,13 @@ impl CollabGroup {
// In debug mode, we set the timeout to 60 seconds
if cfg!(debug_assertions) {
trace!(
"Group:{} is inactive for {} seconds, subscribers: {}",
"Group:{}:{} is inactive for {} seconds, subscribers: {}",
self.object_id,
self.collab_type,
modified_at.elapsed().as_secs(),
self.subscribers.len()
);
modified_at.elapsed().as_secs() > 60 * 60
modified_at.elapsed().as_secs() > 60 * 2
} else {
let elapsed_secs = modified_at.elapsed().as_secs();
if elapsed_secs > self.timeout_secs() {
@ -209,8 +210,9 @@ impl CollabGroup {
const MAXIMUM_SECS: u64 = 3 * 60 * 60;
if elapsed_secs > MAXIMUM_SECS {
info!(
"Group:{} is inactive for {} seconds, subscribers: {}",
"Group:{}:{} is inactive for {} seconds, subscribers: {}",
self.object_id,
self.collab_type,
modified_at.elapsed().as_secs(),
self.subscribers.len()
);

View File

@ -45,7 +45,12 @@ impl GroupManagementState {
}
}
info!("Remove inactive group ids: {:?}", inactive_group_ids);
info!(
"total groups:{}, inactive group:{:?}, inactive group ids:{:?}",
self.group_by_object_id.len() as i64,
inactive_group_ids.len(),
inactive_group_ids,
);
for object_id in &inactive_group_ids {
self.remove_group(object_id).await;
}

View File

@ -12,8 +12,11 @@ use collab::preclude::Collab;
use collab_entity::CollabType;
use sqlx::PgPool;
use tokio_stream::StreamExt;
use tracing::info;
use uuid::Uuid;
use crate::config::get_env_var;
use crate::indexer::DocumentIndexer;
use app_error::AppError;
use appflowy_ai_client::client::AppFlowyAIClient;
use database::collab::select_blob_from_af_collab;
@ -21,8 +24,6 @@ use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddingParams, AFCollabEmbeddings, CollabParams};
use crate::indexer::DocumentIndexer;
#[async_trait]
pub trait Indexer: Send + Sync {
fn embedding_params(&self, collab: &Collab) -> Result<Vec<AFCollabEmbeddingParams>, AppError>;
@ -60,7 +61,14 @@ pub struct IndexerProvider {
impl IndexerProvider {
pub fn new(db: PgPool, ai_client: AppFlowyAIClient) -> Arc<Self> {
let mut cache: HashMap<CollabType, Arc<dyn Indexer>> = HashMap::new();
cache.insert(CollabType::Document, DocumentIndexer::new(ai_client));
let enabled = get_env_var("APPFLOWY_INDEXER_ENABLED", "true")
.parse::<bool>()
.unwrap_or(true);
info!("Indexer is enabled: {}", enabled);
if enabled {
cache.insert(CollabType::Document, DocumentIndexer::new(ai_client));
}
Arc::new(Self {
db,
indexer_cache: cache,

View File

@ -289,7 +289,6 @@ fn spawn_period_check_inactive_group<S, AC>(
interval.tick().await;
if let Some(groups) = weak_groups.upgrade() {
let inactive_group_ids = groups.get_inactive_groups().await;
trace!("Inactive group ids: {:?}", inactive_group_ids);
for id in inactive_group_ids {
cloned_group_sender_by_object_id.remove(&id);
}

View File

@ -1,8 +1,9 @@
use actix_web::rt::task::JoinHandle;
use tracing::subscriber::set_global_default;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
use crate::config::config::Environment;
use actix_web::rt::task::JoinHandle;
use chrono::Local;
use tracing::subscriber::set_global_default;
use tracing_subscriber::fmt::format::Writer;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
/// Register a subscriber as global default to process span data.
///
@ -20,6 +21,7 @@ pub fn init_subscriber(app_env: &Environment, filters: Vec<String>) {
Environment::Local => {
let subscriber = builder
.with_ansi(true)
.with_timer(CustomTime)
.with_target(false)
.with_file(false)
.pretty()
@ -39,6 +41,13 @@ pub fn init_subscriber(app_env: &Environment, filters: Vec<String>) {
}
}
struct CustomTime;
impl tracing_subscriber::fmt::time::FormatTime for CustomTime {
fn format_time(&self, w: &mut Writer<'_>) -> std::fmt::Result {
write!(w, "{}", Local::now().format("%Y-%m-%d %H:%M:%S"))
}
}
pub fn spawn_blocking_with_tracing<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,