feat: return request id to client api (#186)

* docs: update

* chore: add logs

* chore: test

* feat: return request id to the client

* feat: print request id in client api

* fix: lock table when creating new user
This commit is contained in:
Nathan.fooo 2023-11-29 15:40:52 -08:00 committed by GitHub
parent 499d2ac827
commit 62abd8abbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 138 additions and 27 deletions

View File

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT pg_advisory_xact_lock($1)",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "pg_advisory_xact_lock",
"type_info": "Void"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
null
]
},
"hash": "a06e1d9f6f95e4c4c2b98310ebddcc9d963cc033582bf2e945e8bf3a301b4247"
}

1
Cargo.lock generated
View File

@ -4310,6 +4310,7 @@ dependencies = [
"collab-entity",
"database-entity",
"gotrue-entity",
"log",
"opener",
"reqwest",
"rust-s3",

View File

@ -79,7 +79,23 @@ pub async fn admin_create_sso_handler(
Ok(WebApiResponse::<()>::from_str("SSO Added".into()))
}
// provide a link which when open in browser, opens the appflowy app
/// Generates a URL to facilitate login redirection to the AppFlowy app from a web browser.
///
/// This function creates a custom URL scheme that can be used in a web browser to open the
/// AppFlowy app and automatically handle user login based on the provided `UserSession`.
///
/// # Returns
/// A `Result` containing `HeaderMap` for HTTP redirection if successful, or `WebApiError` in case of failure.
///
/// # Example URL Format
/// `appflowy-flutter://login-callback#access_token=...&expires_at=...&expires_in=...&refresh_token=...&token_type=...`
///
/// The URL includes access token information and other relevant session details.
///
/// # Usage
/// The client application should implement handling for this URL format, typically through the
/// `sign_in_with_url` method in the `client-api` crate. See [client_api::Client::sign_in_with_url] for more details.
///
pub async fn open_app_handler(session: UserSession) -> Result<HeaderMap, WebApiError<'static>> {
let app_sign_in_url = format!(
"appflowy-flutter://login-callback#access_token={}&expires_at={}&expires_in={}&refresh_token={}&token_type={}",

View File

@ -144,19 +144,19 @@ impl AppError {
}
impl From<reqwest::Error> for AppError {
fn from(value: reqwest::Error) -> Self {
if value.is_connect() {
return AppError::Connect(value.to_string());
fn from(error: reqwest::Error) -> Self {
if error.is_connect() {
return AppError::Connect(error.to_string());
}
if value.is_timeout() {
return AppError::RequestTimeout(value.to_string());
if error.is_timeout() {
return AppError::RequestTimeout(error.to_string());
}
if value.is_request() {
return AppError::InvalidRequest(value.to_string());
if error.is_request() {
return AppError::InvalidRequest(error.to_string());
}
AppError::Unhandled(value.to_string())
AppError::Unhandled(error.to_string())
}
}

View File

@ -22,6 +22,7 @@ use mime::Mime;
use parking_lot::RwLock;
use realtime_entity::EncodedCollabV1;
use reqwest::header;
use reqwest::Method;
use reqwest::RequestBuilder;
use shared_entity::dto::auth_dto::SignInTokenResponse;
@ -143,6 +144,7 @@ impl Client {
/// It looks like, e.g., `appflowy-flutter://#access_token=...&expires_in=3600&provider_token=...&refresh_token=...&token_type=bearer`.
///
/// return a bool indicating if the user is new
#[instrument(level = "debug", skip_all, err)]
pub async fn sign_in_with_url(&self, url: &str) -> Result<bool, AppResponseError> {
let mut access_token: Option<String> = None;
let mut token_type: Option<String> = None;
@ -386,6 +388,7 @@ impl Client {
.body(msg)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -449,6 +452,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFUserProfile>::from_response(resp)
.await?
.into_data()
@ -462,6 +466,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFUserWorkspaceInfo>::from_response(resp)
.await?
.into_data()
@ -475,6 +480,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFWorkspaces>::from_response(resp)
.await?
.into_data()
@ -488,6 +494,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFWorkspace>::from_response(resp)
.await?
.into_data()
@ -508,6 +515,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<Vec<AFWorkspaceMember>>::from_response(resp)
.await?
.into_data()
@ -531,6 +539,7 @@ impl Client {
.json(&members)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()?;
Ok(())
}
@ -552,6 +561,7 @@ impl Client {
.json(&changeset)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()?;
Ok(())
}
@ -574,12 +584,11 @@ impl Client {
.json(&payload)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()?;
Ok(())
}
// pub async fn update_workspace_member(&self, workspace_uuid: Uuid, member)
#[instrument(skip_all, err)]
pub async fn sign_in_password(
&self,
@ -686,6 +695,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -701,6 +711,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -716,6 +727,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -734,6 +746,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<EncodedCollabV1>::from_response(resp)
.await?
.into_data()
@ -755,6 +768,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<BatchQueryCollabResult>::from_response(resp)
.await?
.into_data()
@ -772,6 +786,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -790,6 +805,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -808,6 +824,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFCollabMember>::from_response(resp)
.await?
.into_data()
@ -828,6 +845,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -846,6 +864,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -864,6 +883,7 @@ impl Client {
.json(&params)
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFCollabMembers>::from_response(resp)
.await?
.into_data()
@ -891,6 +911,7 @@ impl Client {
.body(data)
.send()
.await?;
log_request_id(&resp);
let record = AppResponse::<AFBlobRecord>::from_response(resp)
.await?
.into_data()?;
@ -937,6 +958,7 @@ impl Client {
.body(data.into())
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFBlobRecord>::from_response(resp)
.await?
.into_data()
@ -951,6 +973,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
match resp.status() {
reqwest::StatusCode::OK => {
@ -982,6 +1005,7 @@ impl Client {
.send()
.await?;
log_request_id(&resp);
AppResponse::<AFBlobMetadata>::from_response(resp)
.await?
.into_data()
@ -993,6 +1017,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<()>::from_response(resp).await?.into_error()
}
@ -1006,6 +1031,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<WorkspaceSpaceUsage>::from_response(resp)
.await?
.into_data()
@ -1021,6 +1047,7 @@ impl Client {
.await?
.send()
.await?;
log_request_id(&resp);
AppResponse::<WorkspaceBlobMetadata>::from_response(resp)
.await?
.into_data()
@ -1067,3 +1094,11 @@ impl WSClientHttpSender for Client {
.map_err(|err| WSError::Internal(anyhow::Error::from(err)))
}
}
fn log_request_id(resp: &reqwest::Response) {
if let Some(request_id) = resp.headers().get("x-request-id") {
event!(tracing::Level::DEBUG, "request_id: {:?}", request_id);
} else {
event!(tracing::Level::DEBUG, "request_id: not found");
}
}

View File

@ -2,6 +2,7 @@ use anyhow::Error;
use gotrue_entity::dto::GotrueTokenResponse;
use std::ops::{Deref, DerefMut};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tracing::event;
pub type TokenStateReceiver = Receiver<TokenState>;
@ -41,6 +42,7 @@ impl ClientToken {
///
/// - `token`: The new `AccessTokenResponse` to be set.
pub(crate) fn set(&mut self, new_token: GotrueTokenResponse) {
event!(tracing::Level::DEBUG, "Did set token: {:?}", new_token);
let is_new = match &self.token {
None => true,
Some(old_token) => old_token.access_token != new_token.access_token,
@ -62,6 +64,7 @@ impl ClientToken {
pub(crate) fn unset(&mut self) {
if self.token.is_some() {
self.token = None;
event!(tracing::Level::DEBUG, "unset token");
let _ = self.sender.send(TokenState::Invalid);
}
}

View File

@ -24,6 +24,7 @@ validator = { version = "0.16", features = ["validator_derive", "derive"], optio
opener = "0.6.1"
url = "2.4.1"
rust-s3 = "0.33.0"
log = "0.4.20"
[features]
cloud = ["actix-web", "sqlx", "validator"]

View File

@ -157,13 +157,6 @@ impl AppResponseError {
}
}
pub fn with_message(&self, message: impl Into<Cow<'static, str>>) -> Self {
Self {
code: self.code,
message: message.into(),
}
}
pub fn is_record_not_found(&self) -> bool {
matches!(self.code, ErrorCode::RecordNotFound)
}

View File

@ -41,6 +41,18 @@ pub async fn verify_token(
.begin()
.await
.context("acquire transaction to verify token")?;
// To prevent concurrent creation of the same user with the same workspace resources, we lock
// the user row when `verify_token` is called. This means that if multiple requests try to
// create the same user simultaneously, the first request will acquire the lock, create the user,
// and any subsequent requests will wait for the lock to be released. After the lock is released,
// the other requests will proceed and return the result, ensuring that each user is created only once
// and avoiding duplicate entries.
let lock_key = user_uuid.as_u128() as i64;
sqlx::query!("SELECT pg_advisory_xact_lock($1)", lock_key)
.execute(txn.deref_mut())
.await?;
let is_new = !is_user_exist(txn.deref_mut(), &user_uuid).await?;
if is_new {
let new_uid = id_gen.write().await.next_id();

View File

@ -1,11 +1,13 @@
use actix_http::header::HeaderName;
use std::future::{ready, Ready};
use tracing::{debug, Instrument, Level};
use tracing::{Instrument, Level};
use actix_service::{forward_ready, Service, Transform};
use actix_web::dev::{ServiceRequest, ServiceResponse};
use futures_util::future::LocalBoxFuture;
use reqwest::header::HeaderValue;
const X_REQUEST_ID: &str = "x-request-id";
pub struct RequestIdMiddleware;
impl<S, B> Transform<S, ServiceRequest> for RequestIdMiddleware
@ -41,19 +43,45 @@ where
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
let request_id = get_request_id(&req).unwrap_or(uuid::Uuid::new_v4().to_string());
debug!("generated request id for: {}", req.path());
fn call(&self, mut req: ServiceRequest) -> Self::Future {
// Skip generate request id for metrics requests
if req.path() == "/metrics" {
let fut = self.service.call(req);
Box::pin(fut)
} else {
let request_id = match get_request_id(&req) {
Some(request_id) => request_id,
None => {
let request_id = uuid::Uuid::new_v4().to_string();
if let Ok(header_value) = HeaderValue::from_str(&request_id) {
req
.headers_mut()
.insert(HeaderName::from_static(X_REQUEST_ID), header_value);
}
request_id
},
};
// Call the next service
let span = tracing::span!(Level::INFO, "request_id", request_id = %request_id);
let res = self.service.call(req);
Box::pin(res.instrument(span))
let span = tracing::span!(Level::INFO, "request_id", request_id = %request_id);
let fut = self.service.call(req);
Box::pin(async move {
let mut res = fut.instrument(span).await?;
// Insert the request id to the response header
if let Ok(header_value) = HeaderValue::from_str(&request_id) {
res
.headers_mut()
.insert(HeaderName::from_static(X_REQUEST_ID), header_value);
}
Ok(res)
})
}
}
}
pub fn get_request_id(req: &ServiceRequest) -> Option<String> {
match req.headers().get(HeaderName::from_static("x-request-id")) {
match req.headers().get(HeaderName::from_static(X_REQUEST_ID)) {
Some(h) => match h.to_str() {
Ok(s) => Some(s.to_owned()),
Err(e) => {