Last active
October 14, 2023 23:59
-
-
Save matthewjberger/c3df3e35e151b61cfd409d9e1c3dd118 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Tokio Async Topic-based Pub/Sub Messaging | |
//! | |
//! This module provides a broker-client communication system where | |
//! multiple clients can communicate with each other through a central broker. | |
//! The broker is responsible for routing messages between clients based on topics. | |
//! | |
//! # Structures | |
//! | |
//! - `Broker`: Represents the central message router. It manages topics and routes messages between subscribers. | |
//! - `Client`: Represents a client that can subscribe to topics, send, and receive messages through the broker. | |
//! - `Message`: Represents different types of message actions like subscribing to a topic or publishing content. | |
//! | |
//! # Communication Model | |
//! | |
//! Clients communicate with the broker through channels provided by the `tokio` library. | |
//! When a client wishes to send a message or subscribe to a topic, it sends a `Message` to the broker. | |
//! The broker processes these messages, updates its internal state, and forwards appropriate messages to the intended recipients. | |
//! | |
//! # Usage | |
//! | |
//! 1. Create an instance of the `Broker`. | |
//! 2. Create multiple `Client` instances, passing the broker's transmitter channel. | |
//! 3. Clients can now `publish` messages to topics and `receive` messages from other clients on subscribed topics. | |
use std::collections::HashMap; | |
use tokio::sync::{broadcast, mpsc}; | |
#[derive(Debug, Clone)] | |
enum Message { | |
Subscribe(String, mpsc::Sender<broadcast::Receiver<String>>), | |
Publish(String, String), // Topic and content | |
} | |
struct Broker { | |
topics: HashMap<String, broadcast::Sender<String>>, | |
} | |
impl Broker { | |
fn new() -> Self { | |
let topics = HashMap::new(); | |
Broker { topics } | |
} | |
async fn run(&mut self, mut worker_rx: mpsc::Receiver<Message>) { | |
while let Some(msg) = worker_rx.recv().await { | |
match msg { | |
Message::Subscribe(topic, tx) => { | |
let broadcast_tx = self | |
.topics | |
.entry(topic) | |
.or_insert_with(|| { | |
let (broadcast_tx, _) = broadcast::channel(32); | |
broadcast_tx | |
}) | |
.clone(); | |
let _ = tx.send(broadcast_tx.subscribe()).await; | |
} | |
Message::Publish(topic, content) => { | |
if let Some(broadcast_tx) = self.topics.get(&topic) { | |
let _ = broadcast_tx.send(content); | |
} | |
} | |
} | |
} | |
} | |
} | |
struct Client { | |
broker_tx: mpsc::Sender<Message>, | |
subscriptions: HashMap<String, broadcast::Receiver<String>>, | |
} | |
impl Client { | |
fn new(broker_tx: mpsc::Sender<Message>) -> Self { | |
let subscriptions = HashMap::new(); | |
Client { | |
broker_tx, | |
subscriptions, | |
} | |
} | |
async fn subscribe(&mut self, topic: String) { | |
let (tx, mut rx) = mpsc::channel(1); | |
let _ = self.broker_tx.send(Message::Subscribe(topic.clone(), tx)).await; | |
if let Some(subscription) = rx.recv().await { | |
self.subscriptions.insert(topic, subscription); | |
} | |
} | |
async fn publish(&self, topic: String, message: String) { | |
let _ = self.broker_tx.send(Message::Publish(topic, message)).await; | |
} | |
async fn receive(&mut self, topic: &str) -> Option<String> { | |
if let Some(rx) = self.subscriptions.get_mut(topic) { | |
rx.recv().await.ok() | |
} else { | |
None | |
} | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use super::*; | |
use tokio::time::{sleep, Duration}; | |
#[tokio::test] | |
async fn test_topic_subscription() { | |
let mut broker = Broker::new(); | |
let (worker_tx, worker_rx) = mpsc::channel(32); | |
tokio::spawn(async move { | |
broker.run(worker_rx).await; | |
}); | |
let mut client1 = Client::new(worker_tx.clone()); | |
let mut client2 = Client::new(worker_tx.clone()); | |
client1.subscribe("topic1".to_string()).await; | |
client2.subscribe("topic1".to_string()).await; | |
client1.publish("topic1".to_string(), "Hello!".to_string()).await; | |
let msg_from_client2 = client2.receive("topic1").await; | |
assert_eq!(msg_from_client2, Some("Hello!".to_string())); | |
} | |
#[tokio::test] | |
async fn test_multiple_topics() { | |
let mut broker = Broker::new(); | |
let (worker_tx, worker_rx) = mpsc::channel(32); | |
tokio::spawn(async move { | |
broker.run(worker_rx).await; | |
}); | |
let mut client = Client::new(worker_tx.clone()); | |
client.subscribe("topic1".to_string()).await; | |
client.subscribe("topic2".to_string()).await; | |
client.publish("topic1".to_string(), "Message1".to_string()).await; | |
client.publish("topic2".to_string(), "Message2".to_string()).await; | |
let msg1 = client.receive("topic1").await; | |
let msg2 = client.receive("topic2").await; | |
assert_eq!(msg1, Some("Message1".to_string())); | |
assert_eq!(msg2, Some("Message2".to_string())); | |
} | |
#[tokio::test] | |
async fn test_no_subscription_no_receive() { | |
let mut broker = Broker::new(); | |
let (worker_tx, worker_rx) = mpsc::channel(32); | |
tokio::spawn(async move { | |
broker.run(worker_rx).await; | |
}); | |
let mut client = Client::new(worker_tx.clone()); | |
// Client doesn't subscribe to any topic | |
client.publish("topic1".to_string(), "Hello!".to_string()).await; | |
let msg = client.receive("topic1").await; | |
assert_eq!(msg, None); | |
} | |
#[tokio::test] | |
async fn test_multiple_clients_multiple_topics() { | |
let mut broker = Broker::new(); | |
let (worker_tx, worker_rx) = mpsc::channel(32); | |
tokio::spawn(async move { | |
broker.run(worker_rx).await; | |
}); | |
let mut client1 = Client::new(worker_tx.clone()); | |
let mut client2 = Client::new(worker_tx.clone()); | |
let mut client3 = Client::new(worker_tx.clone()); | |
client1.subscribe("topic1".to_string()).await; | |
client2.subscribe("topic2".to_string()).await; | |
client3.subscribe("topic1".to_string()).await; | |
client3.subscribe("topic2".to_string()).await; | |
client1.publish("topic1".to_string(), "Message1".to_string()).await; | |
client2.publish("topic2".to_string(), "Message2".to_string()).await; | |
let msg1_client3 = client3.receive("topic1").await; | |
let msg2_client3 = client3.receive("topic2").await; | |
assert_eq!(msg1_client3, Some("Message1".to_string())); | |
assert_eq!(msg2_client3, Some("Message2".to_string())); | |
} | |
#[tokio::test] | |
async fn test_message_ordering() { | |
let mut broker = Broker::new(); | |
let (worker_tx, worker_rx) = mpsc::channel(32); | |
tokio::spawn(async move { | |
broker.run(worker_rx).await; | |
}); | |
let mut client1 = Client::new(worker_tx.clone()); | |
let mut client2 = Client::new(worker_tx.clone()); | |
client2.subscribe("topic1".to_string()).await; | |
client1.publish("topic1".to_string(), "Message1".to_string()).await; | |
sleep(Duration::from_millis(50)).await; // Introducing a delay to ensure ordering | |
client1.publish("topic1".to_string(), "Message2".to_string()).await; | |
let msg1 = client2.receive("topic1").await; | |
let msg2 = client2.receive("topic1").await; | |
assert_eq!(msg1, Some("Message1".to_string())); | |
assert_eq!(msg2, Some("Message2".to_string())); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment