From 62abd8abbf4dddb5822842f28d1fba67ccfae6de Mon Sep 17 00:00:00 2001 From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com> Date: Wed, 29 Nov 2023 15:40:52 -0800 Subject: [PATCH] feat: return request id to client api (#186) * docs: update * chore: add logs * chore: test * feat: return request id to the client * feat: print request id in client api * fix: lock table when creating new user --- ...dcc9d963cc033582bf2e945e8bf3a301b4247.json | 22 +++++++++ Cargo.lock | 1 + admin_frontend/src/web_api.rs | 18 +++++++- libs/app_error/src/lib.rs | 16 +++---- libs/client-api/src/http.rs | 39 +++++++++++++++- libs/client-api/src/notify.rs | 3 ++ libs/shared-entity/Cargo.toml | 1 + libs/shared-entity/src/response.rs | 7 --- src/biz/user.rs | 12 +++++ src/middleware/request_id.rs | 46 +++++++++++++++---- 10 files changed, 138 insertions(+), 27 deletions(-) create mode 100644 .sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json diff --git a/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json b/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json new file mode 100644 index 00000000..909e6ad4 --- /dev/null +++ b/.sqlx/query-a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT pg_advisory_xact_lock($1)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "pg_advisory_xact_lock", + "type_info": "Void" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + null + ] + }, + "hash": "a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247" +} diff --git a/Cargo.lock b/Cargo.lock index 987eaee3..3193cdf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4310,6 +4310,7 @@ dependencies = [ "collab-entity", "database-entity", "gotrue-entity", + "log", "opener", "reqwest", "rust-s3", diff --git a/admin_frontend/src/web_api.rs b/admin_frontend/src/web_api.rs index a547ea0c..ef530ec6 100644 --- a/admin_frontend/src/web_api.rs +++ b/admin_frontend/src/web_api.rs @@ -79,7 +79,23 @@ pub async fn admin_create_sso_handler( Ok(WebApiResponse::<()>::from_str("SSO Added".into())) } -// provide a link which when open in browser, opens the appflowy app +/// Generates a URL to facilitate login redirection to the AppFlowy app from a web browser. +/// +/// This function creates a custom URL scheme that can be used in a web browser to open the +/// AppFlowy app and automatically handle user login based on the provided `UserSession`. +/// +/// # Returns +/// A `Result` containing `HeaderMap` for HTTP redirection if successful, or `WebApiError` in case of failure. +/// +/// # Example URL Format +/// `appflowy-flutter://login-callback#access_token=...&expires_at=...&expires_in=...&refresh_token=...&token_type=...` +/// +/// The URL includes access token information and other relevant session details. +/// +/// # Usage +/// The client application should implement handling for this URL format, typically through the +/// `sign_in_with_url` method in the `client-api` crate. See [client_api::Client::sign_in_with_url] for more details. +/// pub async fn open_app_handler(session: UserSession) -> Result> { let app_sign_in_url = format!( "appflowy-flutter://login-callback#access_token={}&expires_at={}&expires_in={}&refresh_token={}&token_type={}", diff --git a/libs/app_error/src/lib.rs b/libs/app_error/src/lib.rs index 492b3739..287728ad 100644 --- a/libs/app_error/src/lib.rs +++ b/libs/app_error/src/lib.rs @@ -144,19 +144,19 @@ impl AppError { } impl From for AppError { - fn from(value: reqwest::Error) -> Self { - if value.is_connect() { - return AppError::Connect(value.to_string()); + fn from(error: reqwest::Error) -> Self { + if error.is_connect() { + return AppError::Connect(error.to_string()); } - if value.is_timeout() { - return AppError::RequestTimeout(value.to_string()); + if error.is_timeout() { + return AppError::RequestTimeout(error.to_string()); } - if value.is_request() { - return AppError::InvalidRequest(value.to_string()); + if error.is_request() { + return AppError::InvalidRequest(error.to_string()); } - AppError::Unhandled(value.to_string()) + AppError::Unhandled(error.to_string()) } } diff --git a/libs/client-api/src/http.rs b/libs/client-api/src/http.rs index 21fa0b6d..1c4e0b0a 100644 --- a/libs/client-api/src/http.rs +++ b/libs/client-api/src/http.rs @@ -22,6 +22,7 @@ use mime::Mime; use parking_lot::RwLock; use realtime_entity::EncodedCollabV1; use reqwest::header; + use reqwest::Method; use reqwest::RequestBuilder; use shared_entity::dto::auth_dto::SignInTokenResponse; @@ -143,6 +144,7 @@ impl Client { /// It looks like, e.g., `appflowy-flutter://#access_token=...&expires_in=3600&provider_token=...&refresh_token=...&token_type=bearer`. /// /// return a bool indicating if the user is new + #[instrument(level = "debug", skip_all, err)] pub async fn sign_in_with_url(&self, url: &str) -> Result { let mut access_token: Option = None; let mut token_type: Option = None; @@ -386,6 +388,7 @@ impl Client { .body(msg) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -449,6 +452,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -462,6 +466,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -475,6 +480,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -488,6 +494,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -508,6 +515,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::>::from_response(resp) .await? .into_data() @@ -531,6 +539,7 @@ impl Client { .json(&members) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error()?; Ok(()) } @@ -552,6 +561,7 @@ impl Client { .json(&changeset) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error()?; Ok(()) } @@ -574,12 +584,11 @@ impl Client { .json(&payload) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error()?; Ok(()) } - // pub async fn update_workspace_member(&self, workspace_uuid: Uuid, member) - #[instrument(skip_all, err)] pub async fn sign_in_password( &self, @@ -686,6 +695,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -701,6 +711,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -716,6 +727,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -734,6 +746,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -755,6 +768,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -772,6 +786,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -790,6 +805,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -808,6 +824,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -828,6 +845,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -846,6 +864,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -864,6 +883,7 @@ impl Client { .json(¶ms) .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -891,6 +911,7 @@ impl Client { .body(data) .send() .await?; + log_request_id(&resp); let record = AppResponse::::from_response(resp) .await? .into_data()?; @@ -937,6 +958,7 @@ impl Client { .body(data.into()) .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -951,6 +973,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); match resp.status() { reqwest::StatusCode::OK => { @@ -982,6 +1005,7 @@ impl Client { .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -993,6 +1017,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::<()>::from_response(resp).await?.into_error() } @@ -1006,6 +1031,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -1021,6 +1047,7 @@ impl Client { .await? .send() .await?; + log_request_id(&resp); AppResponse::::from_response(resp) .await? .into_data() @@ -1067,3 +1094,11 @@ impl WSClientHttpSender for Client { .map_err(|err| WSError::Internal(anyhow::Error::from(err))) } } + +fn log_request_id(resp: &reqwest::Response) { + if let Some(request_id) = resp.headers().get("x-request-id") { + event!(tracing::Level::DEBUG, "request_id: {:?}", request_id); + } else { + event!(tracing::Level::DEBUG, "request_id: not found"); + } +} diff --git a/libs/client-api/src/notify.rs b/libs/client-api/src/notify.rs index d639732b..2f634d0d 100644 --- a/libs/client-api/src/notify.rs +++ b/libs/client-api/src/notify.rs @@ -2,6 +2,7 @@ use anyhow::Error; use gotrue_entity::dto::GotrueTokenResponse; use std::ops::{Deref, DerefMut}; use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tracing::event; pub type TokenStateReceiver = Receiver; @@ -41,6 +42,7 @@ impl ClientToken { /// /// - `token`: The new `AccessTokenResponse` to be set. pub(crate) fn set(&mut self, new_token: GotrueTokenResponse) { + event!(tracing::Level::DEBUG, "Did set token: {:?}", new_token); let is_new = match &self.token { None => true, Some(old_token) => old_token.access_token != new_token.access_token, @@ -62,6 +64,7 @@ impl ClientToken { pub(crate) fn unset(&mut self) { if self.token.is_some() { self.token = None; + event!(tracing::Level::DEBUG, "unset token"); let _ = self.sender.send(TokenState::Invalid); } } diff --git a/libs/shared-entity/Cargo.toml b/libs/shared-entity/Cargo.toml index 1708d8d5..964fd358 100644 --- a/libs/shared-entity/Cargo.toml +++ b/libs/shared-entity/Cargo.toml @@ -24,6 +24,7 @@ validator = { version = "0.16", features = ["validator_derive", "derive"], optio opener = "0.6.1" url = "2.4.1" rust-s3 = "0.33.0" +log = "0.4.20" [features] cloud = ["actix-web", "sqlx", "validator"] diff --git a/libs/shared-entity/src/response.rs b/libs/shared-entity/src/response.rs index adf9c8e3..dd8a1a85 100644 --- a/libs/shared-entity/src/response.rs +++ b/libs/shared-entity/src/response.rs @@ -157,13 +157,6 @@ impl AppResponseError { } } - pub fn with_message(&self, message: impl Into>) -> Self { - Self { - code: self.code, - message: message.into(), - } - } - pub fn is_record_not_found(&self) -> bool { matches!(self.code, ErrorCode::RecordNotFound) } diff --git a/src/biz/user.rs b/src/biz/user.rs index 3d427dad..cdb09311 100644 --- a/src/biz/user.rs +++ b/src/biz/user.rs @@ -41,6 +41,18 @@ pub async fn verify_token( .begin() .await .context("acquire transaction to verify token")?; + + // To prevent concurrent creation of the same user with the same workspace resources, we lock + // the user row when `verify_token` is called. This means that if multiple requests try to + // create the same user simultaneously, the first request will acquire the lock, create the user, + // and any subsequent requests will wait for the lock to be released. After the lock is released, + // the other requests will proceed and return the result, ensuring that each user is created only once + // and avoiding duplicate entries. + let lock_key = user_uuid.as_u128() as i64; + sqlx::query!("SELECT pg_advisory_xact_lock($1)", lock_key) + .execute(txn.deref_mut()) + .await?; + let is_new = !is_user_exist(txn.deref_mut(), &user_uuid).await?; if is_new { let new_uid = id_gen.write().await.next_id(); diff --git a/src/middleware/request_id.rs b/src/middleware/request_id.rs index 373b47da..c882e916 100644 --- a/src/middleware/request_id.rs +++ b/src/middleware/request_id.rs @@ -1,11 +1,13 @@ use actix_http::header::HeaderName; use std::future::{ready, Ready}; -use tracing::{debug, Instrument, Level}; +use tracing::{Instrument, Level}; use actix_service::{forward_ready, Service, Transform}; use actix_web::dev::{ServiceRequest, ServiceResponse}; use futures_util::future::LocalBoxFuture; +use reqwest::header::HeaderValue; +const X_REQUEST_ID: &str = "x-request-id"; pub struct RequestIdMiddleware; impl Transform for RequestIdMiddleware @@ -41,19 +43,45 @@ where forward_ready!(service); - fn call(&self, req: ServiceRequest) -> Self::Future { - let request_id = get_request_id(&req).unwrap_or(uuid::Uuid::new_v4().to_string()); - debug!("generated request id for: {}", req.path()); + fn call(&self, mut req: ServiceRequest) -> Self::Future { + // Skip generate request id for metrics requests + if req.path() == "/metrics" { + let fut = self.service.call(req); + Box::pin(fut) + } else { + let request_id = match get_request_id(&req) { + Some(request_id) => request_id, + None => { + let request_id = uuid::Uuid::new_v4().to_string(); + if let Ok(header_value) = HeaderValue::from_str(&request_id) { + req + .headers_mut() + .insert(HeaderName::from_static(X_REQUEST_ID), header_value); + } + request_id + }, + }; - // Call the next service - let span = tracing::span!(Level::INFO, "request_id", request_id = %request_id); - let res = self.service.call(req); - Box::pin(res.instrument(span)) + let span = tracing::span!(Level::INFO, "request_id", request_id = %request_id); + let fut = self.service.call(req); + + Box::pin(async move { + let mut res = fut.instrument(span).await?; + + // Insert the request id to the response header + if let Ok(header_value) = HeaderValue::from_str(&request_id) { + res + .headers_mut() + .insert(HeaderName::from_static(X_REQUEST_ID), header_value); + } + Ok(res) + }) + } } } pub fn get_request_id(req: &ServiceRequest) -> Option { - match req.headers().get(HeaderName::from_static("x-request-id")) { + match req.headers().get(HeaderName::from_static(X_REQUEST_ID)) { Some(h) => match h.to_str() { Ok(s) => Some(s.to_owned()), Err(e) => {