Feat/ws test (#4)

* test: ws test

* test: update

* test: update

* test: sync update

* feat: ws test
This commit is contained in:
Nathan.fooo 2023-05-10 20:26:30 +08:00 committed by GitHub
parent 18e950a829
commit 90ae1d5fb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 424 additions and 90 deletions

27
Cargo.lock generated
View File

@ -442,13 +442,14 @@ dependencies = [
"actix-web-flash-messages",
"anyhow",
"argon2",
"assert-json-diff",
"async-stream",
"bincode",
"bytes",
"chrono",
"collab",
"collab-client-ws",
"collab-persistence",
"collab-sync",
"collab-plugins",
"config",
"dashmap",
"derive_more",
@ -466,6 +467,7 @@ dependencies = [
"serde_json",
"snowflake",
"sqlx",
"tempfile",
"thiserror",
"token",
"tokio",
@ -536,6 +538,16 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "assert-json-diff"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "async-stream"
version = "0.3.5"
@ -812,6 +824,7 @@ dependencies = [
[[package]]
name = "collab"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"anyhow",
"bytes",
@ -828,8 +841,10 @@ dependencies = [
[[package]]
name = "collab-client-ws"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bytes",
"collab-sync",
"futures-util",
"serde",
"serde_json",
@ -844,6 +859,7 @@ dependencies = [
[[package]]
name = "collab-persistence"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bincode",
"chrono",
@ -863,11 +879,14 @@ dependencies = [
[[package]]
name = "collab-plugins"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"collab",
"collab-client-ws",
"collab-persistence",
"collab-sync",
"futures-util",
"tokio",
"tracing",
"y-sync",
"yrs",
@ -876,6 +895,7 @@ dependencies = [
[[package]]
name = "collab-sync"
version = "0.1.0"
source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af4941#af4941ba5394157869eca56d4c937dbec1f0a0e3"
dependencies = [
"bytes",
"collab",
@ -3621,14 +3641,13 @@ dependencies = [
"actix-web-actors",
"bytes",
"collab",
"collab-persistence",
"collab-plugins",
"collab-sync",
"dashmap",
"futures-util",
"parking_lot 0.12.1",
"secrecy",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-stream",

View File

@ -51,8 +51,6 @@ bytes = "1.4.0"
bincode = "1.3.3"
dashmap = "5.4"
rcgen = { version = "0.10.0", features = ["pem", "x509-parser"] }
collab-sync = {version = "0.1.0"}
collab-persistence = {version = "0.1.0"}
# tracing
tracing = { version = "0.1.37" }
@ -66,16 +64,19 @@ sqlx = { version = "0.6", default-features = false, features = ["runtime-actix-r
token = { path = "./crates/token" }
snowflake = { path = "./crates/snowflake" }
websocket = { path = "./crates/websocket" }
collab-plugins = { version = "0.1.0", features = ["sync", "disk_rocksdb"] }
[dev-dependencies]
once_cell = "1.7.2"
collab = { version = "0.1.0" }
collab-client-ws = { version = "0.1.0" }
tempfile = "3.4.0"
assert-json-diff = "2.0.2"
[[bin]]
name = "appflowy_server"
path = "src/main.rs"
[lib]
path = "src/lib.rs"
@ -87,14 +88,14 @@ members = [
]
[patch.crates-io]
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a12ed" }
collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a12ed" }
collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a12ed" }
collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a12ed" }
collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "4a12ed" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-client-ws = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-sync= { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af4941" }
#collab = { path = "./crates/AppFlowy-Collab/collab" }
#collab-client-ws = { path = "./crates/AppFlowy-Collab/collab-client-ws" }
#collab-sync = { path = "./crates/AppFlowy-Collab/collab-sync" }
#collab-persistence = { path = "./crates/AppFlowy-Collab/collab-persistence" }
#collab-sync = { path = "./crates/AppFlowy-Collab/collab-sync" }
#collab-plugins = { path = "./crates/AppFlowy-Collab/collab-plugins"}

View File

@ -9,6 +9,7 @@ edition = "2021"
actix = "0.13"
actix-web-actors = { version = "4.2.0" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0.30"
bytes = "1.0"
secrecy = { version = "0.8", features = ["serde"] }
@ -20,6 +21,4 @@ tokio = { version = "1.26", features = ["sync"] }
dashmap = "5.4.0"
collab = { version = "0.1.0"}
collab-sync = { version = "0.1.0"}
collab-persistence = { version = "0.1.0"}
collab-plugins = { version = "0.1.0", features = ["disk_rocksdb"]}
collab-plugins = { version = "0.1.0", features = ["disk_rocksdb", "sync"]}

View File

@ -0,0 +1,40 @@
use crate::error::WSError;
use futures_util::Sink;
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{Context, Poll};
pub struct UnboundedSenderSink<T>(pub tokio::sync::mpsc::UnboundedSender<T>);
impl<T> UnboundedSenderSink<T> {
pub fn new(tx: tokio::sync::mpsc::UnboundedSender<T>) -> Self {
Self(tx)
}
}
impl<T> Sink<T> for UnboundedSenderSink<T>
where
T: Send + Sync + 'static + Debug,
{
type Error = WSError;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// An unbounded channel can always accept messages without blocking, so we always return Ready.
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
let _ = self.0.send(item);
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// There is no buffering in an unbounded channel, so we always return Ready.
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// An unbounded channel is closed by dropping the sender, so we don't need to do anything here.
Poll::Ready(Ok(()))
}
}

View File

@ -1,5 +1,5 @@
use crate::entities::{ClientMessage, Connect, Disconnect, ServerMessage, WSUser};
use crate::error::WSError;
use crate::entities::{ClientMessage, Connect, Disconnect, ServerMessage, WSMessage, WSUser};
use crate::CollabServer;
use actix::{
fut, Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, ContextFutureSpawner, Handler,
@ -7,13 +7,12 @@ use actix::{
};
use actix_web_actors::ws;
use bytes::Bytes;
use std::ops::Deref;
use collab_sync::msg::CollabMessage;
use futures_util::Sink;
use collab_plugins::sync::msg::CollabMessage;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
@ -48,10 +47,13 @@ impl CollabSession {
});
}
fn send_to_server(&self, bytes: Bytes) {
match CollabMessage::from_vec(bytes.to_vec()) {
Ok(collab_msg) => {
fn forward_binary_to_ws_server(&self, bytes: Bytes) {
match WSMessage::from_vec(bytes.to_vec()) {
Ok(ws_message) => {
tracing::trace!("[WSClient]: forward message to server");
let collab_msg = CollabMessage::from_vec(&ws_message.payload).unwrap();
self.server.do_send(ClientMessage {
business_id: ws_message.business_id,
user: self.user.clone(),
collab_msg,
});
@ -83,7 +85,7 @@ impl Actor for CollabSession {
tracing::trace!("Send connect message to server success")
},
_ => {
tracing::error!("Send connect message to server failed");
tracing::error!("🔴Send connect message to server failed");
ctx.stop();
},
}
@ -103,8 +105,9 @@ impl Actor for CollabSession {
impl Handler<ServerMessage> for CollabSession {
type Result = ();
fn handle(&mut self, msg: ServerMessage, ctx: &mut Self::Context) {
ctx.binary(msg.collab_msg);
fn handle(&mut self, server_msg: ServerMessage, ctx: &mut Self::Context) {
tracing::trace!("[WSClient]: forward message to client");
ctx.binary(WSMessage::from(server_msg));
}
}
@ -129,7 +132,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CollabSession {
},
ws::Message::Text(_) => {},
ws::Message::Binary(bytes) => {
self.send_to_server(bytes);
self.forward_binary_to_ws_server(bytes);
},
ws::Message::Close(reason) => {
ctx.close(reason);
@ -145,24 +148,10 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CollabSession {
/// A helper struct that wraps the [Recipient] type to implement the [Sink] trait
pub struct ClientSink(pub Recipient<ServerMessage>);
impl Deref for ClientSink {
type Target = Recipient<ServerMessage>;
impl Sink<CollabMessage> for ClientSink {
type Error = WSError;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: CollabMessage) -> Result<(), Self::Error> {
self.0.do_send(ServerMessage { collab_msg: item });
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
fn deref(&self) -> &Self::Target {
&self.0
}
}

View File

@ -1,9 +1,10 @@
use crate::error::WSError;
use actix::{Message, Recipient};
use collab_sync::msg::CollabMessage;
use bytes::Bytes;
use collab_plugins::sync::msg::CollabMessage;
use secrecy::{ExposeSecret, Secret};
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@ -12,6 +13,12 @@ pub struct WSUser {
pub user_id: Secret<String>,
}
impl Display for WSUser {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str(self.user_id.expose_secret())
}
}
impl Hash for WSUser {
fn hash<H: Hasher>(&self, state: &mut H) {
let uid: &String = self.user_id.expose_secret();
@ -45,6 +52,7 @@ pub struct Disconnect {
#[derive(Debug, Message, Clone)]
#[rtype(result = "()")]
pub struct ClientMessage {
pub business_id: String,
pub user: Arc<WSUser>,
pub collab_msg: CollabMessage,
}
@ -52,5 +60,61 @@ pub struct ClientMessage {
#[derive(Debug, Message, Clone)]
#[rtype(result = "()")]
pub struct ServerMessage {
pub collab_msg: CollabMessage,
pub business_id: String,
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WSMessage {
pub business_id: String,
pub payload: Vec<u8>,
}
impl WSMessage {
pub fn from_vec(bytes: Vec<u8>) -> Result<Self, serde_json::Error> {
serde_json::from_slice(&bytes)
}
}
impl From<WSMessage> for Bytes {
fn from(msg: WSMessage) -> Self {
let bytes = serde_json::to_vec(&msg).unwrap_or_default();
Bytes::from(bytes)
}
}
impl From<ServerMessage> for WSMessage {
fn from(server_msg: ServerMessage) -> Self {
Self {
business_id: server_msg.business_id,
payload: server_msg.payload,
}
}
}
impl From<CollabMessage> for WSMessage {
fn from(msg: CollabMessage) -> Self {
Self {
business_id: msg.business_id().to_string(),
payload: msg.to_vec(),
}
}
}
impl From<CollabMessage> for ServerMessage {
fn from(msg: CollabMessage) -> Self {
Self {
business_id: msg.business_id().to_string(),
payload: msg.to_vec(),
}
}
}
impl From<ClientMessage> for WSMessage {
fn from(client_msg: ClientMessage) -> Self {
Self {
business_id: client_msg.business_id,
payload: client_msg.collab_msg.to_vec(),
}
}
}

View File

@ -1,7 +1,7 @@
#[derive(Debug, thiserror::Error)]
pub enum WSError {
#[error(transparent)]
Persistence(#[from] collab_persistence::error::PersistenceError),
Persistence(#[from] collab_plugins::disk::error::PersistenceError),
#[error("Internal failure: {0}")]
Internal(#[from] Box<dyn std::error::Error + Send + Sync>),

View File

@ -1,3 +1,4 @@
mod channel_ext;
mod client;
pub mod entities;
mod error;

View File

@ -1,26 +1,27 @@
use crate::entities::{ClientMessage, Connect, Disconnect, WSUser};
use crate::entities::{ClientMessage, Connect, Disconnect, ServerMessage, WSMessage, WSUser};
use crate::error::WSError;
use crate::ClientSink;
use crate::channel_ext::UnboundedSenderSink;
use actix::{Actor, Context, Handler, ResponseFuture};
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_persistence::kv::KVStore;
use collab_plugins::disk_plugin::rocksdb_server::RocksdbServerDiskPlugin;
use collab_sync::server::{
use collab_plugins::disk::keys::make_collab_id_key;
use collab_plugins::disk::kv::rocks_kv::RocksCollabDB;
use collab_plugins::disk::kv::KVStore;
use collab_plugins::disk::rocksdb_server::RocksdbServerDiskPlugin;
use collab_plugins::sync::msg::CollabMessage;
use collab_plugins::sync::server::{
CollabBroadcast, CollabGroup, CollabIDGen, CollabId, NonZeroNodeId, COLLAB_ID_LEN,
};
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use collab_persistence::keys::make_collab_id_key;
use collab_sync::msg::CollabMessage;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
#[derive(Clone)]
pub struct CollabServer {
@ -30,7 +31,7 @@ pub struct CollabServer {
/// Memory cache for fast lookup of collab_id from object_id
collab_id_by_object_id: Arc<DashMap<String, CollabId>>,
collab_groups: Arc<RwLock<HashMap<CollabId, CollabGroup>>>,
client_streams: Arc<RwLock<HashMap<Arc<WSUser>, ClientStream>>>,
client_streams: Arc<RwLock<HashMap<Arc<WSUser>, WSClientStream>>>,
}
impl CollabServer {
@ -46,6 +47,7 @@ impl CollabServer {
})
}
/// Create a new collab id for the object id.
fn create_collab_id(&self, object_id: &str) -> Result<CollabId, WSError> {
let collab_id = self.collab_id_gen.lock().next_id();
let collab_key = make_collab_id_key(object_id.as_ref());
@ -56,6 +58,8 @@ impl CollabServer {
Ok(collab_id)
}
/// Get the collab id for the object
/// If the object doesn't have a collab id, return None
fn get_collab_id(&self, object_id: &str) -> Option<CollabId> {
let collab_key = make_collab_id_key(object_id.as_ref());
let read_txn = self.db.read_txn();
@ -66,6 +70,7 @@ impl CollabServer {
Some(CollabId::from_be_bytes(bytes))
}
/// Get or create a collab id if the object doesn't have one
fn get_or_create_collab_id(&self, object_id: &str) -> Result<CollabId, WSError> {
let collab_id = self.get_collab_id(object_id);
if let Some(collab_id) = collab_id {
@ -81,6 +86,7 @@ impl CollabServer {
}
}
/// Create the collab group for the object if it doesn't exist
fn create_group_if_need(&self, collab_id: CollabId, object_id: &str) {
if self.collab_groups.read().contains_key(&collab_id) {
return;
@ -108,10 +114,17 @@ impl Actor for CollabServer {
impl Handler<Connect> for CollabServer {
type Result = Result<(), WSError>;
fn handle(&mut self, msg: Connect, _ctx: &mut Context<Self>) -> Self::Result {
let (stream_tx, rx) = tokio::sync::mpsc::channel(100);
let stream = ClientStream::new(ClientSink(msg.socket), ReceiverStream::new(rx), stream_tx);
self.client_streams.write().insert(msg.user, stream);
fn handle(&mut self, new_conn: Connect, _ctx: &mut Context<Self>) -> Self::Result {
tracing::trace!("[WSServer]: {} connect", new_conn.user);
// When receive a new connection, create a new [ClientStream] that holds the connection's websocket
let (stream_tx, stream_rx) = tokio::sync::mpsc::channel(1000);
let stream = WSClientStream::new(
ClientSink(new_conn.socket),
ReceiverStream::new(stream_rx),
stream_tx,
);
self.client_streams.write().insert(new_conn.user, stream);
Ok(())
}
}
@ -119,6 +132,7 @@ impl Handler<Connect> for CollabServer {
impl Handler<Disconnect> for CollabServer {
type Result = Result<(), WSError>;
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) -> Self::Result {
tracing::trace!("[WSServer]: {} disconnect", msg.user);
self.client_streams.write().remove(&msg.user);
Ok(())
}
@ -127,13 +141,16 @@ impl Handler<Disconnect> for CollabServer {
impl Handler<ClientMessage> for CollabServer {
type Result = ResponseFuture<()>;
fn handle(&mut self, msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {
let object_id = msg.collab_msg.object_id();
fn handle(&mut self, client_msg: ClientMessage, _ctx: &mut Context<Self>) -> Self::Result {
let object_id = client_msg.collab_msg.object_id();
// Get the collab_id for the object_id. If the object_id is not exist, create a new collab_id for it.
// Also create a new [CollabGroup] for the collab_id if it is not exist.
if let Ok(collab_id) = self.get_or_create_collab_id(object_id) {
if let Some(collab_group) = self.collab_groups.write().get_mut(&collab_id) {
if let Some(stream) = self.client_streams.write().get_mut(&msg.user) {
if let Some((sink, stream)) = stream.split() {
let origin = match msg.collab_msg.origin() {
if let Some(client_stream) = self.client_streams.write().get_mut(&client_msg.user) {
// If the client's stream is not subscribed to the collab group, subscribe it.
if let Some((sink, stream)) = client_stream.split() {
let origin = match client_msg.collab_msg.origin() {
None => CollabOrigin::Empty,
Some(client) => client.clone(),
};
@ -147,8 +164,19 @@ impl Handler<ClientMessage> for CollabServer {
let client_streams = self.client_streams.clone();
Box::pin(async move {
if let Some(client_stream) = client_streams.read().get(&msg.user) {
let _ = client_stream.stream_tx.send(Ok(msg.collab_msg)).await;
if let Some(client_stream) = client_streams.read().get(&client_msg.user) {
tracing::trace!(
"[WSServer]: receives client message: {:?}",
client_msg.collab_msg.msg_id()
);
match client_stream
.stream_tx
.send(Ok(WSMessage::from(client_msg)))
.await
{
Ok(_) => {},
Err(e) => tracing::trace!("send error: {:?}", e),
}
}
})
} else {
@ -163,17 +191,17 @@ impl actix::Supervised for CollabServer {
}
}
pub struct ClientStream {
pub struct WSClientStream {
sink: Option<ClientSink>,
stream: Option<ReceiverStream<Result<CollabMessage, WSError>>>,
stream_tx: Sender<Result<CollabMessage, WSError>>,
stream: Option<ReceiverStream<Result<WSMessage, WSError>>>,
stream_tx: Sender<Result<WSMessage, WSError>>,
}
impl ClientStream {
impl WSClientStream {
pub fn new(
sink: ClientSink,
stream: ReceiverStream<Result<CollabMessage, WSError>>,
stream_tx: Sender<Result<CollabMessage, WSError>>,
stream: ReceiverStream<Result<WSMessage, WSError>>,
stream_tx: Sender<Result<WSMessage, WSError>>,
) -> Self {
Self {
sink: Some(sink),
@ -182,9 +210,40 @@ impl ClientStream {
}
}
pub fn split(&mut self) -> Option<(ClientSink, ReceiverStream<Result<CollabMessage, WSError>>)> {
let sink = self.sink.take()?;
let stream = self.stream.take()?;
#[allow(clippy::type_complexity)]
pub fn split<T>(&mut self) -> Option<(UnboundedSenderSink<T>, ReceiverStream<Result<T, WSError>>)>
where
T: TryFrom<WSMessage, Error = WSError> + Into<ServerMessage> + Send + Sync + 'static,
{
let client_sink = self.sink.take()?;
let mut stream = self.stream.take()?;
// forward sink
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<T>();
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
client_sink.do_send(msg.into());
}
});
let sink = UnboundedSenderSink::<T>::new(tx);
// forward stream
let (tx, rx) = tokio::sync::mpsc::channel(100);
tokio::spawn(async move {
while let Some(Ok(msg)) = stream.next().await {
let _ = tx.send(T::try_from(msg)).await;
}
});
let stream = ReceiverStream::new(rx);
Some((sink, stream))
}
}
impl TryFrom<WSMessage> for CollabMessage {
type Error = WSError;
fn try_from(value: WSMessage) -> Result<Self, Self::Error> {
CollabMessage::from_vec(&value.payload).map_err(|e| WSError::Internal(Box::new(e)))
}
}

View File

@ -5,6 +5,7 @@ use actix_web::web::{Data, Path, Payload};
use actix_web::{get, web, HttpRequest, HttpResponse, Result, Scope};
use actix_web_actors::ws;
use secrecy::Secret;
use websocket::entities::WSUser;
use websocket::{CollabServer, CollabSession};
@ -20,12 +21,13 @@ pub async fn establish_ws_connection(
state: Data<State>,
server: Data<Addr<CollabServer>>,
) -> Result<HttpResponse> {
tracing::trace!("{:?}", request);
let user = LoggedUser::from_token(&state.config.application.server_key, token.as_str())?;
let client = CollabSession::new(user.into(), server.get_ref().clone());
match ws::start(client, &request, payload) {
Ok(response) => Ok(response),
Err(e) => {
tracing::error!("ws connection error: {:?}", e);
tracing::error!("🔴ws connection error: {:?}", e);
Err(e)
},
}

View File

@ -12,7 +12,7 @@ use actix_web::{dev::Server, web, web::Data, App, HttpServer};
use actix::Actor;
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::disk::kv::rocks_kv::RocksCollabDB;
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod};
use openssl::x509::X509;
use secrecy::{ExposeSecret, Secret};

View File

@ -1,8 +1,8 @@
use crate::component::auth::LoggedUser;
use crate::config::config::Config;
use chrono::{DateTime, Utc};
use collab_persistence::kv::rocks_kv::RocksCollabDB;
use collab_plugins::disk::kv::rocks_kv::RocksCollabDB;
use snowflake::Snowflake;
use sqlx::PgPool;
use std::collections::BTreeMap;

View File

@ -3,6 +3,7 @@ use tracing::subscriber::set_global_default;
use tracing::Subscriber;
use tracing_bunyan_formatter::{BunyanFormattingLayer, JsonStorageLayer};
use tracing_log::LogTracer;
use tracing_subscriber::fmt::MakeWriter;
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};
@ -15,11 +16,18 @@ pub fn get_subscriber<Sink>(
where
Sink: for<'a> MakeWriter<'a> + Send + Sync + 'static,
{
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(env_filter));
// let env_filter = EnvFilter::new(env_filter);
let env_filter = match EnvFilter::try_from_default_env() {
Ok(env_filter) => {
dbg!("Using default env filter");
env_filter
},
Err(_) => EnvFilter::new(env_filter),
};
let formatting_layer = BunyanFormattingLayer::new(name, sink);
tracing_subscriber::fmt()
.with_ansi(true)
.with_target(true)
.with_max_level(tracing::Level::TRACE)
.finish()
.with(env_filter)
.with(JsonStorageLayer)

View File

@ -2,9 +2,17 @@ use appflowy_server::application::{init_state, Application};
use appflowy_server::config::config::{get_configuration, DatabaseSetting};
use appflowy_server::state::State;
use appflowy_server::telemetry::{get_subscriber, init_subscriber};
use collab::core::collab::MutexCollab;
use collab::core::origin::CollabOrigin;
use collab_plugins::disk::keys::make_collab_id_key;
use collab_plugins::disk::rocksdb_server::RocksdbServerDiskPlugin;
use collab_plugins::sync::server::{CollabId, COLLAB_ID_LEN};
use once_cell::sync::Lazy;
use reqwest::Certificate;
use std::path::PathBuf;
use std::sync::Arc;
use appflowy_server::component::auth::{RegisterResponse, HEADER_TOKEN};
use sqlx::types::Uuid;
@ -12,11 +20,13 @@ use sqlx::{Connection, Executor, PgConnection, PgPool};
// Ensure that the `tracing` stack is only initialised once using `once_cell`
static TRACING: Lazy<()> = Lazy::new(|| {
let level = "trace".to_string();
let level = "debug".to_string();
let mut filters = vec![];
filters.push(format!("appflowy_server={}", level));
filters.push(format!("collab_client_ws={}", level));
filters.push(format!("hyper={}", level));
filters.push(format!("websocket={}", level));
filters.push(format!("collab_sync={}", level));
// filters.push(format!("hyper={}", level));
filters.push(format!("actix_web={}", level));
let subscriber_name = "test".to_string();
@ -89,6 +99,25 @@ impl TestServer {
.await
.expect("Change password failed")
}
pub fn get_doc(&self, object_id: &str) -> serde_json::Value {
let collab = MutexCollab::new(CollabOrigin::Empty, object_id, vec![]);
let collab_id = self.collab_id_from_object_id(object_id);
let plugin = RocksdbServerDiskPlugin::new(collab_id, self.state.rocksdb.clone()).unwrap();
collab.lock().add_plugin(Arc::new(plugin));
collab.initial();
let collab = collab.lock();
collab.to_json_value()
}
pub fn collab_id_from_object_id(&self, object_id: &str) -> CollabId {
let read_txn = self.state.rocksdb.read_txn();
let collab_key = make_collab_id_key(object_id.as_ref());
let value = read_txn.get(collab_key.as_ref()).unwrap().unwrap();
let mut bytes = [0; COLLAB_ID_LEN];
bytes[0..COLLAB_ID_LEN].copy_from_slice(value.as_ref());
CollabId::from_be_bytes(bytes)
}
}
pub async fn spawn_server() -> TestServer {

90
tests/ws/client.rs Normal file
View File

@ -0,0 +1,90 @@
use collab::core::collab::MutexCollab;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab_client_ws::{WSClient, WSMessageHandler};
use collab_plugins::disk::kv::rocks_kv::RocksCollabDB;
use collab_plugins::disk::rocksdb::RocksdbDiskPlugin;
use collab_plugins::sync::SyncPlugin;
use std::net::SocketAddr;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::TempDir;
pub async fn spawn_client(
uid: i64,
object_id: &str,
address: String,
) -> std::io::Result<TestClient> {
let ws_client = WSClient::new(address, 100);
let addr = ws_client.connect().await.unwrap().unwrap();
let origin = origin_from_tcp_stream(&addr);
let handler = ws_client.subscribe("collab".to_string()).await.unwrap();
//
let (sink, stream) = (handler.sink(), handler.stream());
let collab = Arc::new(MutexCollab::new(origin.clone(), object_id, vec![]));
let sync_plugin = SyncPlugin::new(origin, object_id, collab.clone(), sink, stream);
collab.lock().add_plugin(Arc::new(sync_plugin));
// disk
let tempdir = TempDir::new().unwrap();
let db_path = tempdir.into_path();
let db = Arc::new(RocksCollabDB::open(db_path.clone()).unwrap());
let disk_plugin = RocksdbDiskPlugin::new(uid, db.clone()).unwrap();
collab.lock().add_plugin(Arc::new(disk_plugin));
collab.initial();
let cleaner = Cleaner::new(db_path);
Ok(TestClient {
ws_client,
db,
collab,
cleaner,
handlers: vec![handler],
})
}
fn origin_from_tcp_stream(addr: &SocketAddr) -> CollabOrigin {
let origin = CollabClient::new(addr.port() as i64, &addr.to_string());
CollabOrigin::Client(origin)
}
pub struct TestClient {
#[allow(dead_code)]
ws_client: WSClient,
pub db: Arc<RocksCollabDB>,
pub collab: Arc<MutexCollab>,
#[allow(dead_code)]
cleaner: Cleaner,
#[allow(dead_code)]
handlers: Vec<Arc<WSMessageHandler>>,
}
struct Cleaner(PathBuf);
impl Cleaner {
fn new(dir: PathBuf) -> Self {
Cleaner(dir)
}
fn cleanup(dir: &PathBuf) {
let _ = std::fs::remove_dir_all(dir);
}
}
impl Drop for Cleaner {
fn drop(&mut self) {
Self::cleanup(&self.0)
}
}
impl Deref for TestClient {
type Target = Arc<MutexCollab>;
fn deref(&self) -> &Self::Target {
&self.collab
}
}

View File

@ -1 +1,2 @@
mod client;
mod test;

32
tests/ws/test.rs Normal file
View File

@ -0,0 +1,32 @@
use crate::util::{spawn_server, TestUser};
use crate::ws::client::spawn_client;
use serde_json::json;
use std::time::Duration;
#[actix_rt::test]
async fn ws_conn_test() {
let server = spawn_server().await;
let test_user = TestUser::generate();
let token = test_user.register(&server).await;
let address = format!("{}/{}", server.ws_addr, token);
let client = spawn_client(1, "1", address).await.unwrap();
wait_a_sec().await;
{
let collab = client.lock();
collab.insert("1", "a");
}
wait_a_sec().await;
let value = server.get_doc("1");
assert_json_diff::assert_json_eq!(
value,
json!({
"1": "a"
})
);
}
async fn wait_a_sec() {
tokio::time::sleep(Duration::from_secs(2)).await;
}