feat: Import metrics (#911)

* chore: implement metrics for import

* chore: add metrics
This commit is contained in:
Nathan.fooo 2024-10-20 11:05:22 +08:00 committed by GitHub
parent b1f37dbbf6
commit 57c44818e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 132 additions and 71 deletions

64
Cargo.lock generated
View File

@ -821,13 +821,13 @@ dependencies = [
"mailer",
"md5",
"mime_guess",
"prometheus-client",
"redis 0.25.4",
"secrecy",
"serde",
"serde_json",
"serde_repr",
"sqlx",
"tempdir",
"thiserror",
"tokio",
"tokio-stream",
@ -3374,12 +3374,6 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "fuchsia-cprng"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba"
[[package]]
name = "funty"
version = "2.0.0"
@ -5649,19 +5643,6 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09"
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
dependencies = [
"fuchsia-cprng",
"libc",
"rand_core 0.3.1",
"rdrand",
"winapi",
]
[[package]]
name = "rand"
version = "0.7.3"
@ -5706,21 +5687,6 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "rand_core"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
dependencies = [
"rand_core 0.4.2",
]
[[package]]
name = "rand_core"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc"
[[package]]
name = "rand_core"
version = "0.5.1"
@ -5790,15 +5756,6 @@ dependencies = [
"yasna",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redis"
version = "0.23.3"
@ -5926,15 +5883,6 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [
"winapi",
]
[[package]]
name = "rend"
version = "0.4.2"
@ -7268,16 +7216,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempdir"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
dependencies = [
"rand 0.4.6",
"remove_dir_all",
]
[[package]]
name = "tempfile"
version = "3.10.1"

View File

@ -48,6 +48,5 @@ uuid.workspace = true
mailer.workspace = true
md5.workspace = true
base64.workspace = true
tempdir = "0.3.7"
prometheus-client = "0.22.3"

View File

@ -14,6 +14,11 @@ use axum::Router;
use secrecy::ExposeSecret;
use crate::mailer::AFWorkerMailer;
use crate::metric::ImportMetrics;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use mailer::sender::Mailer;
use std::sync::{Arc, Once};
use std::time::Duration;
@ -85,12 +90,14 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
let mailer = get_worker_mailer(&config).await?;
let s3_client = get_aws_s3_client(&config.s3_setting).await?;
let metrics = AppMetrics::new();
let state = AppState {
redis_client,
pg_pool,
s3_client,
mailer: mailer.clone(),
metrics,
};
let local_set = LocalSet::new();
@ -98,13 +105,16 @@ pub async fn create_app(listener: TcpListener, config: Config) -> Result<(), Err
let import_worker_fut = local_set.run_until(run_import_worker(
state.pg_pool.clone(),
state.redis_client.clone(),
Some(state.metrics.import_metrics.clone()),
Arc::new(state.s3_client.clone()),
Arc::new(email_notifier),
"import_task_stream",
10,
));
let app = Router::new().with_state(state);
let app = Router::new()
.route("/metrics", get(metrics_handler))
.with_state(Arc::new(state));
tokio::select! {
_ = import_worker_fut => {
@ -124,6 +134,7 @@ pub struct AppState {
pub pg_pool: PgPool,
pub s3_client: S3ClientImpl,
pub mailer: AFWorkerMailer,
pub metrics: AppMetrics,
}
async fn get_worker_mailer(config: &Config) -> Result<AFWorkerMailer, Error> {
@ -180,3 +191,34 @@ pub async fn get_aws_s3_client(s3_setting: &S3Setting) -> Result<S3ClientImpl, E
bucket: s3_setting.bucket.clone(),
})
}
#[derive(Clone)]
pub struct AppMetrics {
#[allow(dead_code)]
registry: Arc<prometheus_client::registry::Registry>,
import_metrics: Arc<ImportMetrics>,
}
impl AppMetrics {
pub fn new() -> Self {
let mut registry = prometheus_client::registry::Registry::default();
let import_metrics = Arc::new(ImportMetrics::register(&mut registry));
Self {
registry: Arc::new(registry),
import_metrics,
}
}
}
async fn metrics_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let mut buffer = String::new();
if let Err(err) = prometheus_client::encoding::text::encode(&mut buffer, &state.metrics.registry)
{
return (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to encode metrics: {:?}", err),
)
.into_response();
}
(StatusCode::OK, buffer).into_response()
}

View File

@ -26,8 +26,8 @@ use database::workspace::{
};
use database_entity::dto::CollabParams;
use crate::metric::ImportMetrics;
use async_zip::base::read::stream::{Ready, ZipFileReader};
use collab_importer::zip_tool::async_zip::async_unzip;
use collab_importer::zip_tool::sync_zip::sync_unzip;
use futures::stream::FuturesUnordered;
@ -67,6 +67,7 @@ const CONSUMER_NAME: &str = "appflowy_worker";
pub async fn run_import_worker(
pg_pool: PgPool,
mut redis_client: ConnectionManager,
metrics: Option<Arc<ImportMetrics>>,
s3_client: Arc<dyn S3Client>,
notifier: Arc<dyn ImportNotifier>,
stream_name: &str,
@ -90,6 +91,7 @@ pub async fn run_import_worker(
GROUP_NAME,
CONSUMER_NAME,
notifier.clone(),
&metrics,
)
.await;
@ -103,6 +105,7 @@ pub async fn run_import_worker(
CONSUMER_NAME,
notifier.clone(),
tick_interval_secs,
&metrics,
)
.await?;
@ -119,6 +122,7 @@ async fn process_un_acked_tasks(
group_name: &str,
consumer_name: &str,
notifier: Arc<dyn ImportNotifier>,
metrics: &Option<Arc<ImportMetrics>>,
) {
// when server restarts, we need to check if there are any unacknowledged tasks
match get_un_ack_tasks(stream_name, group_name, consumer_name, redis_client).await {
@ -136,6 +140,7 @@ async fn process_un_acked_tasks(
s3_client,
pg_pool,
notifier.clone(),
metrics,
)
.await;
}
@ -155,6 +160,7 @@ async fn process_upcoming_tasks(
consumer_name: &str,
notifier: Arc<dyn ImportNotifier>,
interval_secs: u64,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<(), ImportError> {
let options = StreamReadOptions::default()
.group(group_name, consumer_name)
@ -189,6 +195,7 @@ async fn process_upcoming_tasks(
let stream_name = stream_name.to_string();
let group_name = group_name.to_string();
let storage_dir = storage_dir.to_path_buf();
let metrics = metrics.clone();
task_handlers.push(spawn_local(async move {
consume_task(
&storage_dir,
@ -200,6 +207,7 @@ async fn process_upcoming_tasks(
&cloned_s3_client,
&pg_pool,
notifier,
&metrics,
)
.await?;
Ok::<(), ImportError>(())
@ -233,6 +241,7 @@ async fn consume_task(
s3_client: &Arc<dyn S3Client>,
pg_pool: &Pool<Postgres>,
notifier: Arc<dyn ImportNotifier>,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<(), ImportError> {
let result = process_task(
storage_dir,
@ -241,6 +250,7 @@ async fn consume_task(
redis_client,
pg_pool,
notifier,
metrics,
)
.await;
@ -263,6 +273,7 @@ async fn process_task(
redis_client: &mut ConnectionManager,
pg_pool: &PgPool,
notifier: Arc<dyn ImportNotifier>,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<(), ImportError> {
let retry_interval: u64 = get_env_var("APPFLOWY_WORKER_IMPORT_NOTION_RETRY_INTERVAL", "10")
.parse()
@ -287,6 +298,7 @@ async fn process_task(
3,
Duration::from_secs(retry_interval),
streaming,
metrics,
)
.await;
match unzip_result {
@ -305,7 +317,7 @@ async fn process_task(
}
clean_up(s3_client, &task).await;
notify_user(&task, result, notifier).await?;
notify_user(&task, result, notifier, metrics).await?;
},
Err(err) => {
// If there is any errors when download or unzip the file, we will remove the file from S3 and notify the user.
@ -314,7 +326,7 @@ async fn process_task(
}
remove_workspace(&task.workspace_id, pg_pool).await;
clean_up(s3_client, &task).await;
notify_user(&task, Err(err), notifier).await?;
notify_user(&task, Err(err), notifier, metrics).await?;
},
}
@ -347,14 +359,15 @@ pub async fn download_and_unzip_file_retry(
max_retries: usize,
interval: Duration,
streaming: bool,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<PathBuf, ImportError> {
let mut attempt = 0;
loop {
attempt += 1;
match download_and_unzip_file(storage_dir, import_task, s3_client, streaming).await {
match download_and_unzip_file(storage_dir, import_task, s3_client, streaming, metrics).await {
Ok(result) => return Ok(result),
Err(err) if attempt < max_retries && !err.is_file_not_found() => {
error!(
warn!(
"{} attempt {} failed: {}. Retrying in {:?}...",
import_task.workspace_id, attempt, err, interval
);
@ -380,6 +393,7 @@ async fn download_and_unzip_file(
import_task: &NotionImportTask,
s3_client: &Arc<dyn S3Client>,
streaming: bool,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<PathBuf, ImportError> {
let S3StreamResponse {
stream,
@ -389,7 +403,11 @@ async fn download_and_unzip_file(
.get_blob_stream(import_task.s3_key.as_str())
.await
.map_err(|err| ImportError::Internal(err.into()))?;
let buffer_size = buffer_size_from_content_length(content_length);
if let Some(metrics) = metrics {
metrics.record_import_size_bytes(buffer_size);
}
if streaming {
let zip_reader = get_zip_reader(buffer_size, StreamOrFile::Stream(stream)).await?;
let unique_file_name = Uuid::new_v4().to_string();
@ -832,11 +850,15 @@ async fn notify_user(
import_task: &NotionImportTask,
result: Result<(), ImportError>,
notifier: Arc<dyn ImportNotifier>,
metrics: &Option<Arc<ImportMetrics>>,
) -> Result<(), ImportError> {
let task_id = import_task.task_id.to_string();
let (error, error_detail) = match result {
Ok(_) => {
info!("[Import]: successfully imported:{}", import_task);
if let Some(metrics) = metrics {
metrics.incr_import_success_count(1);
}
(None, None)
},
Err(err) => {
@ -844,6 +866,9 @@ async fn notify_user(
"[Import]: failed to import:{}: error:{:?}",
import_task, err
);
if let Some(metrics) = metrics {
metrics.incr_import_fail_count(1);
}
let (error, error_detail) = err.report(&task_id);
(Some(error), Some(error_detail))
},

View File

@ -1,4 +1,5 @@
pub mod error;
pub mod import_worker;
mod mailer;
pub mod metric;
pub mod s3_client;

View File

@ -4,6 +4,8 @@ pub mod error;
pub mod import_worker;
pub(crate) mod s3_client;
mod metric;
mod mailer;
use crate::application::run_server;
use crate::config::Config;

View File

@ -0,0 +1,53 @@
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
pub struct ImportMetrics {
pub update_size_bytes: Histogram,
pub import_success_count: Gauge,
pub import_fail_count: Gauge,
}
impl ImportMetrics {
pub fn init() -> Self {
let update_size_buckets = exponential_buckets(1024.0, 2.0, 10);
Self {
update_size_bytes: Histogram::new(update_size_buckets),
import_success_count: Default::default(),
import_fail_count: Default::default(),
}
}
pub fn register(registry: &mut Registry) -> Self {
let metrics = Self::init();
let web_update_registry = registry.sub_registry_with_prefix("appflowy_web");
web_update_registry.register(
"import_payload_size_bytes",
"Size of the update in bytes",
metrics.update_size_bytes.clone(),
);
web_update_registry.register(
"import_success_count",
"import success count",
metrics.import_success_count.clone(),
);
web_update_registry.register(
"import_fail_count",
"import fail count",
metrics.import_fail_count.clone(),
);
metrics
}
pub fn record_import_size_bytes(&self, size: usize) {
self.update_size_bytes.observe(size as f64);
}
pub fn incr_import_success_count(&self, count: i64) {
self.import_success_count.inc_by(count);
}
pub fn incr_import_fail_count(&self, count: i64) {
self.import_fail_count.inc_by(count);
}
}

View File

@ -143,6 +143,7 @@ fn run_importer_worker(
let import_worker_fut = local_set.run_until(run_import_worker(
pg_pool,
redis_client,
None,
Arc::new(MockS3Client),
notifier,
&stream_name,