Merge branch 'main' into stateless

This commit is contained in:
Bartosz Sypytkowski 2024-12-23 05:21:15 +01:00
commit 59e8281010
26 changed files with 372 additions and 396 deletions

View File

@ -11,7 +11,7 @@ env:
SQLX_VERSION: 0.7.1
SQLX_FEATURES: "rustls,postgres"
SQLX_OFFLINE: true
RUST_TOOLCHAIN: "1.80"
RUST_TOOLCHAIN: "1.81"
jobs:
test:

View File

@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1\n from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0)",
"query": "\n select c.workspace_id, c.oid, c.partition_key\n from af_collab c\n join af_workspace w on c.workspace_id = w.workspace_id\n where not coalesce(w.settings['disable_search_indexding']::boolean, false)\n and not exists (\n select 1 from af_collab_embeddings em\n where em.oid = c.oid and em.partition_key = 0\n )\n ",
"describe": {
"columns": [
{
@ -28,5 +28,5 @@
false
]
},
"hash": "26f293af01281f6f4a99fd69d3a4acc1a556dd3628873fa3ce3e8eaa18ccda1b"
"hash": "ad216288cbbe83aba35b5d04705ee5964f1da4f3839c4725a6784c13f2245379"
}

537
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -90,7 +90,7 @@ async-trait.workspace = true
prometheus-client.workspace = true
itertools = "0.11"
uuid.workspace = true
tokio-tungstenite = { version = "0.20.1", features = ["native-tls"] }
tokio-tungstenite = { version = "0.26.1", features = ["native-tls"] }
dotenvy.workspace = true
url = "2.5.0"
brotli.workspace = true
@ -155,7 +155,7 @@ console-subscriber = { version = "0.4.1", optional = true }
base64.workspace = true
md5.workspace = true
nanoid = "0.4.0"
http = "0.2.12"
http.workspace = true
[dev-dependencies]
flate2 = "1.0"
@ -268,7 +268,7 @@ sqlx = { version = "0.8.1", default-features = false }
dashmap = "5.5.3"
futures = "0.3.30"
async-stream = "0.3.5"
reqwest = "0.11.27"
reqwest = "0.12.9"
lazy_static = "1.4.0"
tonic = "0.12.3"
prost = "0.13.3"
@ -288,6 +288,8 @@ chrono = { version = "0.4.39", features = [
"serde",
"clock",
], default-features = false }
http = "0.2.12"
tokio-tungstenite = "0.20"
# collaboration
yrs = { version = "0.21.3", features = ["sync"] }
@ -317,13 +319,13 @@ lto = false
[patch.crates-io]
# It's difficult to resolve different versions of the same crate used in the AppFlowy Frontend and the Client-API crate.
# So using patch to workaround this issue.
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "2443178e4249354c094867875f300cc924cbe0e2" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
collab-importer = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c45a2120361f94bbedb787cdd2192a38c94c7f5f" }
[features]
history = []

View File

@ -1,5 +1,5 @@
# Using cargo-chef to manage Rust build cache effectively
FROM lukemathwalker/cargo-chef:latest-rust-1.79 as chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef
WORKDIR /app
RUN apt update && apt install lld clang -y

View File

@ -26,7 +26,7 @@ redis = { version = "0.25.2", features = [
] }
uuid = { workspace = true, features = ["v4"] }
dotenvy = "0.15"
reqwest = "0.11.27"
reqwest.workspace = true
tower-service = "0.3"
tower-http = { version = "0.5", features = ["fs"] }
tower = "0.4"

View File

@ -1,6 +1,6 @@
# Users should build from the parent directory
FROM lukemathwalker/cargo-chef:latest-rust-1.78 as chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef
WORKDIR /app

View File

@ -6,7 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
reqwest = { version = "0.12", features = [
reqwest = { workspace = true, features = [
"json",
"rustls-tls",
"cookies",

View File

@ -30,7 +30,7 @@ chrono = "0.4"
client-websocket = { workspace = true, features = ["native-tls"] }
semver = "1.0.22"
zstd = { version = "0.13.2" }
tokio-tungstenite.workspace = true
collab = { workspace = true, optional = true }
yrs = { workspace = true, optional = true }

View File

@ -7,13 +7,13 @@ use app_error::gotrue::GoTrueError;
use client_websocket::{connect_async, WebSocketStream};
use gotrue::grant::{Grant, RefreshTokenGrant};
use parking_lot::RwLock;
use reqwest::header::HeaderMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio_retry::strategy::FixedInterval;
use tokio_retry::{Action, Condition, RetryIf};
use tokio_tungstenite::tungstenite::http::HeaderMap;
use tracing::{debug, info, trace};
pub(crate) struct RefreshTokenAction {

View File

@ -7,11 +7,13 @@ use std::time::Duration;
use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
// use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use semver::Version;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
use tokio_tungstenite::tungstenite::http::{HeaderMap, HeaderValue};
use tracing::{error, info, trace, warn};
use crate::ping::ServerFixIntervalPing;
@ -493,7 +495,7 @@ impl Display for ConnectInfo {
}
impl From<ConnectInfo> for HeaderMap {
fn from(info: ConnectInfo) -> Self {
fn from(info: ConnectInfo) -> HeaderMap {
let mut headers = HeaderMap::new();
headers.insert(
"device-id",

View File

@ -36,7 +36,7 @@ impl From<Error> for WSError {
Error::Protocol(ProtocolError::SendAfterClosing) => WSError::Close(value.to_string()),
Error::Http(resp) => {
let status = resp.status();
if status == StatusCode::UNAUTHORIZED || status == StatusCode::NOT_FOUND {
if status == StatusCode::UNAUTHORIZED.as_u16() || status == StatusCode::NOT_FOUND.as_u16() {
WSError::AuthError("Unauthorized websocket connection".to_string())
} else {
WSError::TungsteniteError(value)

View File

@ -19,7 +19,6 @@ __rustls-tls = []
[dependencies]
thiserror = "1"
http = "0.2"
httparse = "1.8"
futures-util = { version = "0.3", default-features = false, features = [
"sink",
@ -27,7 +26,7 @@ futures-util = { version = "0.3", default-features = false, features = [
] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio-tungstenite = "0.20"
tokio-tungstenite.workspace = true
tokio = { workspace = true, features = ["net"] }
[target.'cfg(target_arch = "wasm32")'.dependencies]

View File

@ -1,6 +1,7 @@
use http::{header::HeaderName, Response};
use std::{io, result, str, string};
use thiserror::Error;
use tokio_tungstenite::tungstenite::http;
/// These error types are copy-pasted from the tokio_tungstenite crate.
pub type Result<T, E = Error> = result::Result<T, E>;

View File

@ -6,12 +6,12 @@ mod native;
mod web;
pub use error::{Error, ProtocolError, Result};
use http::HeaderMap;
pub use message::coding::*;
pub use message::CloseFrame;
pub use message::Message;
#[cfg(not(target_arch = "wasm32"))]
use native as ws;
use tokio_tungstenite::tungstenite::http::HeaderMap;
#[cfg(target_arch = "wasm32")]
use web as ws;
pub use ws::WebSocketStream;

View File

@ -1,8 +1,8 @@
use futures_util::{Sink, Stream, StreamExt};
use http::HeaderMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tokio_tungstenite::tungstenite::http::HeaderMap;
use tokio_tungstenite::{
tungstenite::{
error::*,

View File

@ -1,7 +1,9 @@
use collab_entity::CollabType;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use pgvector::Vector;
use sqlx::postgres::{PgHasArrayType, PgTypeInfo};
use sqlx::{Error, Executor, Postgres, Transaction};
use sqlx::{Error, Executor, PgPool, Postgres, Transaction};
use std::ops::DerefMut;
use uuid::Uuid;
@ -109,35 +111,29 @@ pub async fn upsert_collab_embeddings(
Ok(())
}
pub async fn get_collabs_without_embeddings<'a, E>(
executor: E,
) -> Result<Vec<CollabId>, sqlx::Error>
where
E: Executor<'a, Database = Postgres>,
{
let oids = sqlx::query!(
pub fn get_collabs_without_embeddings(pg_pool: &PgPool) -> BoxStream<sqlx::Result<CollabId>> {
// atm. get only documents
sqlx::query!(
r#"
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1
from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0)"# // atm. get only documents
)
.fetch_all(executor)
.await?;
Ok(
oids
.into_iter()
.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
.collect(),
select c.workspace_id, c.oid, c.partition_key
from af_collab c
join af_workspace w on c.workspace_id = w.workspace_id
where not coalesce(w.settings['disable_search_indexding']::boolean, false)
and not exists (
select 1 from af_collab_embeddings em
where em.oid = c.oid and em.partition_key = 0
)
"#
)
.fetch(pg_pool)
.map(|row| {
row.map(|r| CollabId {
collab_type: CollabType::from(r.partition_key),
workspace_id: r.workspace_id,
object_id: r.oid,
})
})
.boxed()
}
#[derive(Debug, Clone)]

View File

@ -1,2 +1,2 @@
[toolchain]
channel = "1.80.0"
channel = "1.81.0"

View File

@ -3,7 +3,7 @@
# Generate the current dependency list
cargo tree > current_deps.txt
BASELINE_COUNT=722
BASELINE_COUNT=747
CURRENT_COUNT=$(cat current_deps.txt | wc -l)
echo "Expected dependency count (baseline): $BASELINE_COUNT"

View File

@ -1,5 +1,5 @@
# Using cargo-chef to manage Rust build cache effectively
FROM lukemathwalker/cargo-chef:latest-rust-1.78 as chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef
WORKDIR /app
RUN apt update && apt install lld clang -y

View File

@ -4,11 +4,9 @@ use crate::indexer::vector::embedder::Embedder;
use crate::indexer::vector::open_ai;
use crate::indexer::{Indexer, IndexerProvider};
use crate::thread_pool_no_abort::{ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
use actix::dev::Stream;
use anyhow::anyhow;
use app_error::AppError;
use appflowy_ai_client::dto::{EmbeddingRequest, OpenAIEmbeddingResponse};
use async_stream::try_stream;
use bytes::Bytes;
use collab::core::collab::DataSource;
use collab::core::origin::CollabOrigin;
@ -20,10 +18,10 @@ use database::collab::{CollabStorage, GetCollabOrigin};
use database::index::{get_collabs_without_embeddings, upsert_collab_embeddings};
use database::workspace::select_workspace_settings;
use database_entity::dto::{AFCollabEmbeddedChunk, CollabParams};
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use rayon::prelude::*;
use sqlx::PgPool;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@ -34,6 +32,7 @@ use uuid::Uuid;
pub struct IndexerScheduler {
indexer_provider: Arc<IndexerProvider>,
pg_pool: PgPool,
#[allow(dead_code)]
storage: Arc<dyn CollabStorage>,
threads: Arc<ThreadPoolNoAbort>,
#[allow(dead_code)]
@ -108,7 +107,7 @@ impl IndexerScheduler {
this.pg_pool.clone(),
this.metrics.clone(),
));
tokio::spawn(handle_unindexed_collabs(this.clone()));
// tokio::spawn(handle_unindexed_collabs(this.clone()));
}
this
@ -395,12 +394,13 @@ fn should_embed(
should_embed
}
#[allow(dead_code)]
async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
// wait for 30 seconds before starting indexing
tokio::time::sleep(tokio::time::Duration::from_secs(30)).await;
let mut i = 0;
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone());
let mut stream = get_unindexed_collabs(&scheduler.pg_pool, scheduler.storage.clone()).await;
let record_tx = scheduler.schedule_tx.clone();
let start = Instant::now();
while let Some(result) = stream.next().await {
@ -439,41 +439,46 @@ async fn handle_unindexed_collabs(scheduler: Arc<IndexerScheduler>) {
)
}
fn get_unindexed_collabs(
pub async fn get_unindexed_collabs(
pg_pool: &PgPool,
storage: Arc<dyn CollabStorage>,
) -> Pin<Box<dyn Stream<Item = Result<UnindexedCollab, anyhow::Error>> + Send>> {
let db = pg_pool.clone();
Box::pin(try_stream! {
let collabs = get_collabs_without_embeddings(&db).await?;
if !collabs.is_empty() {
info!("found {} unindexed collabs", collabs.len());
}
for cid in collabs {
match &cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;
) -> BoxStream<Result<UnindexedCollab, anyhow::Error>> {
let cloned_storage = storage.clone();
get_collabs_without_embeddings(pg_pool)
.map(move |result| {
let storage = cloned_storage.clone();
async move {
match result {
Ok(cid) => match cid.collab_type {
CollabType::Document => {
let collab = storage
.get_encode_collab(GetCollabOrigin::Server, cid.clone().into(), false)
.await?;
yield UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
};
},
CollabType::Database
| CollabType::WorkspaceDatabase
| CollabType::Folder
| CollabType::DatabaseRow
| CollabType::UserAwareness
| CollabType::Unknown => { /* atm. only document types are supported */ },
Ok(Some(UnindexedCollab {
workspace_id: cid.workspace_id,
object_id: cid.object_id,
collab_type: cid.collab_type,
collab,
}))
},
_ => Ok::<_, anyhow::Error>(None),
},
Err(e) => Err(e.into()),
}
}
}
})
})
.filter_map(|future| async {
match future.await {
Ok(Some(unindexed_collab)) => Some(Ok(unindexed_collab)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
})
.boxed()
}
#[allow(dead_code)]
async fn index_unindexd_collab(
embedder: Embedder,
indexer_provider: &Arc<IndexerProvider>,

View File

@ -1,4 +1,4 @@
FROM lukemathwalker/cargo-chef:latest-rust-1.77 as chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef
# Set the initial working directory
WORKDIR /app

View File

@ -61,5 +61,5 @@ mailer.workspace = true
md5.workspace = true
base64.workspace = true
prometheus-client = "0.22.3"
reqwest = "0.12.5"
reqwest.workspace = true
zstd.workspace = true

View File

@ -1,4 +1,4 @@
FROM lukemathwalker/cargo-chef:latest-rust-1.78 as chef
FROM lukemathwalker/cargo-chef:latest-rust-1.81 as chef
# Set the initial working directory
WORKDIR /app

View File

@ -1,4 +1,5 @@
use actix_multipart::form::{bytes::Bytes as MPBytes, MultipartForm};
use actix_web::http::StatusCode;
use actix_web::{
web::{self, Data, Json},
HttpResponse, Result, Scope,
@ -11,7 +12,6 @@ use database_entity::dto::{
TemplateCreators, TemplateHomePage, TemplateHomePageQueryParams, TemplateWithPublishInfo,
Templates, UpdateTemplateCategoryParams, UpdateTemplateCreatorParams, UpdateTemplateParams,
};
use reqwest::StatusCode;
use shared_entity::response::{AppResponse, JsonAppResponse};
use uuid::Uuid;

View File

@ -836,25 +836,23 @@ async fn batch_create_collab_handler(
start.elapsed()
);
if state
.indexer_scheduler
.can_index_workspace(&workspace_id)
.await?
{
let indexed_collabs: Vec<_> = collab_params_list
.iter()
.filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
.map(IndexedCollab::from)
.collect();
if !indexed_collabs.is_empty() {
let len = indexed_collabs.len();
state
.indexer_scheduler
.index_encoded_collabs(&workspace_id, indexed_collabs)?;
tracing::info!("scheduled indexing for {} collabs", len);
}
}
// if state
// .indexer_scheduler
// .can_index_workspace(&workspace_id)
// .await?
// {
// let indexed_collabs: Vec<_> = collab_params_list
// .iter()
// .filter(|p| state.indexer_scheduler.is_indexing_enabled(&p.collab_type))
// .map(IndexedCollab::from)
// .collect();
//
// if !indexed_collabs.is_empty() {
// state
// .indexer_scheduler
// .index_encoded_collabs(&workspace_id, indexed_collabs)?;
// }
// }
let start = Instant::now();
state