chore: merge with main
This commit is contained in:
commit
7c49359278
|
|
@ -1,4 +1,4 @@
|
|||
use anyhow::{anyhow, Error};
|
||||
use anyhow::anyhow;
|
||||
use collab::core::awareness;
|
||||
use std::future::Future;
|
||||
use std::iter::Take;
|
||||
|
|
@ -130,19 +130,18 @@ impl CollabBroadcast {
|
|||
pub fn subscribe<Sink, Stream, E>(
|
||||
&self,
|
||||
subscriber_origin: CollabOrigin,
|
||||
sink: Sink,
|
||||
mut sink: Sink,
|
||||
mut stream: Stream,
|
||||
modified_at: Arc<Mutex<Instant>>,
|
||||
) -> Subscription
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Send + Sync + Unpin + 'static,
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
|
||||
Stream: StreamExt<Item = Result<CollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
<Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
|
||||
E: Into<anyhow::Error> + Send + Sync + 'static,
|
||||
{
|
||||
let cloned_origin = subscriber_origin.clone();
|
||||
trace!("[realtime]: new subscriber: {}", subscriber_origin);
|
||||
let sink = Arc::new(Mutex::new(sink));
|
||||
// Receive a update from the document observer and forward the update to all
|
||||
// connected subscribers using its Sink.
|
||||
let sink_stop_tx = {
|
||||
|
|
@ -199,7 +198,7 @@ impl CollabBroadcast {
|
|||
match result {
|
||||
Some(Ok(collab_msg)) => {
|
||||
if object_id == collab_msg.object_id() && collab_msg.payload().is_some() {
|
||||
handle_user_collab_message(&object_id, &sink, &collab_msg, &collab).await;
|
||||
handle_client_collab_message(&object_id, &mut sink, &collab_msg, &collab).await;
|
||||
if let Ok(mut modified_at) = modified_at.try_lock() {
|
||||
*modified_at = Instant::now();
|
||||
}
|
||||
|
|
@ -226,9 +225,10 @@ impl CollabBroadcast {
|
|||
}
|
||||
}
|
||||
|
||||
async fn handle_user_collab_message<Sink>(
|
||||
/// Handle the message sent from the client
|
||||
async fn handle_client_collab_message<Sink>(
|
||||
object_id: &str,
|
||||
sink: &Arc<Mutex<Sink>>,
|
||||
sink: &mut Sink,
|
||||
collab_msg: &CollabMessage,
|
||||
collab: &MutexCollab,
|
||||
) where
|
||||
|
|
@ -247,13 +247,11 @@ async fn handle_user_collab_message<Sink>(
|
|||
Ok(msg) => {
|
||||
let cloned_collab = collab.clone();
|
||||
let cloned_origin = origin.clone();
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg)
|
||||
})
|
||||
.await;
|
||||
let result =
|
||||
handle_collab_message(&cloned_origin, &ServerSyncProtocol, &cloned_collab, msg);
|
||||
|
||||
match result {
|
||||
Ok(Ok(payload)) => match origin.as_ref() {
|
||||
Ok(payload) => match origin.as_ref() {
|
||||
None => warn!("Client message does not have a origin"),
|
||||
Some(origin) => {
|
||||
if let Some(msg_id) = collab_msg.msg_id() {
|
||||
|
|
@ -266,22 +264,14 @@ async fn handle_user_collab_message<Sink>(
|
|||
);
|
||||
|
||||
trace!("Send response to client: {}", resp);
|
||||
match sink.try_lock() {
|
||||
Ok(mut sink) => {
|
||||
if let Err(err) = sink.send(resp.into()).await {
|
||||
trace!("fail to send response to client: {}", err);
|
||||
}
|
||||
},
|
||||
Err(err) => error!("Requires sink lock failed: {:?}", err),
|
||||
if let Err(err) = sink.send(resp.into()).await {
|
||||
trace!("fail to send response to client: {}", err);
|
||||
}
|
||||
}
|
||||
},
|
||||
},
|
||||
Ok(Err(err)) => {
|
||||
error!("object id:{} =>{}", object_id, err);
|
||||
},
|
||||
Err(err) => {
|
||||
error!("internal error when handle user ws message: {}", err);
|
||||
error!("object id:{} =>{}", object_id, err);
|
||||
},
|
||||
}
|
||||
},
|
||||
|
|
@ -356,14 +346,14 @@ fn gen_awareness_update_message(
|
|||
Ok(update)
|
||||
}
|
||||
|
||||
pub struct SinkCollabMessageAction<'a, Sink> {
|
||||
pub sink: &'a Arc<tokio::sync::Mutex<Sink>>,
|
||||
pub struct SinkCollabMessageAction<'a, Sink: Clone> {
|
||||
pub sink: &'a Sink,
|
||||
pub message: CollabMessage,
|
||||
}
|
||||
|
||||
impl<'a, Sink> SinkCollabMessageAction<'a, Sink>
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Send + Sync + Unpin + 'a,
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'a,
|
||||
{
|
||||
pub fn run(self) -> Retry<Take<FixedInterval>, SinkCollabMessageAction<'a, Sink>> {
|
||||
let retry_strategy = FixedInterval::new(Duration::from_secs(2)).take(5);
|
||||
|
|
@ -373,19 +363,16 @@ where
|
|||
|
||||
impl<'a, Sink> Action for SinkCollabMessageAction<'a, Sink>
|
||||
where
|
||||
Sink: SinkExt<CollabMessage> + Send + Sync + Unpin + 'a,
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'a,
|
||||
{
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync + 'a>>;
|
||||
type Item = ();
|
||||
type Error = RealtimeError;
|
||||
|
||||
fn run(&mut self) -> Self::Future {
|
||||
let sink = self.sink.clone();
|
||||
let mut sink = self.sink.clone();
|
||||
let message = self.message.clone();
|
||||
Box::pin(async move {
|
||||
let mut sink = sink
|
||||
.try_lock()
|
||||
.map_err(|err| RealtimeError::Internal(Error::from(err)))?;
|
||||
sink
|
||||
.send(message)
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -228,7 +228,7 @@ where
|
|||
sink: Sink,
|
||||
stream: Stream,
|
||||
) where
|
||||
Sink: SinkExt<CollabMessage> + Send + Sync + Unpin + 'static,
|
||||
Sink: SinkExt<CollabMessage> + Clone + Send + Sync + Unpin + 'static,
|
||||
Stream: StreamExt<Item = Result<CollabMessage, E>> + Send + Sync + Unpin + 'static,
|
||||
<Sink as futures_util::Sink<CollabMessage>>::Error: std::error::Error + Send + Sync,
|
||||
E: Into<Error> + Send + Sync + 'static,
|
||||
|
|
|
|||
|
|
@ -46,8 +46,8 @@ impl RealtimeMetrics {
|
|||
}
|
||||
|
||||
pub fn record_mem_cache_usage(&self, size_in_bytes: usize) {
|
||||
let size_in_mb = size_in_bytes / (1024 * 1024);
|
||||
trace!("[metrics]: mem_cache_usage: {} MB", size_in_mb);
|
||||
let size_in_mb = size_in_bytes / 1024;
|
||||
trace!("[metrics]: mem_cache_usage: {} KB", size_in_mb);
|
||||
self.mem_cache_usage.set(size_in_mb as i64);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ use std::fmt::Debug;
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UnboundedSenderSink<T>(pub tokio::sync::mpsc::UnboundedSender<T>);
|
||||
|
||||
impl<T> UnboundedSenderSink<T> {
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ use anyhow::anyhow;
|
|||
use dashmap::DashMap;
|
||||
use sqlx::PgPool;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// Manages access control.
|
||||
|
|
@ -32,6 +32,7 @@ use tokio::sync::broadcast;
|
|||
#[derive(Clone)]
|
||||
pub struct AccessControl {
|
||||
enforcer: Arc<AFEnforcer>,
|
||||
#[allow(dead_code)]
|
||||
access_control_metrics: Arc<AccessControlMetrics>,
|
||||
}
|
||||
|
||||
|
|
@ -97,12 +98,7 @@ impl AccessControl {
|
|||
where
|
||||
A: ToCasbinAction,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let result = self.enforcer.enforce(uid, obj, act).await;
|
||||
self
|
||||
.access_control_metrics
|
||||
.record_enforce_duration(start.elapsed().as_millis() as u64);
|
||||
result
|
||||
self.enforcer.enforce(uid, obj, act).await
|
||||
}
|
||||
|
||||
pub async fn get_access_level(&self, uid: &i64, oid: &str) -> Option<AFAccessLevel> {
|
||||
|
|
@ -124,6 +120,38 @@ impl AccessControl {
|
|||
}
|
||||
}
|
||||
|
||||
/// policy in db:
|
||||
/// p = 1, 123, 1 (1 mean AFRole::Owner)
|
||||
/// p = 1, 456, 50 (50 mean AFAccessLevel::FullAccess)
|
||||
///
|
||||
/// role_definition in db:
|
||||
/// g = _, _
|
||||
/// af role:
|
||||
/// ["1", "delete"], ["1", "write"], ["1", "read"],
|
||||
/// ["2", "write"], ["2", "read"],
|
||||
/// ["3", "read"],
|
||||
/// af access level:
|
||||
/// ["10", "read"],
|
||||
/// ["20", "read"],
|
||||
/// ["30", "read"], ["30", "write"],
|
||||
/// ["50", "read"], ["50", "write"], ["50", "delete"]
|
||||
///
|
||||
/// matchers:
|
||||
/// r.sub == p.sub && p.obj == r.obj && g(p.act, r.act)
|
||||
///
|
||||
/// Example:
|
||||
/// request:
|
||||
/// 1. api/workspace/123, user=1, workspace_id=123 GET
|
||||
/// r = sub = 1, obj = 123, act =read
|
||||
/// p = sub = 1, obj = 123, act = 1
|
||||
///
|
||||
/// Evaluation:
|
||||
/// 1. Subject Match: r.sub == p.sub
|
||||
/// 2. Object Match: p.obj == r.obj
|
||||
/// 3. Action Permission: g(p.act, r.act) => g(1, read) => ["1", "read"]
|
||||
/// Result:
|
||||
/// Allow
|
||||
///
|
||||
pub const MODEL_CONF: &str = r###"
|
||||
[request_definition]
|
||||
r = sub, obj, act
|
||||
|
|
@ -133,13 +161,12 @@ p = sub, obj, act
|
|||
|
||||
[role_definition]
|
||||
g = _, _ # rule for action
|
||||
g2 = _, _ # rule for collab object id
|
||||
|
||||
[policy_effect]
|
||||
e = some(where (p.eft == allow))
|
||||
|
||||
[matchers]
|
||||
m = r.sub == p.sub && g2(p.obj, r.obj) && g(p.act, r.act)
|
||||
m = r.sub == p.sub && p.obj == r.obj && g(p.act, r.act)
|
||||
"###;
|
||||
|
||||
/// Represents the entity stored at the index of the access control policy.
|
||||
|
|
|
|||
|
|
@ -134,7 +134,6 @@ impl Adapter for PgAdapter {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Grouping definition `g` of type `g`. See `model.conf`
|
||||
model.add_policies("g", "g", grouping_policies);
|
||||
self
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use crate::biz::casbin::access_control::AccessControl;
|
||||
use crate::biz::casbin::access_control::{AccessControl, Action};
|
||||
use crate::biz::casbin::access_control::{ActionType, ObjectType};
|
||||
use actix_http::Method;
|
||||
use app_error::AppError;
|
||||
|
|
@ -69,31 +69,28 @@ impl CollabAccessControl for CollabAccessControlImpl {
|
|||
|
||||
async fn can_access_http_method(
|
||||
&self,
|
||||
_uid: &i64,
|
||||
_oid: &str,
|
||||
_method: &Method,
|
||||
uid: &i64,
|
||||
oid: &str,
|
||||
method: &Method,
|
||||
) -> Result<bool, AppError> {
|
||||
Ok(true)
|
||||
// let action = Action::from(method);
|
||||
// self
|
||||
// .access_control
|
||||
// .enforce(uid, &ObjectType::Collab(oid), action)
|
||||
// .await
|
||||
let action = Action::from(method);
|
||||
self
|
||||
.access_control
|
||||
.enforce(uid, &ObjectType::Collab(oid), action)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn can_send_collab_update(&self, _uid: &i64, _oid: &str) -> Result<bool, AppError> {
|
||||
Ok(true)
|
||||
// self
|
||||
// .access_control
|
||||
// .enforce(uid, &ObjectType::Collab(oid), Action::Write)
|
||||
// .await
|
||||
async fn can_send_collab_update(&self, uid: &i64, oid: &str) -> Result<bool, AppError> {
|
||||
self
|
||||
.access_control
|
||||
.enforce(uid, &ObjectType::Collab(oid), Action::Write)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn can_receive_collab_update(&self, _uid: &i64, _oid: &str) -> Result<bool, AppError> {
|
||||
Ok(true)
|
||||
// self
|
||||
// .access_control
|
||||
// .enforce(uid, &ObjectType::Collab(oid), Action::Read)
|
||||
// .await
|
||||
async fn can_receive_collab_update(&self, uid: &i64, oid: &str) -> Result<bool, AppError> {
|
||||
self
|
||||
.access_control
|
||||
.enforce(uid, &ObjectType::Collab(oid), Action::Read)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,6 +160,7 @@ impl AFEnforcer {
|
|||
.get_filtered_policy(POLICY_FIELD_INDEX_OBJECT, vec![obj.to_object_id()]);
|
||||
|
||||
if policies_for_object.is_empty() {
|
||||
self.enforcer_result_cache.insert(policy_key, true);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -240,7 +240,7 @@ where
|
|||
Some(encoded_collab) => {
|
||||
event!(
|
||||
tracing::Level::DEBUG,
|
||||
"Get encoded collab:{} from redis",
|
||||
"Get encoded collab:{} from cache",
|
||||
params.object_id
|
||||
);
|
||||
Ok(encoded_collab)
|
||||
|
|
|
|||
|
|
@ -120,7 +120,9 @@ async fn test_collab_access_control_when_obj_not_exist(pool: PgPool) -> anyhow::
|
|||
let user = create_user(&pool).await?;
|
||||
|
||||
for method in [Method::GET, Method::POST, Method::PUT, Method::DELETE] {
|
||||
assert_can_access_http_method(&collab_access_control, &user.uid, "fake_id", method, true).await;
|
||||
assert_can_access_http_method(&collab_access_control, &user.uid, "fake_id", method, true)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -160,7 +162,8 @@ async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::
|
|||
method,
|
||||
true,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
assert!(
|
||||
|
|
@ -178,7 +181,8 @@ async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::
|
|||
Method::GET,
|
||||
true,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// guest should not have write access
|
||||
assert_can_access_http_method(
|
||||
|
|
@ -188,7 +192,8 @@ async fn test_collab_access_control_access_http_method(pool: PgPool) -> anyhow::
|
|||
Method::POST,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(
|
||||
!collab_access_control
|
||||
|
|
@ -242,7 +247,6 @@ async fn test_collab_access_control_send_receive_collab_update(pool: PgPool) ->
|
|||
.await;
|
||||
|
||||
// Need to wait for the listener(spawn_listen_on_workspace_member_change) to receive the event
|
||||
//
|
||||
sleep(Duration::from_secs(2)).await;
|
||||
|
||||
assert!(
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use actix_http::Method;
|
||||
use anyhow::Context;
|
||||
use anyhow::{Context, Error};
|
||||
use app_error::ErrorCode;
|
||||
use appflowy_cloud::biz;
|
||||
use appflowy_cloud::biz::casbin::{CollabAccessControlImpl, WorkspaceAccessControlImpl};
|
||||
|
|
@ -276,7 +276,7 @@ pub async fn assert_can_access_http_method(
|
|||
object_id: &str,
|
||||
method: Method,
|
||||
expected: bool,
|
||||
) {
|
||||
) -> Result<(), Error> {
|
||||
let timeout_duration = Duration::from_secs(10);
|
||||
let retry_interval = Duration::from_millis(300);
|
||||
let mut retries = 0usize;
|
||||
|
|
@ -307,9 +307,8 @@ pub async fn assert_can_access_http_method(
|
|||
}
|
||||
};
|
||||
|
||||
timeout(timeout_duration, operation)
|
||||
.await
|
||||
.expect("Operation timed out");
|
||||
timeout(timeout_duration, operation).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_workspace_members_in_tx(
|
||||
|
|
|
|||
|
|
@ -434,6 +434,9 @@ async fn multiple_user_with_read_and_write_permission_edit_same_collab_test() {
|
|||
expected_json.insert(index.to_string(), s);
|
||||
}
|
||||
|
||||
// wait 5 seconds to make sure all the server broadcast the updates to all the clients
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
|
||||
// all the clients should have the same collab object
|
||||
assert_json_include!(
|
||||
actual: json!(expected_json),
|
||||
|
|
|
|||
|
|
@ -467,7 +467,7 @@ async fn post_realtime_message_test() {
|
|||
let task = tokio::spawn(async move {
|
||||
let mut new_user = TestClient::new_user().await;
|
||||
// sleep 2 secs to make sure it do not trigger register user too fast in gotrue
|
||||
sleep(Duration::from_secs(i % 3)).await;
|
||||
sleep(Duration::from_secs(i % 5)).await;
|
||||
|
||||
let object_id = Uuid::new_v4().to_string();
|
||||
let workspace_id = new_user.workspace_id().await;
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ async fn sign_up_oauth_not_available() {
|
|||
#[tokio::test]
|
||||
async fn concurrent_user_sign_up_test() {
|
||||
let mut tasks = Vec::new();
|
||||
for _i in 0..50 {
|
||||
for _i in 0..30 {
|
||||
let task = tokio::spawn(async move {
|
||||
let _ = TestClient::new_user().await;
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
|
|
|
|||
Loading…
Reference in New Issue