Last active
September 7, 2023 16:04
-
-
Save matthewjberger/8cbe725cd9ce12160daa485b8293efef to your computer and use it in GitHub Desktop.
An in-process pub/sub broker in Rust that is compatible with Wasm - https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e4020497380705bc565b2871972c8138
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{ | |
cell::RefCell, | |
collections::{HashMap, VecDeque}, | |
rc::{Rc, Weak}, | |
}; | |
use uuid::Uuid; | |
pub type ClientHandle<T> = Rc<RefCell<Client<T>>>; | |
pub struct Client<T> { | |
id: Uuid, | |
event_queue: RefCell<VecDeque<T>>, | |
ring_buffer_size: usize, | |
} | |
impl<T> Default for Client<T> { | |
fn default() -> Self { | |
Self { | |
id: Uuid::new_v4(), | |
event_queue: RefCell::new(VecDeque::new()), | |
ring_buffer_size: 100, | |
} | |
} | |
} | |
impl<T> Client<T> { | |
pub fn new() -> Rc<RefCell<Self>> { | |
Rc::new(RefCell::new(Self::default())) | |
} | |
pub fn with_ring_buffer_size(size: usize) -> Rc<RefCell<Self>> { | |
Rc::new(RefCell::new(Self { | |
ring_buffer_size: size, | |
..Default::default() | |
})) | |
} | |
pub fn id(&self) -> Uuid { | |
self.id | |
} | |
pub fn next_message(&self) -> Option<T> { | |
self.event_queue.borrow_mut().pop_front() | |
} | |
} | |
#[derive(Default)] | |
pub struct Broker<T: Clone> { | |
subscribers: HashMap<String, Vec<Weak<RefCell<Client<T>>>>>, | |
} | |
impl<T: Clone> Broker<T> { | |
pub fn new() -> Self { | |
Self { | |
subscribers: HashMap::new(), | |
} | |
} | |
pub fn subscribe(&mut self, topic: &str, client: &Rc<RefCell<Client<T>>>) { | |
let client_weak = Rc::downgrade(client); | |
self.subscribers | |
.entry(topic.to_string()) | |
.or_default() | |
.push(client_weak); | |
} | |
pub fn unsubscribe(&mut self, topic: &str, client_id: Uuid) -> Result<(), &'static str> { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
subscribers.retain(|subscriber| { | |
if let Some(subscriber) = subscriber.upgrade() { | |
subscriber.borrow().id() != client_id | |
} else { | |
false | |
} | |
}); | |
Ok(()) | |
} else { | |
Err("TopicNotFound") | |
} | |
} | |
pub fn publish(&mut self, topic: &str, message: T) { | |
if let Some(subscribers) = self.subscribers.get_mut(topic) { | |
// Use retain to filter out the expired weak references | |
subscribers.retain(|subscriber_weak| { | |
if let Some(subscriber_strong) = subscriber_weak.upgrade() { | |
let subscriber = subscriber_strong.borrow_mut(); | |
// Access VecDeque methods by borrowing the RefCell | |
if subscriber.event_queue.borrow().len() == subscriber.ring_buffer_size { | |
subscriber.event_queue.borrow_mut().pop_front(); | |
} | |
subscriber | |
.event_queue | |
.borrow_mut() | |
.push_back(message.clone()); | |
true | |
} else { | |
false // Drop the weak reference if it's no longer valid | |
} | |
}); | |
// Remove the topic entry if there are no subscribers left | |
if subscribers.is_empty() { | |
self.subscribers.remove(topic); | |
} | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::{Broker, Client};
    use std::{cell::RefCell, rc::Rc};

    /// Minimal payload type used to exercise the broker.
    #[derive(Debug, Clone, PartialEq)]
    pub struct Message {
        content: String,
    }

    impl Message {
        /// Builds a message carrying a copy of `content`.
        pub fn new(content: &str) -> Self {
            let content = String::from(content);
            Self { content }
        }
    }

    /// Pops the oldest pending message from `client` and returns its text.
    /// Panics when the client has no pending message.
    fn next_content(client: &Rc<RefCell<Client<Message>>>) -> String {
        client.borrow().next_message().unwrap().content
    }

    #[test]
    fn test_single_client_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        broker.subscribe("topic1", &client1);

        broker.publish("topic1", Message::new("hello world"));

        assert_eq!(next_content(&client1), "hello world");
    }

    #[test]
    fn test_multiple_subscribers_receive_message() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);

        broker.publish("topic1", Message::new("hello world"));

        // Every subscriber gets its own copy of the message.
        assert_eq!(next_content(&client1), "hello world");
        assert_eq!(next_content(&client2), "hello world");
    }

    #[test]
    fn test_unsubscribe() {
        let mut broker = Broker::new();
        let client1 = Client::new();
        let client2 = Client::new();
        broker.subscribe("topic1", &client1);
        broker.subscribe("topic1", &client2);

        let client1_id = client1.borrow().id();
        broker.unsubscribe("topic1", client1_id).unwrap();
        broker.publish("topic1", Message::new("hello world"));

        // Only the client that is still subscribed receives the message.
        assert!(client1.borrow().next_message().is_none());
        assert_eq!(next_content(&client2), "hello world");
    }

    #[test]
    fn test_multiple_topics() {
        let mut broker = Broker::new();
        let client = Client::new();
        broker.subscribe("topic1", &client);
        broker.subscribe("topic2", &client);

        broker.publish("topic1", Message::new("hello topic1"));
        broker.publish("topic2", Message::new("hello topic2"));

        // Messages from both topics arrive in publish order.
        assert_eq!(next_content(&client), "hello topic1");
        assert_eq!(next_content(&client), "hello topic2");
    }

    #[test]
    fn test_ring_buffer() {
        let mut broker = Broker::new();
        // Ring buffer size of 2: the third message must evict the first.
        let client = Client::with_ring_buffer_size(2);
        broker.subscribe("topic1", &client);

        for text in ["message1", "message2", "message3"] {
            broker.publish("topic1", Message::new(text));
        }

        // The oldest message was discarded by the ring buffer.
        assert_eq!(next_content(&client), "message2");
        assert_eq!(next_content(&client), "message3");
    }

    #[test]
    fn usage_example() {
        // Create a broker and a client with a specified ring buffer size.
        let mut broker = Broker::new();
        let client = Client::with_ring_buffer_size(5);

        // Subscribe the client to a topic, then publish to that topic.
        broker.subscribe("news", &client);
        broker.publish("news", Message::new("Breaking news!"));

        // The client retrieves the message from its ring buffer.
        assert_eq!(next_content(&client), "Breaking news!");
    }

    #[test]
    fn test_weak_reference_cleanup() {
        let mut broker = Broker::new();

        {
            // Subscribe a client that lives only for this scope.
            let client = Client::new();
            broker.subscribe("topic1", &client);
            assert!(broker.subscribers.contains_key("topic1"));

            // The client receives messages while it is alive.
            broker.publish("topic1", Message::new("Test"));
            assert_eq!(next_content(&client), "Test");
        } // The client is dropped here.

        // The stale weak reference is still registered but can no longer be upgraded.
        match broker.subscribers.get("topic1") {
            Some(subscribers) => assert!(subscribers[0].upgrade().is_none()),
            None => panic!("Topic 'topic1' should still exist at this point."),
        }

        // Publishing again triggers the cleanup, which removes the empty topic.
        broker.publish("topic1", Message::new("Test 2"));
        assert!(!broker.subscribers.contains_key("topic1"));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment