Merge branch 'main' into gotrue-2.159.0

This commit is contained in:
Zack Fu Zi Xiang 2024-08-27 08:58:25 +08:00
commit 44a8bf35c2
No known key found for this signature in database
13 changed files with 92 additions and 43 deletions

10
Cargo.lock generated
View File

@ -2080,7 +2080,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362"
dependencies = [
"anyhow",
"arc-swap",
@ -2105,7 +2105,7 @@ dependencies = [
[[package]]
name = "collab-document"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362"
dependencies = [
"anyhow",
"arc-swap",
@ -2125,7 +2125,7 @@ dependencies = [
[[package]]
name = "collab-entity"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362"
dependencies = [
"anyhow",
"bytes",
@ -2144,7 +2144,7 @@ dependencies = [
[[package]]
name = "collab-folder"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362"
dependencies = [
"anyhow",
"arc-swap",
@ -2228,7 +2228,7 @@ dependencies = [
[[package]]
name = "collab-user"
version = "0.2.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=6f98f054a8306a8791b65bf1825091ab3fe60166#6f98f054a8306a8791b65bf1825091ab3fe60166"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=ec4f92941942f20c84dce142cd2eeafd44ca7362#ec4f92941942f20c84dce142cd2eeafd44ca7362"
dependencies = [
"anyhow",
"collab",

View File

@ -283,11 +283,11 @@ debug = true
[patch.crates-io]
# It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
# So using patch to workaround this issue.
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "6f98f054a8306a8791b65bf1825091ab3fe60166" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "ec4f92941942f20c84dce142cd2eeafd44ca7362" }
[features]
history = []

View File

@ -316,6 +316,7 @@ pub enum ErrorCode {
#[cfg(feature = "sqlx_error")]
SqlxArgEncodingError = 1035,
InvalidContentType = 1036,
SingleUploadLimitExceeded = 1037,
}
impl ErrorCode {

View File

@ -25,10 +25,11 @@ use std::future::Future;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_retry::strategy::{ExponentialBackoff, FixedInterval};
use tokio_retry::{Retry, RetryIf};
use tokio_retry::{Condition, RetryIf};
use tracing::{event, info, instrument, trace};
pub use infra::file_util::ChunkedBytes;
@ -138,7 +139,7 @@ impl Client {
// 2 seconds, 4 seconds, 8 seconds
let retry_strategy = ExponentialBackoff::from_millis(2).factor(1000).take(3);
let action = GetCollabAction::new(self.clone(), params);
Retry::spawn(retry_strategy, action).await
RetryIf::spawn(retry_strategy, action, RetryGetCollabCondition).await
}
#[instrument(level = "debug", skip_all, err)]
@ -391,3 +392,10 @@ where
Ok(Bytes::from(chunk))
}
struct RetryGetCollabCondition;
impl Condition<AppResponseError> for RetryGetCollabCondition {
fn should_retry(&mut self, error: &AppResponseError) -> bool {
!error.is_record_not_found()
}
}

View File

@ -18,6 +18,7 @@ use tokio_stream::StreamExt;
use tracing::{debug, error, event, instrument, trace};
use app_error::AppError;
use authentication::jwt::{authorization_from_token, UserUuid};
use collab_rt_entity::user::{AFUserChange, RealtimeUser, UserMessage};
use collab_rt_entity::{HttpRealtimeMessage, RealtimeMessage};
use shared_entity::response::{AppResponse, AppResponseError};
@ -31,7 +32,6 @@ use crate::compression::{
decompress, CompressionType, X_COMPRESSION_BUFFER_SIZE, X_COMPRESSION_TYPE,
};
use crate::state::AppState;
use authentication::jwt::{authorization_from_token, UserUuid};
pub fn ws_scope() -> Scope {
web::scope("/ws").service(web::resource("/v1").route(web::get().to(establish_ws_connection_v1)))
@ -76,6 +76,10 @@ pub async fn establish_ws_connection_v1(
},
};
if client_version < state.config.websocket.min_client_version {
return Err(AppError::Connect("Client version is too low".to_string()).into());
}
start_connect(
&request,
payload,

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use anyhow::anyhow;
use collab::entity::EncodedCollab;
use collab_entity::CollabType;
use sqlx::{PgPool, Transaction};
use sqlx::{Error, PgPool, Transaction};
use tokio::time::sleep;
use tracing::{event, instrument, Level};
use uuid::Uuid;
@ -100,19 +100,21 @@ impl CollabDiskCache {
.await?;
},
Err(e) => {
// Handle non-retryable errors immediately
if matches!(e, sqlx::Error::RowNotFound) {
let msg = format!("Can't find the row for query: {:?}", query);
return Err(AppError::RecordNotFound(msg));
}
// Increment attempts and retry if below MAX_ATTEMPTS and the error is retryable
if attempts < MAX_ATTEMPTS - 1 && matches!(e, sqlx::Error::PoolTimedOut) {
attempts += 1;
sleep(Duration::from_millis(500 * attempts as u64)).await;
continue;
} else {
return Err(e.into());
match e {
Error::RowNotFound => {
let msg = format!("Can't find the row for query: {:?}", query);
return Err(AppError::RecordNotFound(msg));
},
_ => {
// Increment attempts and retry if below MAX_ATTEMPTS and the error is retryable
if attempts < MAX_ATTEMPTS - 1 && matches!(e, sqlx::Error::PoolTimedOut) {
attempts += 1;
sleep(Duration::from_millis(500 * attempts as u64)).await;
continue;
} else {
return Err(e.into());
}
},
}
},
}

View File

@ -3,6 +3,7 @@ use std::str::FromStr;
use anyhow::Context;
use secrecy::Secret;
use semver::Version;
use serde::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
@ -70,6 +71,7 @@ impl AISettings {
pub struct WebsocketSetting {
pub heartbeat_interval: u8,
pub client_timeout: u8,
pub min_client_version: Version,
}
#[derive(Clone, Debug)]
@ -140,6 +142,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
websocket: WebsocketSetting {
heartbeat_interval: get_env_var("APPFLOWY_WEBSOCKET_HEARTBEAT_INTERVAL", "6").parse()?,
client_timeout: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_TIMEOUT", "60").parse()?,
min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?,
},
db_settings: DatabaseSetting {
pg_conn_opts: PgConnectOptions::from_str(&get_env_var(

View File

@ -8,7 +8,7 @@ use tracing::{instrument, trace, warn};
use access_control::collab::RealtimeAccessControl;
use collab_rt_entity::user::RealtimeUser;
use collab_rt_entity::{ClientCollabMessage, ServerCollabMessage, SinkMessage};
use collab_rt_entity::{AckCode, ClientCollabMessage, ServerCollabMessage, SinkMessage};
use collab_rt_entity::{CollabAck, RealtimeMessage};
use database::collab::CollabStorage;
@ -155,7 +155,11 @@ where
let origin = first_message.origin().clone();
let msg_id = first_message.msg_id();
let object_id = first_message.object_id().to_string();
let ack = CollabAck::new(origin, object_id, msg_id, 0);
// when the group with given id is not found and the the first message is not init sync.
// Return AckCode::CannotApplyUpdate to the client and then client will send the init sync message.
let ack =
CollabAck::new(origin, object_id, msg_id, 0).with_code(AckCode::CannotApplyUpdate);
entry
.value()
.send_message(ServerCollabMessage::ClientAck(ack).into())

View File

@ -11,7 +11,7 @@ use collab_entity::CollabType;
use dashmap::DashMap;
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tracing::{error, event, trace};
use tracing::{error, event, info, trace};
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::Encode;
use yrs::Update;
@ -200,7 +200,26 @@ impl CollabGroup {
modified_at.elapsed().as_secs() > 60 * 60
} else {
let elapsed_secs = modified_at.elapsed().as_secs();
elapsed_secs > self.timeout_secs() && self.subscribers.is_empty()
if elapsed_secs > self.timeout_secs() {
// Mark the group as inactive if it has been inactive for more than 3 hours, regardless of the number of subscribers.
// Otherwise, return `true` only if there are no subscribers remaining in the group.
// If a client modifies a group that has already been marked as inactive (removed),
// the client will automatically send an initialization sync to reinitialize the group.
const MAXIMUM_SECS: u64 = 3 * 60 * 60;
if elapsed_secs > MAXIMUM_SECS {
info!(
"Group:{} is inactive for {} seconds, subscribers: {}",
self.object_id,
modified_at.elapsed().as_secs(),
self.subscribers.len()
);
true
} else {
self.subscribers.is_empty()
}
} else {
false
}
}
}

View File

@ -64,7 +64,7 @@ impl IndexerProvider {
fn get_unindexed_collabs(
&self,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>>>> {
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = self.db.clone();
Box::pin(try_stream! {
let collabs = get_collabs_without_embeddings(&db).await?;

View File

@ -1,11 +1,10 @@
use anyhow::Result;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
use anyhow::Result;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use tokio::sync::Notify;
use tokio::time::interval;
use tracing::{error, info, trace};
@ -254,7 +253,7 @@ where
}
fn spawn_handle_unindexed_collabs(indexer_provider: Arc<IndexerProvider>) {
tokio::task::spawn_local(IndexerProvider::handle_unindexed_collabs(indexer_provider));
tokio::spawn(IndexerProvider::handle_unindexed_collabs(indexer_provider));
}
fn spawn_period_check_inactive_group<S, AC>(

View File

@ -26,7 +26,7 @@ use crate::state::AppState;
pub fn ws_scope() -> Scope {
web::scope("/ws")
.service(establish_ws_connection)
//.service(establish_ws_connection)
.service(web::resource("/v1").route(web::get().to(establish_ws_connection_v1)))
}
const MAX_FRAME_SIZE: usize = 65_536; // 64 KiB
@ -86,6 +86,10 @@ pub async fn establish_ws_connection_v1(
},
};
if client_version < state.config.websocket.min_client_version {
return Err(AppError::Connect("Client version is too low".to_string()).into());
}
start_connect(
&request,
payload,

View File

@ -1,11 +1,14 @@
use anyhow::Context;
use infra::env_util::get_env_var;
use secrecy::{ExposeSecret, Secret};
use serde::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use std::fmt::Display;
use std::str::FromStr;
use anyhow::Context;
use secrecy::{ExposeSecret, Secret};
use semver::Version;
use serde::Deserialize;
use sqlx::postgres::{PgConnectOptions, PgSslMode};
use infra::env_util::get_env_var;
#[derive(Clone, Debug)]
pub struct Config {
pub app_env: Environment,
@ -166,6 +169,7 @@ pub fn get_configuration() -> Result<Config, anyhow::Error> {
websocket: WebsocketSetting {
heartbeat_interval: get_env_var("APPFLOWY_WEBSOCKET_HEARTBEAT_INTERVAL", "6").parse()?,
client_timeout: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_TIMEOUT", "60").parse()?,
min_client_version: get_env_var("APPFLOWY_WEBSOCKET_CLIENT_MIN_VERSION", "0.5.0").parse()?,
},
redis_uri: get_env_var("APPFLOWY_REDIS_URI", "redis://localhost:6379").into(),
s3: S3Setting {
@ -239,4 +243,5 @@ impl FromStr for Environment {
pub struct WebsocketSetting {
pub heartbeat_interval: u8,
pub client_timeout: u8,
pub min_client_version: Version,
}