Last active
August 29, 2015 14:20
-
-
Save timonv/5cdc56bf671cee69d3fa to your computer and use it in GitHub Desktop.
Dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use self::DispatchType::{ChangeCurrentChannel, OutgoingMessage, IncomingMessage}; | |
use std::collections::HashMap; | |
use std::sync::{mpsc, Arc, Mutex}; | |
use std::thread; | |
#[derive(PartialEq, Debug, Clone)] | |
enum DispatchType { | |
ChangeCurrentChannel, | |
OutgoingMessage, | |
IncomingMessage | |
} | |
#[derive(Clone)] | |
struct DispatchMessage { | |
dispatch_type: DispatchType, | |
payload: String | |
} | |
struct Dispatcher { | |
subscribers: HashMap<&'static str, Vec<mpsc::Sender<DispatchMessage>>>, | |
broadcasters: Vec<mpsc::Receiver<DispatchMessage>> | |
} | |
impl Dispatcher { | |
pub fn new() -> Dispatcher { | |
Dispatcher { subscribers: HashMap::new(), broadcasters: vec![] } | |
} | |
pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast) { | |
let handle = broadcaster.broadcast_handle(); | |
self.broadcasters.push(handle); | |
} | |
pub fn register_subscriber(&mut self, subscriber: &Subscribe) { | |
let sender = subscriber.subscribe(); | |
let type_key = type_to_str(&subscriber.what_subscribe()); | |
let new = match self.subscribers.get_mut(type_key) { | |
Some(others) => { | |
others.push(sender); | |
None | |
}, | |
None => { | |
Some(vec![sender]) | |
} | |
}; | |
new.and_then(|new_senders| self.subscribers.insert(type_key, new_senders)); | |
} | |
pub fn start(&self) { | |
let shared_subscribers = Arc::new(self.subscribers); | |
for ref broadcaster in &self.broadcasters { | |
let shared_broadcaster = Arc::new(broadcaster); | |
let broadcaster = shared_broadcaster.clone(); | |
let subscribers = shared_subscribers.clone(); | |
thread::spawn(move || { | |
loop { | |
let message = &broadcaster.recv().ok().expect("Couldn't receive message in broadcaster"); | |
match subscribers.get(type_to_str(&message.dispatch_type)) { | |
Some(ref subs) => { | |
for sub in subs.iter() { sub.send(*message).unwrap(); } | |
}, | |
None => () | |
} | |
} | |
}); | |
} | |
} | |
fn num_broadcasters(&self) -> usize { | |
self.broadcasters.len() | |
} | |
fn num_subscribers(&self, dispatch_type: DispatchType) -> usize { | |
match self.subscribers.get(type_to_str(&dispatch_type)) { | |
Some(subscribers) => subscribers.len(), | |
None => 0 | |
} | |
} | |
} | |
fn type_to_str(dispatch_type: &DispatchType) -> &'static str { | |
match *dispatch_type { | |
OutgoingMessage => "OutgoingMessage", | |
ChangeCurrentChannel => "ChangeCurrentChannel", | |
IncomingMessage => "IncomingMessage" | |
} | |
} | |
trait Broadcast { | |
fn broadcast(&self, dispatch_type: DispatchType, payload: String); | |
fn broadcast_handle(&mut self) -> mpsc::Receiver<DispatchMessage>; | |
} | |
trait Subscribe { | |
fn subscribe(&self) -> mpsc::Sender<DispatchMessage>; | |
fn what_subscribe(&self) -> DispatchType; | |
} | |
#[cfg(test)] | |
mod test { | |
use std::sync::mpsc; | |
use super::{ Dispatcher, Broadcast, Subscribe, DispatchMessage}; | |
use super::DispatchType::{self, OutgoingMessage}; | |
#[test] | |
fn test_register_broadcaster() { | |
let mut dispatcher = Dispatcher::new(); | |
let mut brd = TestBroadcaster::new(); | |
assert_eq!(dispatcher.num_broadcasters(), 0); | |
dispatcher.register_broadcaster(&mut brd); | |
assert_eq!(dispatcher.num_broadcasters(), 1); | |
} | |
#[test] | |
fn test_register_subscriber() { | |
let mut dispatcher = Dispatcher::new(); | |
let sub = TestSubscriber::new(); | |
assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 0); | |
dispatcher.register_subscriber(&sub); | |
assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 1); | |
} | |
#[test] | |
fn test_register_multiple_subscribers() { | |
let mut dispatcher = Dispatcher::new(); | |
let sub = TestSubscriber::new(); | |
let sub2 = TestSubscriber::new(); | |
assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 0); | |
dispatcher.register_subscriber(&sub); | |
dispatcher.register_subscriber(&sub2); | |
assert_eq!(dispatcher.num_subscribers(OutgoingMessage), 2); | |
} | |
#[test] | |
fn test_broadcast_simple_message() { | |
let mut dispatcher = Dispatcher::new(); | |
let sub = TestSubscriber::new(); | |
let mut brd = TestBroadcaster::new(); | |
dispatcher.register_broadcaster(&mut brd); | |
dispatcher.register_subscriber(&sub); | |
dispatcher.start(); | |
brd.broadcast(OutgoingMessage, "Hello world!".to_string()); | |
let message = sub.receiver.recv().unwrap(); | |
assert_eq!(message.dispatch_type, OutgoingMessage); | |
assert_eq!(message.payload, "Hello world!"); | |
} | |
struct TestBroadcaster { | |
sender: Option<mpsc::Sender<DispatchMessage>> | |
} | |
impl TestBroadcaster { | |
fn new() -> TestBroadcaster { | |
TestBroadcaster { sender: None } | |
} | |
} | |
impl Broadcast for TestBroadcaster { | |
fn broadcast_handle(&mut self) -> mpsc::Receiver<DispatchMessage> { | |
let (tx, rx) = mpsc::channel::<DispatchMessage>(); | |
self.sender = Some(tx); | |
rx | |
} | |
fn broadcast(&self, dispatch_type: DispatchType, payload: String) { | |
let message = DispatchMessage { dispatch_type: dispatch_type, payload: payload }; | |
match self.sender { | |
Some(ref s) => { s.send(message); }, | |
None => () | |
}; | |
} | |
} | |
struct TestSubscriber { | |
receiver: mpsc::Receiver<DispatchMessage>, | |
sender: mpsc::Sender<DispatchMessage> | |
} | |
impl TestSubscriber { | |
fn new() -> TestSubscriber { | |
let(tx, rx) = mpsc::channel::<DispatchMessage>(); | |
TestSubscriber { receiver: rx, sender: tx } | |
} | |
} | |
impl Subscribe for TestSubscriber { | |
fn subscribe(&self) -> mpsc::Sender<DispatchMessage> { | |
self.sender.clone() | |
} | |
fn what_subscribe(&self) -> DispatchType { | |
OutgoingMessage | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment