Created
September 4, 2024 20:54
-
-
Save matthewjberger/0e94c0c6e3a42a8a33b75fc9c8fe95cc to your computer and use it in GitHub Desktop.
In-process pub/sub broker messaging that needs no async executor and uses only `Send + Sync`-compatible types
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{
    collections::{HashMap, VecDeque},
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak},
};
use uuid::Uuid;
pub type ClientHandle<T> = Arc<RwLock<Client<T>>>; | |
pub struct Client<T: Clone> { | |
id: Uuid, | |
event_queue: RwLock<VecDeque<T>>, | |
ring_buffer_size: usize, | |
} | |
impl<T: Clone> Default for Client<T> { | |
fn default() -> Self { | |
Self { | |
id: Uuid::new_v4(), | |
event_queue: RwLock::new(VecDeque::new()), | |
ring_buffer_size: 100, | |
} | |
} | |
} | |
impl<T: Clone> Client<T> { | |
pub fn new() -> Arc<RwLock<Self>> { | |
Arc::new(RwLock::new(Self::default())) | |
} | |
pub fn event_queue(&self) -> Option<RwLockReadGuard<VecDeque<T>>> { | |
self.event_queue.read().ok() | |
} | |
pub fn event_queue_mut(&self) -> Option<RwLockWriteGuard<VecDeque<T>>> { | |
self.event_queue.write().ok() | |
} | |
pub fn with_ring_buffer_size(size: usize) -> Arc<RwLock<Self>> { | |
Arc::new(RwLock::new(Self { | |
ring_buffer_size: size, | |
..Default::default() | |
})) | |
} | |
pub fn id(&self) -> Uuid { | |
self.id | |
} | |
pub fn ring_buffer_size(&self) -> usize { | |
self.ring_buffer_size | |
} | |
pub fn next_message(&self) -> Option<T> { | |
if let Ok(mut queue) = self.event_queue.write() { | |
queue.pop_front() | |
} else { | |
None | |
} | |
} | |
pub fn peek_message(&self) -> Option<T> { | |
if let Ok(queue) = self.event_queue.read() { | |
queue.front().cloned() | |
} else { | |
None | |
} | |
} | |
} | |
#[derive(Default)] | |
pub struct Broker<T: Clone> { | |
subscribers: HashMap<String, Vec<Weak<RwLock<Client<T>>>>>, | |
} | |
impl<T: Clone> Broker<T> { | |
pub fn new() -> Self { | |
Self { | |
subscribers: HashMap::new(), | |
} | |
} | |
pub fn subscribe(&mut self, topic: &str, client: &Arc<RwLock<Client<T>>>) { | |
let client_weak = Arc::downgrade(client); | |
self.subscribers | |
.entry(topic.to_string()) | |
.or_default() | |
.push(client_weak); | |
} | |
pub fn unsubscribe(&mut self, topic: &str, client_id: Uuid) -> Result<(), &'static str> { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
subscribers.retain(|subscriber| { | |
if let Some(handle) = subscriber.upgrade() { | |
if let Ok(subscriber) = handle.read() { | |
subscriber.id() != client_id | |
} else { | |
false | |
} | |
} else { | |
false | |
} | |
}); | |
Ok(()) | |
} else { | |
Err("TopicNotFound") | |
} | |
} | |
pub fn publish(&mut self, topic: &str, message: T) { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
// Use retain to filter out the expired weak references | |
subscribers.retain(|subscriber_weak| { | |
if let Some(subscriber_strong) = subscriber_weak.upgrade() { | |
// Attempt to acquire a write lock on the subscriber | |
if let Ok(subscriber) = subscriber_strong.write() { | |
let ring_buffer_size = subscriber.ring_buffer_size(); | |
let event_queue_len = subscriber.event_queue().map_or(0, |q| q.len()); | |
// Ensure there's space in the ring buffer | |
if event_queue_len == ring_buffer_size { | |
if let Some(mut event_queue) = subscriber.event_queue_mut() { | |
event_queue.pop_front(); | |
} | |
} | |
// Push the message into the event queue | |
if let Some(mut event_queue) = subscriber.event_queue_mut() { | |
event_queue.push_back(message.clone()); | |
} | |
true | |
} else { | |
false // Lock acquisition failed, drop the weak reference | |
} | |
} else { | |
false // Drop the weak reference if it's no longer valid | |
} | |
}); | |
// Remove the topic entry if there are no subscribers left | |
if subscribers.is_empty() { | |
self.subscribers.remove(topic); | |
} | |
} | |
} | |
} | |
/// Unit tests covering delivery, unsubscription, ring-buffer eviction and
/// cleanup of dropped subscribers.
#[cfg(test)]
mod tests {
    use super::{Broker, Client};

    /// Minimal cloneable payload used as the message type in the tests.
    #[derive(Debug, Clone, PartialEq)]
    pub struct Message {
        content: String,
    }

    impl Message {
        /// Builds a message holding an owned copy of `content`.
        pub fn new(content: &str) -> Self {
            Self {
                content: content.to_string(),
            }
        }
    }

    /// A single subscriber receives what is published to its topic.
    #[test]
    fn test_single_client_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(
            client1.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    /// Every subscriber of a topic receives its own copy of the message.
    #[test]
    fn test_multiple_subscribers_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(
            client1.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
        assert_eq!(
            client2.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    /// An unsubscribed client stops receiving; the others are unaffected.
    #[test]
    fn test_unsubscribe() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);
        broker
            .unsubscribe("topic1", client1.read().unwrap().id())
            .unwrap();
        broker.publish("topic1", Message::new("hello world"));
        assert_eq!(client1.read().unwrap().next_message(), None);
        assert_eq!(
            client2.read().unwrap().next_message().unwrap().content,
            "hello world"
        );
    }

    /// One client subscribed to two topics receives both messages in
    /// publication order.
    #[test]
    fn test_multiple_topics() {
        let mut broker = Broker::new();
        let client = Client::new();
        broker.subscribe("topic1", &client);
        broker.subscribe("topic2", &client);
        broker.publish("topic1", Message::new("hello topic1"));
        broker.publish("topic2", Message::new("hello topic2"));
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "hello topic1"
        );
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "hello topic2"
        );
    }

    /// Once the ring buffer is full, the oldest message is discarded.
    #[test]
    fn test_ring_buffer() {
        let mut broker = Broker::new();
        let client = Client::with_ring_buffer_size(2); // set ring buffer size to 2
        broker.subscribe("topic1", &client);
        broker.publish("topic1", Message::new("message1"));
        broker.publish("topic1", Message::new("message2"));
        broker.publish("topic1", Message::new("message3"));
        // Expecting the oldest message to be discarded due to ring buffer
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "message2"
        );
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "message3"
        );
    }

    /// End-to-end walkthrough of the intended usage.
    #[test]
    fn usage_example() {
        // Create a new broker
        let mut broker = Broker::new();
        // Create a client with a specified ring buffer size
        let client = Client::with_ring_buffer_size(5);
        // Subscribe the client to a topic
        broker.subscribe("news", &client);
        // The broker publishes a message to the topic
        broker.publish("news", Message::new("Breaking news!"));
        // The client retrieves the message from its ring buffer
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "Breaking news!"
        );
    }

    /// A dropped client is pruned on the next publish, and the then-empty
    /// topic is removed with it.
    #[test]
    fn test_weak_reference_cleanup() {
        let mut broker = Broker::new();
        // Subscribe a client to a topic
        {
            let client = Client::new();
            broker.subscribe("topic1", &client);
            // Ensure there's a subscriber for "topic1"
            assert!(broker.subscribers.contains_key("topic1"));
            // Simulating a message publish
            broker.publish("topic1", Message::new("Test"));
            // Ensure the client received the message
            assert_eq!(
                client.read().unwrap().next_message().unwrap().content,
                "Test"
            );
        } // Client goes out of scope here and should be dropped

        // The weak reference to the client should not be upgradeable now
        if let Some(subscribers) = broker.subscribers.get("topic1") {
            assert!(subscribers[0].upgrade().is_none());
        } else {
            panic!("Topic 'topic1' should still exist at this point.");
        }
        // Simulating another publish to trigger the weak reference cleanup
        broker.publish("topic1", Message::new("Test 2"));
        // Check if weak reference cleanup worked by checking the subscribers for "topic1"
        assert!(!broker.subscribers.contains_key("topic1"));
    }

    /// Peeking returns the front message without consuming it.
    #[test]
    fn test_peek_message() {
        let mut broker = Broker::new();
        let client = Client::new();
        broker.subscribe("topic1", &client);
        broker.publish("topic1", Message::new("peek this"));
        // Peek the message
        assert_eq!(
            client.read().unwrap().peek_message().unwrap().content,
            "peek this"
        );
        // Check if the message is still in the queue
        assert_eq!(
            client.read().unwrap().next_message().unwrap().content,
            "peek this"
        );
        // Ensure the message queue is now empty after calling `next_message`
        assert!(client.read().unwrap().next_message().is_none());
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment