Last active
August 15, 2018 15:59
-
-
Save zesterer/a6880b1b9ea5983f92e2cf43f3050415 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(integer_atomics)] | |
use std::sync::{mpsc, Arc, Mutex, atomic::{AtomicU64, Ordering}}; | |
use std::collections::HashMap; | |
use std::thread; | |
use std::marker::PhantomData; | |
/// Marker trait for anything that can travel through the post office.
///
/// It declares no methods; its only effect is to require that implementors
/// are `Send + Sync + 'static`, which is what lets messages be moved into
/// spawned threads and through `mpsc` channels elsewhere in this file.
trait Msg: Send + Sync + 'static {}
// Conn (stubby) | |
struct Conn<T: Msg> { phantom: PhantomData<T> } | |
impl<T: Msg> Conn<T> { | |
fn send(&self, _msg: T) { unimplemented!() } | |
fn recv(&self) -> Result<T, ()> { unimplemented!() } | |
} | |
// PostOffice | |
enum Letter<T: Msg> { | |
OpenBox(u64), | |
CloseBox(u64), | |
Msg { uid: u64, payload: T }, | |
} | |
impl<T: Msg> Msg for Letter<T> {} | |
struct PostOffice<T: Msg> { | |
uid_counter: AtomicU64, | |
conn: Conn<Letter<T>>, | |
box_sends: Mutex<HashMap<u64, mpsc::Sender<T>>>, | |
recv: Mutex<mpsc::Receiver<Letter<T>>>, | |
send: Mutex<mpsc::Sender<Letter<T>>>, | |
} | |
impl<T: Msg> PostOffice<T> { | |
pub fn new() -> PostOffice<T> { | |
let (send, recv) = mpsc::channel(); | |
PostOffice { | |
uid_counter: AtomicU64::new(0), | |
conn: Conn { phantom: PhantomData }, | |
box_sends: Mutex::new(HashMap::new()), | |
recv: Mutex::new(recv), | |
send: Mutex::new(send), | |
} | |
} | |
pub fn start(po: Arc<PostOffice<T>>) { | |
thread::spawn(move || { | |
let recv = po.recv.lock().unwrap(); | |
while let Ok(letter) = recv.recv() { | |
po.conn.send(letter); | |
} | |
}); | |
} | |
fn gen_uid(&self) -> u64 { | |
// Server-side postoffices use even numbers, client-side uses odd numbers (to avoid conflicts) | |
self.uid_counter.fetch_add(2, Ordering::Relaxed) | |
} | |
fn create_pb_with_uid(&self, uid: u64) -> PostBox<T> { | |
let (send, recv) = mpsc::channel(); | |
self.box_sends.lock().unwrap().insert(uid, send); | |
PostBox { | |
uid, | |
recv, | |
send: self.send.lock().unwrap().clone(), | |
} | |
} | |
pub fn await_incoming(&self) -> Result<PostBox<T>, ()> { | |
loop { | |
match self.conn.recv() { | |
Ok(Letter::OpenBox(uid)) => return Ok(self.create_pb_with_uid(uid)), | |
Ok(Letter::CloseBox(uid)) => { self.box_sends.lock().unwrap().remove(&uid); }, | |
Ok(Letter::Msg { uid, payload }) => { self.box_sends.lock().unwrap().get(&uid).map(|s| s.send(payload)); }, | |
Err(e) => return Err(e), | |
} | |
} | |
} | |
pub fn create_postbox(&self) -> PostBox<T> { | |
let uid = self.gen_uid(); | |
self.conn.send(Letter::OpenBox(uid)); | |
self.create_pb_with_uid(uid) | |
} | |
} | |
// PostBox | |
struct PostBox<T: Msg> { | |
uid: u64, | |
recv: mpsc::Receiver<T>, | |
send: mpsc::Sender<Letter<T>>, | |
} | |
impl<T: Msg> PostBox<T> { | |
pub fn send(&self, msg: T) -> Result<(), ()> { | |
self.send.send(Letter::Msg { | |
uid: self.uid, | |
payload: msg, | |
}).map_err(|_| ()) | |
} | |
pub fn recv(&self) -> Result<T, ()> { | |
self.recv.recv().map_err(|_| ()) | |
} | |
} | |
impl<T: Msg> Drop for PostBox<T> { | |
fn drop(&mut self) { | |
// Ignore failure to send close postbox letter | |
let _ = self.send.send(Letter::CloseBox(self.uid)); | |
} | |
} | |
// Example | |
impl Msg for () {} | |
fn main() { | |
println!("Hello, world!"); | |
// Example usage | |
let po = Arc::new(PostOffice::<()>::new()); | |
PostOffice::start(po.clone()); | |
while let Ok(pb) = po.await_incoming() { | |
thread::spawn(move || { | |
while let Ok(_msg) = pb.recv() { | |
// Do something with the message here | |
} | |
}); | |
} | |
let pb = po.create_postbox(); | |
pb.send(()).expect("Sending failed"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment