Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Last active September 7, 2023 16:04
Show Gist options
  • Save matthewjberger/8cbe725cd9ce12160daa485b8293efef to your computer and use it in GitHub Desktop.
Save matthewjberger/8cbe725cd9ce12160daa485b8293efef to your computer and use it in GitHub Desktop.
An in-process pub/sub broker in rust that is compatible with wasm - https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e4020497380705bc565b2871972c8138
use std::{
cell::RefCell,
collections::{HashMap, VecDeque},
rc::{Rc, Weak},
};
use uuid::Uuid;
pub type ClientHandle<T> = Rc<RefCell<Client<T>>>;
pub struct Client<T> {
id: Uuid,
event_queue: RefCell<VecDeque<T>>,
ring_buffer_size: usize,
}
impl<T> Default for Client<T> {
fn default() -> Self {
Self {
id: Uuid::new_v4(),
event_queue: RefCell::new(VecDeque::new()),
ring_buffer_size: 100,
}
}
}
impl<T> Client<T> {
pub fn new() -> Rc<RefCell<Self>> {
Rc::new(RefCell::new(Self::default()))
}
pub fn with_ring_buffer_size(size: usize) -> Rc<RefCell<Self>> {
Rc::new(RefCell::new(Self {
ring_buffer_size: size,
..Default::default()
}))
}
pub fn id(&self) -> Uuid {
self.id
}
pub fn next_message(&self) -> Option<T> {
self.event_queue.borrow_mut().pop_front()
}
}
#[derive(Default)]
pub struct Broker<T: Clone> {
subscribers: HashMap<String, Vec<Weak<RefCell<Client<T>>>>>,
}
impl<T: Clone> Broker<T> {
pub fn new() -> Self {
Self {
subscribers: HashMap::new(),
}
}
pub fn subscribe(&mut self, topic: &str, client: &Rc<RefCell<Client<T>>>) {
let client_weak = Rc::downgrade(client);
self.subscribers
.entry(topic.to_string())
.or_default()
.push(client_weak);
}
pub fn unsubscribe(&mut self, topic: &str, client_id: Uuid) -> Result<(), &'static str> {
if let Some(subscribers) = self.subscribers.get_mut(topic) {
subscribers.retain(|subscriber| {
if let Some(subscriber) = subscriber.upgrade() {
subscriber.borrow().id() != client_id
} else {
false
}
});
Ok(())
} else {
Err("TopicNotFound")
}
}
pub fn publish(&mut self, topic: &str, message: T) {
if let Some(subscribers) = self.subscribers.get_mut(topic) {
// Use retain to filter out the expired weak references
subscribers.retain(|subscriber_weak| {
if let Some(subscriber_strong) = subscriber_weak.upgrade() {
let subscriber = subscriber_strong.borrow_mut();
// Access VecDeque methods by borrowing the RefCell
if subscriber.event_queue.borrow().len() == subscriber.ring_buffer_size {
subscriber.event_queue.borrow_mut().pop_front();
}
subscriber
.event_queue
.borrow_mut()
.push_back(message.clone());
true
} else {
false // Drop the weak reference if it's no longer valid
}
});
// Remove the topic entry if there are no subscribers left
if subscribers.is_empty() {
self.subscribers.remove(topic);
}
}
}
}
#[cfg(test)]
mod tests {
use super::{Broker, Client};
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
content: String,
}
impl Message {
pub fn new(content: &str) -> Self {
Self {
content: content.to_string(),
}
}
}
#[test]
fn test_single_client_receive_message() {
let mut broker = Broker::new();
let client1 = Client::new();
broker.subscribe("topic1", &client1);
broker.publish("topic1", Message::new("hello world"));
assert_eq!(
client1.borrow().next_message().unwrap().content,
"hello world"
);
}
#[test]
fn test_multiple_subscribers_receive_message() {
let mut broker = Broker::new();
let client1 = Client::new();
let client2 = Client::new();
broker.subscribe("topic1", &client1);
broker.subscribe("topic1", &client2);
broker.publish("topic1", Message::new("hello world"));
assert_eq!(
client1.borrow().next_message().unwrap().content,
"hello world"
);
assert_eq!(
client2.borrow().next_message().unwrap().content,
"hello world"
);
}
#[test]
fn test_unsubscribe() {
let mut broker = Broker::new();
let client1 = Client::new();
let client2 = Client::new();
broker.subscribe("topic1", &client1);
broker.subscribe("topic1", &client2);
broker.unsubscribe("topic1", client1.borrow().id()).unwrap();
broker.publish("topic1", Message::new("hello world"));
assert_eq!(client1.borrow().next_message(), None);
assert_eq!(
client2.borrow().next_message().unwrap().content,
"hello world"
);
}
#[test]
fn test_multiple_topics() {
let mut broker = Broker::new();
let client = Client::new();
broker.subscribe("topic1", &client);
broker.subscribe("topic2", &client);
broker.publish("topic1", Message::new("hello topic1"));
broker.publish("topic2", Message::new("hello topic2"));
assert_eq!(
client.borrow().next_message().unwrap().content,
"hello topic1"
);
assert_eq!(
client.borrow().next_message().unwrap().content,
"hello topic2"
);
}
#[test]
fn test_ring_buffer() {
let mut broker = Broker::new();
let client = Client::with_ring_buffer_size(2); // set ring buffer size to 2
broker.subscribe("topic1", &client);
broker.publish("topic1", Message::new("message1"));
broker.publish("topic1", Message::new("message2"));
broker.publish("topic1", Message::new("message3"));
// Expecting the oldest message to be discarded due to ring buffer
assert_eq!(client.borrow().next_message().unwrap().content, "message2");
assert_eq!(client.borrow().next_message().unwrap().content, "message3");
}
#[test]
fn usage_example() {
// Create a new broker
let mut broker = Broker::new();
// Create a client with a specified ring buffer size
let client = Client::with_ring_buffer_size(5);
// Subscribe the client to a topic
broker.subscribe("news", &client);
// The broker publishes a message to the topic
broker.publish("news", Message::new("Breaking news!"));
// The client retrieves the message from its ring buffer
assert_eq!(
client.borrow().next_message().unwrap().content,
"Breaking news!"
);
}
#[test]
fn test_weak_reference_cleanup() {
let mut broker = Broker::new();
// Subscribe a client to a topic
{
let client = Client::new();
broker.subscribe("topic1", &client);
// Ensure there's a subscriber for "topic1"
assert!(broker.subscribers.contains_key("topic1"));
// Simulating a message publish
broker.publish("topic1", Message::new("Test"));
// Ensure the client received the message
assert_eq!(client.borrow().next_message().unwrap().content, "Test");
} // Client goes out of scope here and should be dropped
// The weak reference to the client should not be upgradeable now
if let Some(subscribers) = broker.subscribers.get("topic1") {
assert!(subscribers[0].upgrade().is_none());
} else {
panic!("Topic 'topic1' should still exist at this point.");
}
// Simulating another publish to trigger the weak reference cleanup
broker.publish("topic1", Message::new("Test 2"));
// Check if weak reference cleanup worked by checking the subscribers for "topic1"
assert!(!broker.subscribers.contains_key("topic1"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment