Skip to content

Instantly share code, notes, and snippets.

@matthewjberger
Last active October 14, 2023 23:59
Show Gist options
  • Save matthewjberger/c3df3e35e151b61cfd409d9e1c3dd118 to your computer and use it in GitHub Desktop.
Save matthewjberger/c3df3e35e151b61cfd409d9e1c3dd118 to your computer and use it in GitHub Desktop.
//! Tokio Async Topic-based Pub/Sub Messaging
//!
//! This module provides a broker-client communication system where
//! multiple clients can communicate with each other through a central broker.
//! The broker is responsible for routing messages between clients based on topics.
//!
//! # Structures
//!
//! - `Broker`: Represents the central message router. It manages topics and routes messages between subscribers.
//! - `Client`: Represents a client that can subscribe to topics, send, and receive messages through the broker.
//! - `Message`: Represents different types of message actions like subscribing to a topic or publishing content.
//!
//! # Communication Model
//!
//! Clients communicate with the broker through channels provided by the `tokio` library.
//! When a client wishes to send a message or subscribe to a topic, it sends a `Message` to the broker.
//! The broker processes these messages, updates its internal state, and forwards appropriate messages to the intended recipients.
//!
//! # Usage
//!
//! 1. Create an instance of the `Broker`.
//! 2. Create multiple `Client` instances, passing the broker's transmitter channel.
//! 3. Clients can now `publish` messages to topics and `receive` messages from other clients on subscribed topics.
use std::collections::HashMap;
use tokio::sync::{broadcast, mpsc};
#[derive(Debug, Clone)]
enum Message {
Subscribe(String, mpsc::Sender<broadcast::Receiver<String>>),
Publish(String, String), // Topic and content
}
struct Broker {
topics: HashMap<String, broadcast::Sender<String>>,
}
impl Broker {
fn new() -> Self {
let topics = HashMap::new();
Broker { topics }
}
async fn run(&mut self, mut worker_rx: mpsc::Receiver<Message>) {
while let Some(msg) = worker_rx.recv().await {
match msg {
Message::Subscribe(topic, tx) => {
let broadcast_tx = self
.topics
.entry(topic)
.or_insert_with(|| {
let (broadcast_tx, _) = broadcast::channel(32);
broadcast_tx
})
.clone();
let _ = tx.send(broadcast_tx.subscribe()).await;
}
Message::Publish(topic, content) => {
if let Some(broadcast_tx) = self.topics.get(&topic) {
let _ = broadcast_tx.send(content);
}
}
}
}
}
}
struct Client {
broker_tx: mpsc::Sender<Message>,
subscriptions: HashMap<String, broadcast::Receiver<String>>,
}
impl Client {
fn new(broker_tx: mpsc::Sender<Message>) -> Self {
let subscriptions = HashMap::new();
Client {
broker_tx,
subscriptions,
}
}
async fn subscribe(&mut self, topic: String) {
let (tx, mut rx) = mpsc::channel(1);
let _ = self.broker_tx.send(Message::Subscribe(topic.clone(), tx)).await;
if let Some(subscription) = rx.recv().await {
self.subscriptions.insert(topic, subscription);
}
}
async fn publish(&self, topic: String, message: String) {
let _ = self.broker_tx.send(Message::Publish(topic, message)).await;
}
async fn receive(&mut self, topic: &str) -> Option<String> {
if let Some(rx) = self.subscriptions.get_mut(topic) {
rx.recv().await.ok()
} else {
None
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_topic_subscription() {
let mut broker = Broker::new();
let (worker_tx, worker_rx) = mpsc::channel(32);
tokio::spawn(async move {
broker.run(worker_rx).await;
});
let mut client1 = Client::new(worker_tx.clone());
let mut client2 = Client::new(worker_tx.clone());
client1.subscribe("topic1".to_string()).await;
client2.subscribe("topic1".to_string()).await;
client1.publish("topic1".to_string(), "Hello!".to_string()).await;
let msg_from_client2 = client2.receive("topic1").await;
assert_eq!(msg_from_client2, Some("Hello!".to_string()));
}
#[tokio::test]
async fn test_multiple_topics() {
let mut broker = Broker::new();
let (worker_tx, worker_rx) = mpsc::channel(32);
tokio::spawn(async move {
broker.run(worker_rx).await;
});
let mut client = Client::new(worker_tx.clone());
client.subscribe("topic1".to_string()).await;
client.subscribe("topic2".to_string()).await;
client.publish("topic1".to_string(), "Message1".to_string()).await;
client.publish("topic2".to_string(), "Message2".to_string()).await;
let msg1 = client.receive("topic1").await;
let msg2 = client.receive("topic2").await;
assert_eq!(msg1, Some("Message1".to_string()));
assert_eq!(msg2, Some("Message2".to_string()));
}
#[tokio::test]
async fn test_no_subscription_no_receive() {
let mut broker = Broker::new();
let (worker_tx, worker_rx) = mpsc::channel(32);
tokio::spawn(async move {
broker.run(worker_rx).await;
});
let mut client = Client::new(worker_tx.clone());
// Client doesn't subscribe to any topic
client.publish("topic1".to_string(), "Hello!".to_string()).await;
let msg = client.receive("topic1").await;
assert_eq!(msg, None);
}
#[tokio::test]
async fn test_multiple_clients_multiple_topics() {
let mut broker = Broker::new();
let (worker_tx, worker_rx) = mpsc::channel(32);
tokio::spawn(async move {
broker.run(worker_rx).await;
});
let mut client1 = Client::new(worker_tx.clone());
let mut client2 = Client::new(worker_tx.clone());
let mut client3 = Client::new(worker_tx.clone());
client1.subscribe("topic1".to_string()).await;
client2.subscribe("topic2".to_string()).await;
client3.subscribe("topic1".to_string()).await;
client3.subscribe("topic2".to_string()).await;
client1.publish("topic1".to_string(), "Message1".to_string()).await;
client2.publish("topic2".to_string(), "Message2".to_string()).await;
let msg1_client3 = client3.receive("topic1").await;
let msg2_client3 = client3.receive("topic2").await;
assert_eq!(msg1_client3, Some("Message1".to_string()));
assert_eq!(msg2_client3, Some("Message2".to_string()));
}
#[tokio::test]
async fn test_message_ordering() {
let mut broker = Broker::new();
let (worker_tx, worker_rx) = mpsc::channel(32);
tokio::spawn(async move {
broker.run(worker_rx).await;
});
let mut client1 = Client::new(worker_tx.clone());
let mut client2 = Client::new(worker_tx.clone());
client2.subscribe("topic1".to_string()).await;
client1.publish("topic1".to_string(), "Message1".to_string()).await;
sleep(Duration::from_millis(50)).await; // Introducing a delay to ensure ordering
client1.publish("topic1".to_string(), "Message2".to_string()).await;
let msg1 = client2.receive("topic1").await;
let msg2 = client2.receive("topic1").await;
assert_eq!(msg1, Some("Message1".to_string()));
assert_eq!(msg2, Some("Message2".to_string()));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment