Last active
July 16, 2024 04:48
-
-
Save mayakerostasia/0eedb3d90686513b36c2c4d04ee3a769 to your computer and use it in GitHub Desktop.
Rust Message bus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![allow(unused)] | |
use std::collections::HashMap; | |
use std::collections::VecDeque; | |
use std::thread::JoinHandle; | |
use tokio::sync::Mutex; use std::sync::Arc; | |
type MessageQueue = VecDeque<Message>; | |
type MailboxMap = HashMap<String, MessageQueue>; | |
type MessageQueuePointer = Arc<Mutex<MessageQueue>>; | |
type MailboxMapPointer = Arc<Mutex<MailboxMap>>; | |
type DistribPointer = Arc<Mutex<Distributor>>; | |
// The grandaddy poobah | |
struct MessageBus { | |
// _inc_msq_q: MessageQueue, | |
// _rsp_q: MessageQueue, | |
// _sub_mb: MailboxMap, | |
// _pub_mb: MailboxMap, | |
incoming_message_queue: MessageQueuePointer, | |
response_queue: MessageQueuePointer, | |
subscriber_mailboxes: MailboxMapPointer, | |
publisher_mailboxes: MailboxMapPointer, | |
distributor: DistribPointer, | |
response_distributor: DistribPointer | |
} | |
impl MessageBus { | |
fn new() -> Self { | |
let mut _inc_msq_q: MessageQueue = VecDeque::new(); | |
let mut incoming_message_queue: MessageQueuePointer = Arc::new(Mutex::new(_inc_msq_q)); | |
let mut _sub_mb: MailboxMap = HashMap::new(); | |
let mut subscriber_mailboxes: MailboxMapPointer = Arc::new(Mutex::new(_sub_mb)); | |
let mut distributor: Distributor = Distributor::new( | |
"distributor", | |
incoming_message_queue.clone(), | |
subscriber_mailboxes.clone() | |
); | |
let mut _rsp_q: MessageQueue = VecDeque::new(); | |
let mut response_queue: MessageQueuePointer = Arc::new(Mutex::new(_rsp_q)); | |
let mut _pub_mb: MailboxMap = HashMap::new(); | |
let mut publisher_mailboxes: MailboxMapPointer = Arc::new(Mutex::new(_pub_mb)); | |
let mut response_distrib: Distributor = Distributor::new( | |
"response_distributor", | |
response_queue.clone(), | |
publisher_mailboxes.clone(), | |
); | |
Self { | |
// _inc_msq_q: _inc_msq_q.clone(), | |
// _rsp_q: _rsp_q.clone(), | |
// _sub_mb: _sub_mb.clone(), | |
// _pub_mb: _pub_mb.clone(), | |
incoming_message_queue: incoming_message_queue, | |
response_queue: response_queue, | |
subscriber_mailboxes: subscriber_mailboxes, | |
publisher_mailboxes: publisher_mailboxes, | |
distributor: Arc::new(Mutex::new(distributor)), | |
response_distributor: Arc::new(Mutex::new(response_distrib)), | |
} | |
} | |
async fn publish(&mut self, publisher: Publisher) { | |
let mut lock = self.publisher_mailboxes.lock().await; | |
lock.insert(publisher.name, VecDeque::new()); | |
} | |
async fn subscribe(&mut self, subscriber: Subscriber) { | |
let mut lock = self.subscriber_mailboxes.lock().await; | |
lock.insert(subscriber.name, VecDeque::new()); | |
} | |
async fn send(&mut self, message: Message) { | |
let mut lock = self.incoming_message_queue.lock().await; | |
lock.push_back(message); | |
} | |
} | |
struct Distributor { | |
_name: String, | |
inc_msg_queue: MessageQueuePointer, | |
endpoint_map: MailboxMapPointer, | |
} | |
impl Distributor { | |
fn new(name: &str, inc_msg_queue: MessageQueuePointer, endpoint_map: MailboxMapPointer) -> Self { | |
let cls = Self { | |
_name: name.to_string(), | |
inc_msg_queue: inc_msg_queue, | |
endpoint_map: endpoint_map, | |
}; | |
// tokio::spawn(async { | |
// cls.distribute(); | |
// }); | |
cls | |
} | |
async fn distribute(&mut self) { | |
loop { | |
// println!("Distributing..."); | |
let mut msg_q = self.inc_msg_queue.lock().await; | |
if let Some(message) = msg_q.pop_front() { | |
println!("Message (Popping off Inbox Q) : {:?}", message); | |
let mut endpoint_map = self.endpoint_map.lock().await; | |
if let Some(endpoint) = endpoint_map.get(&message.to) { | |
let mut endpoint = endpoint.to_owned(); | |
println!("Message (in distrib) : {:?}", message); | |
endpoint.push_back(message.clone()); | |
} | |
} | |
} | |
// tokio::spawn (async move { | |
// loop { | |
// if let Some(message) = &mut self.inc_msg_queue.pop_front() { | |
// if let Some(endpoint) = self.endpoint_map.get(&message.to) { | |
// endpoint.push_back(message); | |
// } | |
// } | |
// } | |
// }); | |
} | |
} | |
/// A single message on the bus: a text payload plus the names of the
/// endpoint it is addressed to and the endpoint it came from.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Message {
    /// Payload text.
    message: String,
    /// Name of the receiving endpoint; used as the mailbox key on delivery.
    to: String,
    /// Name of the sending endpoint.
    from: String,
}
/// An endpoint that RECEIVES messages; identified by `name`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Subscriber {
    /// Name the endpoint is registered under.
    name: String,
}
/// An endpoint that SENDS messages; identified by `name`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Publisher {
    /// Name the endpoint is registered under.
    name: String,
}
async fn run_distrib(distrib: DistribPointer) -> tokio::task::JoinHandle<()> { | |
let handle = tokio::spawn( async move { | |
let mut lock = distrib.lock().await; | |
lock.distribute().await; | |
}); | |
handle | |
} | |
#[tokio::main] | |
async fn main() { | |
let mut bus = MessageBus::new(); | |
let mut subscriber = Subscriber { | |
name: "subscriber".to_string() | |
}; | |
let mut publisher = Publisher { | |
name: "publisher".to_string() | |
}; | |
bus.subscribe(subscriber); | |
bus.publish(publisher);; | |
let mut message = Message { | |
message: "Hello, world!".to_string(), | |
to: "subscriber".to_string(), | |
from: "publisher".to_string(), | |
}; | |
bus.send(message.clone()).await; | |
println!("Message: {:?}", message); | |
run_distrib(bus.distributor.clone()).await; | |
run_distrib(bus.response_distributor.clone()).await; | |
println!("Message: {:?}", message); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment