chore: add metrics to redis collab stream
This commit is contained in:
parent
6290214b1d
commit
cc6b58f73d
|
|
@ -2365,6 +2365,7 @@ dependencies = [
|
||||||
"collab-entity",
|
"collab-entity",
|
||||||
"futures",
|
"futures",
|
||||||
"loole",
|
"loole",
|
||||||
|
"prometheus-client",
|
||||||
"prost 0.13.3",
|
"prost 0.13.3",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"redis 0.25.4",
|
"redis 0.25.4",
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ tokio-util = { version = "0.7" }
|
||||||
prost.workspace = true
|
prost.workspace = true
|
||||||
async-stream.workspace = true
|
async-stream.workspace = true
|
||||||
async-trait.workspace = true
|
async-trait.workspace = true
|
||||||
|
prometheus-client.workspace = true
|
||||||
zstd = "0.13"
|
zstd = "0.13"
|
||||||
loole = "0.4.0"
|
loole = "0.4.0"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
use crate::collab_update_sink::{AwarenessUpdateSink, CollabUpdateSink};
|
||||||
use crate::error::{internal, StreamError};
|
use crate::error::{internal, StreamError};
|
||||||
use crate::lease::{Lease, LeaseAcquisition};
|
use crate::lease::{Lease, LeaseAcquisition};
|
||||||
|
use crate::metrics::CollabStreamMetrics;
|
||||||
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
|
use crate::model::{AwarenessStreamUpdate, CollabStreamUpdate, MessageId};
|
||||||
use crate::stream_group::{StreamConfig, StreamGroup};
|
use crate::stream_group::{StreamConfig, StreamGroup};
|
||||||
use crate::stream_router::{StreamRouter, StreamRouterOptions};
|
use crate::stream_router::{StreamRouter, StreamRouterOptions};
|
||||||
|
|
@ -21,14 +22,21 @@ pub struct CollabRedisStream {
|
||||||
impl CollabRedisStream {
|
impl CollabRedisStream {
|
||||||
pub const LEASE_TTL: Duration = Duration::from_secs(60);
|
pub const LEASE_TTL: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
pub async fn new(redis_client: redis::Client) -> Result<Self, redis::RedisError> {
|
pub async fn new(
|
||||||
|
redis_client: redis::Client,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
|
) -> Result<Self, redis::RedisError> {
|
||||||
let router_options = StreamRouterOptions {
|
let router_options = StreamRouterOptions {
|
||||||
worker_count: 60,
|
worker_count: 60,
|
||||||
xread_streams: 100,
|
xread_streams: 100,
|
||||||
xread_block_millis: Some(5000),
|
xread_block_millis: Some(5000),
|
||||||
xread_count: None,
|
xread_count: None,
|
||||||
};
|
};
|
||||||
let stream_router = Arc::new(StreamRouter::with_options(&redis_client, router_options)?);
|
let stream_router = Arc::new(StreamRouter::with_options(
|
||||||
|
&redis_client,
|
||||||
|
metrics,
|
||||||
|
router_options,
|
||||||
|
)?);
|
||||||
let connection_manager = redis_client.get_connection_manager().await?;
|
let connection_manager = redis_client.get_connection_manager().await?;
|
||||||
Ok(Self::new_with_connection_manager(
|
Ok(Self::new_with_connection_manager(
|
||||||
connection_manager,
|
connection_manager,
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ pub mod client;
|
||||||
pub mod collab_update_sink;
|
pub mod collab_update_sink;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod lease;
|
pub mod lease;
|
||||||
|
pub mod metrics;
|
||||||
pub mod model;
|
pub mod model;
|
||||||
pub mod pubsub;
|
pub mod pubsub;
|
||||||
pub mod stream_group;
|
pub mod stream_group;
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
use prometheus_client::metrics::counter::Counter;
|
||||||
|
use prometheus_client::registry::Registry;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct CollabStreamMetrics {
|
||||||
|
/// Incremented each time a new collab stream read task is set (including recurring tasks).
|
||||||
|
pub reads_enqueued: Counter,
|
||||||
|
/// Incremented each time an existing task is consumed (including recurring tasks).
|
||||||
|
pub reads_dequeued: Counter,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CollabStreamMetrics {
|
||||||
|
pub fn register(registry: &mut Registry) -> Self {
|
||||||
|
let metrics = Self::default();
|
||||||
|
let realtime_registry = registry.sub_registry_with_prefix("collab_stream");
|
||||||
|
realtime_registry.register(
|
||||||
|
"reads_enqueued",
|
||||||
|
"Incremented each time a new collab stream read task is set (including recurring tasks).",
|
||||||
|
metrics.reads_enqueued.clone(),
|
||||||
|
);
|
||||||
|
realtime_registry.register(
|
||||||
|
"reads_dequeued",
|
||||||
|
"Incremented each time an existing task is consumed (including recurring tasks).",
|
||||||
|
metrics.reads_dequeued.clone(),
|
||||||
|
);
|
||||||
|
metrics
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use crate::metrics::CollabStreamMetrics;
|
||||||
use loole::{Receiver, Sender};
|
use loole::{Receiver, Sender};
|
||||||
use redis::streams::{StreamReadOptions, StreamReadReply};
|
use redis::streams::{StreamReadOptions, StreamReadReply};
|
||||||
use redis::Client;
|
use redis::Client;
|
||||||
|
|
@ -27,14 +28,19 @@ pub struct StreamRouter {
|
||||||
alive: Arc<AtomicBool>,
|
alive: Arc<AtomicBool>,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
workers: Vec<Worker>,
|
workers: Vec<Worker>,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StreamRouter {
|
impl StreamRouter {
|
||||||
pub fn new(client: &Client) -> Result<Self, RedisError> {
|
pub fn new(client: &Client, metrics: Arc<CollabStreamMetrics>) -> Result<Self, RedisError> {
|
||||||
Self::with_options(client, Default::default())
|
Self::with_options(client, metrics, Default::default())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_options(client: &Client, options: StreamRouterOptions) -> Result<Self, RedisError> {
|
pub fn with_options(
|
||||||
|
client: &Client,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
|
options: StreamRouterOptions,
|
||||||
|
) -> Result<Self, RedisError> {
|
||||||
let alive = Arc::new(AtomicBool::new(true));
|
let alive = Arc::new(AtomicBool::new(true));
|
||||||
let (tx, rx) = loole::unbounded();
|
let (tx, rx) = loole::unbounded();
|
||||||
let mut workers = Vec::with_capacity(options.worker_count);
|
let mut workers = Vec::with_capacity(options.worker_count);
|
||||||
|
|
@ -47,6 +53,7 @@ impl StreamRouter {
|
||||||
rx.clone(),
|
rx.clone(),
|
||||||
alive.clone(),
|
alive.clone(),
|
||||||
&options,
|
&options,
|
||||||
|
metrics.clone(),
|
||||||
);
|
);
|
||||||
workers.push(worker);
|
workers.push(worker);
|
||||||
}
|
}
|
||||||
|
|
@ -55,6 +62,7 @@ impl StreamRouter {
|
||||||
buf: tx,
|
buf: tx,
|
||||||
workers,
|
workers,
|
||||||
alive,
|
alive,
|
||||||
|
metrics,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -63,6 +71,7 @@ impl StreamRouter {
|
||||||
let last_id = last_id.unwrap_or_else(|| "0".to_string());
|
let last_id = last_id.unwrap_or_else(|| "0".to_string());
|
||||||
let h = StreamHandle::new(stream_key, last_id, tx);
|
let h = StreamHandle::new(stream_key, last_id, tx);
|
||||||
self.buf.send(h).unwrap();
|
self.buf.send(h).unwrap();
|
||||||
|
self.metrics.reads_enqueued.inc();
|
||||||
rx
|
rx
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -118,6 +127,7 @@ impl Worker {
|
||||||
rx: Receiver<StreamHandle>,
|
rx: Receiver<StreamHandle>,
|
||||||
alive: Arc<AtomicBool>,
|
alive: Arc<AtomicBool>,
|
||||||
options: &StreamRouterOptions,
|
options: &StreamRouterOptions,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let mut xread_options = StreamReadOptions::default();
|
let mut xread_options = StreamReadOptions::default();
|
||||||
if let Some(block_millis) = options.xread_block_millis {
|
if let Some(block_millis) = options.xread_block_millis {
|
||||||
|
|
@ -128,7 +138,7 @@ impl Worker {
|
||||||
}
|
}
|
||||||
let count = options.xread_streams;
|
let count = options.xread_streams;
|
||||||
let handle = std::thread::spawn(move || {
|
let handle = std::thread::spawn(move || {
|
||||||
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count) {
|
if let Err(err) = Self::process_streams(conn, tx, rx, alive, xread_options, count, metrics) {
|
||||||
tracing::error!("worker {} failed: {}", worker_id, err);
|
tracing::error!("worker {} failed: {}", worker_id, err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -142,11 +152,13 @@ impl Worker {
|
||||||
alive: Arc<AtomicBool>,
|
alive: Arc<AtomicBool>,
|
||||||
options: StreamReadOptions,
|
options: StreamReadOptions,
|
||||||
count: usize,
|
count: usize,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
) -> RedisResult<()> {
|
) -> RedisResult<()> {
|
||||||
let mut stream_keys = Vec::with_capacity(count);
|
let mut stream_keys = Vec::with_capacity(count);
|
||||||
let mut message_ids = Vec::with_capacity(count);
|
let mut message_ids = Vec::with_capacity(count);
|
||||||
let mut senders = HashMap::with_capacity(count);
|
let mut senders = HashMap::with_capacity(count);
|
||||||
while alive.load(SeqCst) {
|
while alive.load(SeqCst) {
|
||||||
|
// receive next `count` of stream read requests
|
||||||
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
|
if !Self::read_buf(&rx, &mut stream_keys, &mut message_ids, &mut senders) {
|
||||||
break; // rx channel has closed
|
break; // rx channel has closed
|
||||||
}
|
}
|
||||||
|
|
@ -158,10 +170,12 @@ impl Worker {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metrics.reads_dequeued.inc_by(key_count as u64);
|
||||||
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;
|
let result: StreamReadReply = conn.xread_options(&stream_keys, &message_ids, &options)?;
|
||||||
|
|
||||||
let mut msgs = 0;
|
let mut msgs = 0;
|
||||||
for stream in result.keys {
|
for stream in result.keys {
|
||||||
|
// for each stream returned from Redis, resolve corresponding subscriber and send messages
|
||||||
let mut remove_sender = false;
|
let mut remove_sender = false;
|
||||||
if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
|
if let Some((sender, idx)) = senders.get(stream.key.as_str()) {
|
||||||
for id in stream.ids {
|
for id in stream.ids {
|
||||||
|
|
@ -170,7 +184,7 @@ impl Worker {
|
||||||
message_ids[*idx].clone_from(&message_id); //TODO: optimize
|
message_ids[*idx].clone_from(&message_id); //TODO: optimize
|
||||||
msgs += 1;
|
msgs += 1;
|
||||||
if let Err(err) = sender.send((message_id, value)) {
|
if let Err(err) = sender.send((message_id, value)) {
|
||||||
tracing::warn!("failed to send: {}", err);
|
tracing::debug!("failed to send: {}", err);
|
||||||
remove_sender = true;
|
remove_sender = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -188,7 +202,8 @@ impl Worker {
|
||||||
key_count
|
key_count
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
|
let scheduled = Self::schedule_back(&tx, &mut stream_keys, &mut message_ids, &mut senders);
|
||||||
|
metrics.reads_enqueued.inc_by(scheduled as u64);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
@ -198,21 +213,27 @@ impl Worker {
|
||||||
keys: &mut Vec<StreamKey>,
|
keys: &mut Vec<StreamKey>,
|
||||||
ids: &mut Vec<String>,
|
ids: &mut Vec<String>,
|
||||||
senders: &mut HashMap<&str, (StreamSender, usize)>,
|
senders: &mut HashMap<&str, (StreamSender, usize)>,
|
||||||
) {
|
) -> usize {
|
||||||
let keys = keys.drain(..);
|
let keys = keys.drain(..);
|
||||||
let mut ids = ids.drain(..);
|
let mut ids = ids.drain(..);
|
||||||
|
let mut scheduled = 0;
|
||||||
for key in keys {
|
for key in keys {
|
||||||
if let Some(last_id) = ids.next() {
|
if let Some(last_id) = ids.next() {
|
||||||
if let Some((sender, _)) = senders.remove(key.as_str()) {
|
if let Some((sender, _)) = senders.remove(key.as_str()) {
|
||||||
|
if sender.is_closed() {
|
||||||
|
continue; // sender is already closed
|
||||||
|
}
|
||||||
let h = StreamHandle::new(key, last_id, sender);
|
let h = StreamHandle::new(key, last_id, sender);
|
||||||
if let Err(err) = tx.send(h) {
|
if let Err(err) = tx.send(h) {
|
||||||
tracing::warn!("failed to reschedule: {}", err);
|
tracing::error!("failed to reschedule: {}", err);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
scheduled += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
senders.clear();
|
senders.clear();
|
||||||
|
scheduled
|
||||||
}
|
}
|
||||||
|
|
||||||
fn read_buf(
|
fn read_buf(
|
||||||
|
|
@ -276,19 +297,23 @@ impl StreamHandle {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate::stream_router::StreamRouter;
|
use crate::metrics::CollabStreamMetrics;
|
||||||
|
use crate::stream_router::{StreamRouter, StreamRouterOptions};
|
||||||
use rand::random;
|
use rand::random;
|
||||||
use redis::{Client, Commands, FromRedisValue};
|
use redis::{Client, Commands, FromRedisValue};
|
||||||
|
use std::sync::Arc;
|
||||||
use tokio::task::JoinSet;
|
use tokio::task::JoinSet;
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||||
async fn multi_worker_preexisting_messages() {
|
async fn multi_worker_preexisting_messages() {
|
||||||
const ROUTES_COUNT: usize = 200;
|
const ROUTES_COUNT: usize = 200;
|
||||||
const MSG_PER_ROUTE: usize = 10;
|
const MSG_PER_ROUTE: usize = 10;
|
||||||
|
|
||||||
let mut client = Client::open("redis://127.0.0.1/").unwrap();
|
let mut client = Client::open("redis://127.0.0.1/").unwrap();
|
||||||
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
|
let keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
|
||||||
|
let metrics = Arc::new(CollabStreamMetrics::default());
|
||||||
|
|
||||||
let router = StreamRouter::new(&client).unwrap();
|
let router = StreamRouter::new(&client, metrics).unwrap();
|
||||||
|
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
for key in keys {
|
for key in keys {
|
||||||
|
|
@ -313,8 +338,9 @@ mod test {
|
||||||
const MSG_PER_ROUTE: usize = 10;
|
const MSG_PER_ROUTE: usize = 10;
|
||||||
let mut client = Client::open("redis://127.0.0.1/").unwrap();
|
let mut client = Client::open("redis://127.0.0.1/").unwrap();
|
||||||
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
|
let keys = init_streams(&mut client, ROUTES_COUNT, 0);
|
||||||
|
let metrics = Arc::new(CollabStreamMetrics::default());
|
||||||
|
|
||||||
let router = StreamRouter::new(&client).unwrap();
|
let router = StreamRouter::new(&client, metrics).unwrap();
|
||||||
|
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
for key in keys.iter() {
|
for key in keys.iter() {
|
||||||
|
|
@ -348,8 +374,9 @@ mod test {
|
||||||
let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap();
|
let _: String = client.xadd(&key, "*", &[("data", 1)]).unwrap();
|
||||||
let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap();
|
let m2: String = client.xadd(&key, "*", &[("data", 2)]).unwrap();
|
||||||
let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap();
|
let m3: String = client.xadd(&key, "*", &[("data", 3)]).unwrap();
|
||||||
|
let metrics = Arc::new(CollabStreamMetrics::default());
|
||||||
|
|
||||||
let router = StreamRouter::new(&client).unwrap();
|
let router = StreamRouter::new(&client, metrics).unwrap();
|
||||||
let mut observer = router.observe(key, Some(m2));
|
let mut observer = router.observe(key, Some(m2));
|
||||||
|
|
||||||
let (msg_id, m) = observer.recv().await.unwrap();
|
let (msg_id, m) = observer.recv().await.unwrap();
|
||||||
|
|
@ -357,6 +384,51 @@ mod test {
|
||||||
assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3);
|
assert_eq!(u32::from_redis_value(&m["data"]).unwrap(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||||
|
async fn drop_subscription() {
|
||||||
|
const ROUTES_COUNT: usize = 1;
|
||||||
|
const MSG_PER_ROUTE: usize = 10;
|
||||||
|
|
||||||
|
let mut client = Client::open("redis://127.0.0.1/").unwrap();
|
||||||
|
let mut keys = init_streams(&mut client, ROUTES_COUNT, MSG_PER_ROUTE);
|
||||||
|
let metrics = Arc::new(CollabStreamMetrics::default());
|
||||||
|
|
||||||
|
let router = StreamRouter::with_options(
|
||||||
|
&client,
|
||||||
|
metrics.clone(),
|
||||||
|
StreamRouterOptions {
|
||||||
|
worker_count: 2,
|
||||||
|
xread_streams: 100,
|
||||||
|
xread_block_millis: Some(50),
|
||||||
|
xread_count: None,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let key = keys.pop().unwrap();
|
||||||
|
let mut observer = router.observe(key.clone(), None);
|
||||||
|
for i in 0..MSG_PER_ROUTE {
|
||||||
|
let (_msg_id, map) = observer.recv().await.unwrap();
|
||||||
|
let value = String::from_redis_value(&map["data"]).unwrap();
|
||||||
|
assert_eq!(value, format!("{}-{}", key, i));
|
||||||
|
}
|
||||||
|
// drop observer and wait for worker to release
|
||||||
|
drop(observer);
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||||
|
let enqueued = metrics.reads_enqueued.get();
|
||||||
|
let dequeued = metrics.reads_dequeued.get();
|
||||||
|
assert_eq!(enqueued, dequeued, "dropped observer state");
|
||||||
|
|
||||||
|
// after dropping observer, no new polling task should be rescheduled
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||||
|
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
|
||||||
|
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
|
||||||
|
assert_eq!(metrics.reads_enqueued.get(), enqueued, "unchanged enqueues");
|
||||||
|
assert_eq!(metrics.reads_dequeued.get(), dequeued, "unchanged dequeues");
|
||||||
|
}
|
||||||
|
|
||||||
fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec<String> {
|
fn init_streams(client: &mut Client, stream_count: usize, msgs_per_stream: usize) -> Vec<String> {
|
||||||
let test_prefix: u32 = random();
|
let test_prefix: u32 = random();
|
||||||
let mut keys = Vec::with_capacity(stream_count);
|
let mut keys = Vec::with_capacity(stream_count);
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ use crate::actix_ws::server::RealtimeServerActor;
|
||||||
use crate::api::{collab_scope, ws_scope};
|
use crate::api::{collab_scope, ws_scope};
|
||||||
use crate::collab::access_control::CollabStorageAccessControlImpl;
|
use crate::collab::access_control::CollabStorageAccessControlImpl;
|
||||||
use access_control::casbin::access::AccessControl;
|
use access_control::casbin::access::AccessControl;
|
||||||
|
use collab_stream::metrics::CollabStreamMetrics;
|
||||||
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
||||||
use database::file::s3_client_impl::AwsS3BucketClientImpl;
|
use database::file::s3_client_impl::AwsS3BucketClientImpl;
|
||||||
|
|
||||||
|
|
@ -110,8 +111,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
||||||
let user_cache = UserCache::new(pg_pool.clone()).await;
|
let user_cache = UserCache::new(pg_pool.clone()).await;
|
||||||
|
|
||||||
info!("Connecting to Redis...");
|
info!("Connecting to Redis...");
|
||||||
let (redis_conn_manager, redis_stream_router) =
|
let (redis_conn_manager, redis_stream_router) = get_redis_client(
|
||||||
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
|
config.redis_uri.expose_secret(),
|
||||||
|
config.redis_worker_count,
|
||||||
|
metrics.collab_stream_metrics.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Pg listeners
|
// Pg listeners
|
||||||
info!("Setting up Pg listeners...");
|
info!("Setting up Pg listeners...");
|
||||||
|
|
@ -189,12 +194,14 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
||||||
async fn get_redis_client(
|
async fn get_redis_client(
|
||||||
redis_uri: &str,
|
redis_uri: &str,
|
||||||
worker_count: usize,
|
worker_count: usize,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
||||||
info!("Connecting to redis with uri: {}", redis_uri);
|
info!("Connecting to redis with uri: {}", redis_uri);
|
||||||
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
||||||
|
|
||||||
let router = StreamRouter::with_options(
|
let router = StreamRouter::with_options(
|
||||||
&client,
|
&client,
|
||||||
|
metrics,
|
||||||
StreamRouterOptions {
|
StreamRouterOptions {
|
||||||
worker_count,
|
worker_count,
|
||||||
xread_streams: 100,
|
xread_streams: 100,
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ use crate::pg_listener::PgListeners;
|
||||||
use crate::CollabRealtimeMetrics;
|
use crate::CollabRealtimeMetrics;
|
||||||
use access_control::metrics::AccessControlMetrics;
|
use access_control::metrics::AccessControlMetrics;
|
||||||
use app_error::AppError;
|
use app_error::AppError;
|
||||||
|
use collab_stream::metrics::CollabStreamMetrics;
|
||||||
use collab_stream::stream_router::StreamRouter;
|
use collab_stream::stream_router::StreamRouter;
|
||||||
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
|
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
|
||||||
use indexer::metrics::EmbeddingMetrics;
|
use indexer::metrics::EmbeddingMetrics;
|
||||||
|
|
@ -40,6 +41,7 @@ pub struct AppMetrics {
|
||||||
pub access_control_metrics: Arc<AccessControlMetrics>,
|
pub access_control_metrics: Arc<AccessControlMetrics>,
|
||||||
pub realtime_metrics: Arc<CollabRealtimeMetrics>,
|
pub realtime_metrics: Arc<CollabRealtimeMetrics>,
|
||||||
pub collab_metrics: Arc<CollabMetrics>,
|
pub collab_metrics: Arc<CollabMetrics>,
|
||||||
|
pub collab_stream_metrics: Arc<CollabStreamMetrics>,
|
||||||
pub embedding_metrics: Arc<EmbeddingMetrics>,
|
pub embedding_metrics: Arc<EmbeddingMetrics>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -55,12 +57,14 @@ impl AppMetrics {
|
||||||
let access_control_metrics = Arc::new(AccessControlMetrics::register(&mut registry));
|
let access_control_metrics = Arc::new(AccessControlMetrics::register(&mut registry));
|
||||||
let realtime_metrics = Arc::new(CollabRealtimeMetrics::register(&mut registry));
|
let realtime_metrics = Arc::new(CollabRealtimeMetrics::register(&mut registry));
|
||||||
let collab_metrics = Arc::new(CollabMetrics::register(&mut registry));
|
let collab_metrics = Arc::new(CollabMetrics::register(&mut registry));
|
||||||
|
let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry));
|
||||||
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
|
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
|
||||||
Self {
|
Self {
|
||||||
registry: Arc::new(registry),
|
registry: Arc::new(registry),
|
||||||
access_control_metrics,
|
access_control_metrics,
|
||||||
realtime_metrics,
|
realtime_metrics,
|
||||||
collab_metrics,
|
collab_metrics,
|
||||||
|
collab_stream_metrics,
|
||||||
embedding_metrics,
|
embedding_metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ use appflowy_collaborate::collab::storage::CollabStorageImpl;
|
||||||
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
|
use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender};
|
||||||
use appflowy_collaborate::snapshot::SnapshotControl;
|
use appflowy_collaborate::snapshot::SnapshotControl;
|
||||||
use appflowy_collaborate::CollaborationServer;
|
use appflowy_collaborate::CollaborationServer;
|
||||||
|
use collab_stream::metrics::CollabStreamMetrics;
|
||||||
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
use collab_stream::stream_router::{StreamRouter, StreamRouterOptions};
|
||||||
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
||||||
use indexer::collab_indexer::IndexerProvider;
|
use indexer::collab_indexer::IndexerProvider;
|
||||||
|
|
@ -248,8 +249,12 @@ pub async fn init_state(config: &Config, rt_cmd_tx: CLCommandSender) -> Result<A
|
||||||
|
|
||||||
// Redis
|
// Redis
|
||||||
info!("Connecting to Redis...");
|
info!("Connecting to Redis...");
|
||||||
let (redis_conn_manager, redis_stream_router) =
|
let (redis_conn_manager, redis_stream_router) = get_redis_client(
|
||||||
get_redis_client(config.redis_uri.expose_secret(), config.redis_worker_count).await?;
|
config.redis_uri.expose_secret(),
|
||||||
|
config.redis_worker_count,
|
||||||
|
metrics.collab_stream_metrics.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
info!("Setup AppFlowy AI: {}", config.appflowy_ai.url());
|
info!("Setup AppFlowy AI: {}", config.appflowy_ai.url());
|
||||||
let appflowy_ai_client = AppFlowyAIClient::new(&config.appflowy_ai.url());
|
let appflowy_ai_client = AppFlowyAIClient::new(&config.appflowy_ai.url());
|
||||||
|
|
@ -387,12 +392,14 @@ fn get_admin_client(
|
||||||
async fn get_redis_client(
|
async fn get_redis_client(
|
||||||
redis_uri: &str,
|
redis_uri: &str,
|
||||||
worker_count: usize,
|
worker_count: usize,
|
||||||
|
metrics: Arc<CollabStreamMetrics>,
|
||||||
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
) -> Result<(redis::aio::ConnectionManager, Arc<StreamRouter>), Error> {
|
||||||
info!("Connecting to redis with uri: {}", redis_uri);
|
info!("Connecting to redis with uri: {}", redis_uri);
|
||||||
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
let client = redis::Client::open(redis_uri).context("failed to connect to redis")?;
|
||||||
|
|
||||||
let router = StreamRouter::with_options(
|
let router = StreamRouter::with_options(
|
||||||
&client,
|
&client,
|
||||||
|
metrics,
|
||||||
StreamRouterOptions {
|
StreamRouterOptions {
|
||||||
worker_count,
|
worker_count,
|
||||||
xread_streams: 100,
|
xread_streams: 100,
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@ use appflowy_collaborate::collab::cache::CollabCache;
|
||||||
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
use appflowy_collaborate::collab::storage::CollabAccessControlStorage;
|
||||||
use appflowy_collaborate::metrics::CollabMetrics;
|
use appflowy_collaborate::metrics::CollabMetrics;
|
||||||
use appflowy_collaborate::CollabRealtimeMetrics;
|
use appflowy_collaborate::CollabRealtimeMetrics;
|
||||||
|
use collab_stream::metrics::CollabStreamMetrics;
|
||||||
use collab_stream::stream_router::StreamRouter;
|
use collab_stream::stream_router::StreamRouter;
|
||||||
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage};
|
||||||
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
|
use database::user::{select_all_uid_uuid, select_uid_from_uuid};
|
||||||
|
|
@ -128,6 +129,7 @@ pub struct AppMetrics {
|
||||||
pub published_collab_metrics: Arc<PublishedCollabMetrics>,
|
pub published_collab_metrics: Arc<PublishedCollabMetrics>,
|
||||||
pub appflowy_web_metrics: Arc<AppFlowyWebMetrics>,
|
pub appflowy_web_metrics: Arc<AppFlowyWebMetrics>,
|
||||||
pub embedding_metrics: Arc<EmbeddingMetrics>,
|
pub embedding_metrics: Arc<EmbeddingMetrics>,
|
||||||
|
pub collab_stream_metrics: Arc<CollabStreamMetrics>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for AppMetrics {
|
impl Default for AppMetrics {
|
||||||
|
|
@ -146,6 +148,7 @@ impl AppMetrics {
|
||||||
let published_collab_metrics = Arc::new(PublishedCollabMetrics::register(&mut registry));
|
let published_collab_metrics = Arc::new(PublishedCollabMetrics::register(&mut registry));
|
||||||
let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry));
|
let appflowy_web_metrics = Arc::new(AppFlowyWebMetrics::register(&mut registry));
|
||||||
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
|
let embedding_metrics = Arc::new(EmbeddingMetrics::register(&mut registry));
|
||||||
|
let collab_stream_metrics = Arc::new(CollabStreamMetrics::register(&mut registry));
|
||||||
Self {
|
Self {
|
||||||
registry: Arc::new(registry),
|
registry: Arc::new(registry),
|
||||||
request_metrics,
|
request_metrics,
|
||||||
|
|
@ -155,6 +158,7 @@ impl AppMetrics {
|
||||||
published_collab_metrics,
|
published_collab_metrics,
|
||||||
appflowy_web_metrics,
|
appflowy_web_metrics,
|
||||||
embedding_metrics,
|
embedding_metrics,
|
||||||
|
collab_stream_metrics,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue