Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Created July 4, 2024 22:48
Show Gist options
  • Save matthewjberger/9aad1f87fa292f4a4eb699445c4e4813 to your computer and use it in GitHub Desktop.
Save matthewjberger/9aad1f87fa292f4a4eb699445c4e4813 to your computer and use it in GitHub Desktop.
An in-process pub/sub message broker in rust using async_std
// [dependencies]
// async-std = { version = "1.12.0", features = ["attributes"] }
// futures = "0.3.30"
use async_std::{
future::timeout,
sync::{Arc, Mutex},
};
use futures::{
channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
futures::stream::StreamExt,
};
use std::{collections::HashMap, time::Duration};
pub type Subscriber<T> = UnboundedSender<T>;
pub struct Broker<T> {
subscribers: Arc<Mutex<HashMap<String, Vec<Subscriber<T>>>>>,
}
impl<T: Clone + Send + 'static> Broker<T> {
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn subscribe(&self, topic: &str, tx: Subscriber<T>) {
let mut subscribers = self.subscribers.lock().await;
subscribers
.entry(topic.to_string())
.or_insert_with(Vec::new)
.push(tx);
}
pub async fn publish(&self, topic: &str, msg: T) {
let subscribers = self.subscribers.lock().await;
if let Some(subs) = subscribers.get(topic) {
for sub in subs {
let _ = sub.unbounded_send(msg.clone());
}
}
}
}
pub struct Client<T> {
broker: Arc<Broker<T>>,
receiver: Arc<Mutex<UnboundedReceiver<T>>>,
sender: Subscriber<T>,
}
impl<T: Clone + Send + 'static> Client<T> {
pub fn new(broker: Arc<Broker<T>>) -> Self {
let (tx, rx) = unbounded();
Self {
broker,
receiver: Arc::new(Mutex::new(rx)),
sender: tx,
}
}
pub async fn subscribe(&self, topic: &str) {
self.broker.subscribe(topic, self.sender.clone()).await;
}
pub async fn next(&self) -> Option<T> {
let mut receiver = self.receiver.lock().await;
receiver.next().await
}
pub async fn try_next_message(&self, duration: Duration) -> Option<T> {
let mut receiver = self.receiver.lock().await;
match timeout(duration, receiver.next()).await {
Ok(Some(msg)) => Some(msg),
Ok(None) | Err(_) => None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[async_std::test]
async fn test_pubsub() {
let broker = Arc::new(Broker::new());
let client1 = Client::new(broker.clone());
let client2 = Client::new(broker.clone());
client1.subscribe("topic1").await;
client2.subscribe("topic1").await;
broker.publish("topic1", "message1").await;
assert_eq!(client1.next().await.unwrap(), "message1");
assert_eq!(client2.next().await.unwrap(), "message1");
}
#[async_std::test]
async fn test_try_next_message() {
let broker = Arc::new(Broker::new());
let client = Client::new(broker.clone());
client.subscribe("topic1").await;
// Test timeout with no message published
let msg = client.try_next_message(Duration::from_secs(1)).await;
assert!(msg.is_none());
// Test with a message published
broker.publish("topic1", "message1").await;
let msg = client.try_next_message(Duration::from_secs(1)).await;
assert_eq!(msg.unwrap(), "message1");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment