use crate::error::StreamError; use async_trait::async_trait; use redis::aio::ConnectionManager; use redis::Value; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::log; const RELEASE_SCRIPT: &str = r#" if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end "#; pub struct LeaseAcquisition { conn: Option, stream_key: String, token: u128, } impl LeaseAcquisition { pub async fn release(&mut self) -> Result { if let Some(conn) = self.conn.take() { Self::release_internal(conn, &self.stream_key, self.token).await } else { Ok(false) } } async fn release_internal>( mut conn: ConnectionManager, stream_key: S, token: u128, ) -> Result { let script = redis::Script::new(RELEASE_SCRIPT); let result: i32 = script .key(stream_key.as_ref()) .arg(token.to_le_bytes().as_slice()) .invoke_async(&mut conn) .await?; Ok(result == 1) } } impl Drop for LeaseAcquisition { fn drop(&mut self) { if let Some(conn) = self.conn.take() { let stream_key = self.stream_key.clone(); let token = self.token; tokio::spawn(async move { if let Err(err) = Self::release_internal(conn, stream_key, token).await { log::error!("error while releasing lease (drop): {}", err); } }); } } } /// This is Redlock algorithm implementation. /// See: https://redis.io/docs/latest/commands/set#patterns #[async_trait] pub trait Lease { /// Attempt to acquire lease on a stream for a given time-to-live. /// Returns `None` if the lease could not be acquired. async fn lease( &self, stream_key: String, ttl: Duration, ) -> Result, StreamError>; } #[async_trait] impl Lease for ConnectionManager { async fn lease( &self, stream_key: String, ttl: Duration, ) -> Result, StreamError> { let mut conn = self.clone(); let ttl = ttl.as_millis() as u64; let token = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis(); tracing::trace!("acquiring lease `{}` for {}ms", stream_key, ttl); let result: Value = redis::cmd("SET") .arg(&stream_key) .arg(token.to_le_bytes().as_slice()) .arg("NX") .arg("PX") .arg(ttl) .query_async(&mut conn) .await?; match result { Value::Okay => Ok(Some(LeaseAcquisition { conn: Some(conn), stream_key, token, })), o => { tracing::trace!("lease locked: {:?}", o); Ok(None) }, } } } #[cfg(test)] mod test { use crate::lease::Lease; use redis::Client; #[tokio::test] async fn lease_acquisition() { let redis_client = Client::open("redis://localhost:6379").unwrap(); let conn = redis_client.get_connection_manager().await.unwrap(); let l1 = conn .lease("stream1".into(), std::time::Duration::from_secs(1)) .await .unwrap(); assert!(l1.is_some(), "should successfully acquire lease"); let l2 = conn .lease("stream1".into(), std::time::Duration::from_secs(1)) .await .unwrap(); assert!(l2.is_none(), "should fail to acquire lease"); l1.unwrap().release().await.unwrap(); let l3 = conn .lease("stream1".into(), std::time::Duration::from_secs(1)) .await .unwrap(); assert!( l3.is_some(), "should successfully acquire lease after it was released" ); } }