Skip to content

Instantly share code, notes, and snippets.

@zesterer
Last active August 15, 2018 15:59
Show Gist options
  • Save zesterer/a6880b1b9ea5983f92e2cf43f3050415 to your computer and use it in GitHub Desktop.
Save zesterer/a6880b1b9ea5983f92e2cf43f3050415 to your computer and use it in GitHub Desktop.
#![feature(integer_atomics)]
use std::sync::{mpsc, Arc, Mutex, atomic::{AtomicU64, Ordering}};
use std::collections::HashMap;
use std::thread;
use std::marker::PhantomData;
// Msg
trait Msg: Send + Sync + 'static {}
// Conn (stubby)
struct Conn<T: Msg> { phantom: PhantomData<T> }
impl<T: Msg> Conn<T> {
fn send(&self, _msg: T) { unimplemented!() }
fn recv(&self) -> Result<T, ()> { unimplemented!() }
}
// PostOffice
enum Letter<T: Msg> {
OpenBox(u64),
CloseBox(u64),
Msg { uid: u64, payload: T },
}
impl<T: Msg> Msg for Letter<T> {}
struct PostOffice<T: Msg> {
uid_counter: AtomicU64,
conn: Conn<Letter<T>>,
box_sends: Mutex<HashMap<u64, mpsc::Sender<T>>>,
recv: Mutex<mpsc::Receiver<Letter<T>>>,
send: Mutex<mpsc::Sender<Letter<T>>>,
}
impl<T: Msg> PostOffice<T> {
pub fn new() -> PostOffice<T> {
let (send, recv) = mpsc::channel();
PostOffice {
uid_counter: AtomicU64::new(0),
conn: Conn { phantom: PhantomData },
box_sends: Mutex::new(HashMap::new()),
recv: Mutex::new(recv),
send: Mutex::new(send),
}
}
pub fn start(po: Arc<PostOffice<T>>) {
thread::spawn(move || {
let recv = po.recv.lock().unwrap();
while let Ok(letter) = recv.recv() {
po.conn.send(letter);
}
});
}
fn gen_uid(&self) -> u64 {
// Server-side postoffices use even numbers, client-side uses odd numbers (to avoid conflicts)
self.uid_counter.fetch_add(2, Ordering::Relaxed)
}
fn create_pb_with_uid(&self, uid: u64) -> PostBox<T> {
let (send, recv) = mpsc::channel();
self.box_sends.lock().unwrap().insert(uid, send);
PostBox {
uid,
recv,
send: self.send.lock().unwrap().clone(),
}
}
pub fn await_incoming(&self) -> Result<PostBox<T>, ()> {
loop {
match self.conn.recv() {
Ok(Letter::OpenBox(uid)) => return Ok(self.create_pb_with_uid(uid)),
Ok(Letter::CloseBox(uid)) => { self.box_sends.lock().unwrap().remove(&uid); },
Ok(Letter::Msg { uid, payload }) => { self.box_sends.lock().unwrap().get(&uid).map(|s| s.send(payload)); },
Err(e) => return Err(e),
}
}
}
pub fn create_postbox(&self) -> PostBox<T> {
let uid = self.gen_uid();
self.conn.send(Letter::OpenBox(uid));
self.create_pb_with_uid(uid)
}
}
// PostBox
struct PostBox<T: Msg> {
uid: u64,
recv: mpsc::Receiver<T>,
send: mpsc::Sender<Letter<T>>,
}
impl<T: Msg> PostBox<T> {
pub fn send(&self, msg: T) -> Result<(), ()> {
self.send.send(Letter::Msg {
uid: self.uid,
payload: msg,
}).map_err(|_| ())
}
pub fn recv(&self) -> Result<T, ()> {
self.recv.recv().map_err(|_| ())
}
}
impl<T: Msg> Drop for PostBox<T> {
fn drop(&mut self) {
// Ignore failure to send close postbox letter
let _ = self.send.send(Letter::CloseBox(self.uid));
}
}
// Example
impl Msg for () {}
fn main() {
println!("Hello, world!");
// Example usage
let po = Arc::new(PostOffice::<()>::new());
PostOffice::start(po.clone());
while let Ok(pb) = po.await_incoming() {
thread::spawn(move || {
while let Ok(_msg) = pb.recv() {
// Do something with the message here
}
});
}
let pb = po.create_postbox();
pb.send(()).expect("Sending failed");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment