Skip to content

Instantly share code, notes, and snippets.

@jamwt
Created July 20, 2015 23:40
Show Gist options
  • Save jamwt/b9df06a87411ea2846aa to your computer and use it in GitHub Desktop.
Save jamwt/b9df06a87411ea2846aa to your computer and use it in GitHub Desktop.
extern crate eventual;
extern crate mio;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::thread;
use eventual::Async;
// Global event counter shared by every worker thread. The stream callbacks in
// `main` bump it with `fetch_add` and use the previous value both for progress
// output and to choose which relay id to send to the event loop.
static COUNTER: AtomicUsize = ATOMIC_USIZE_INIT;
/// Event-loop handler state: the sending halves of the worker streams.
struct Reactor {
    /// One sender per stream, keyed by the relay id carried in notify messages.
    sends: HashMap<usize, eventual::Sender<(), ()>>,
}
impl Reactor {
    /// Builds a reactor from `sends`, keying each sender by its position in
    /// the vector (0, 1, 2, ...).
    fn new(sends: Vec<eventual::Sender<(), ()>>) -> Self {
        Reactor {
            // `enumerate` yields (index, sender) pairs, which collect straight
            // into the map.
            sends: sends.into_iter().enumerate().collect(),
        }
    }
}
impl mio::Handler for Reactor {
    /// Relay id: the key of the sender that should receive the next `()`.
    type Message = usize;
    type Timeout = ();

    /// Handles a message posted on the event-loop channel.
    ///
    /// Takes the sender registered under `relay` out of the map, sends `()`
    /// through it, blocks on the result, and stores the sender handed back by
    /// a successful send under the same key.
    ///
    /// # Panics
    ///
    /// Panics if no sender is registered for `relay`, or if the awaited send
    /// does not resolve to `Ok`.
    fn notify(&mut self, _event_loop: &mut mio::EventLoop<Self>, relay: Self::Message) {
        // The sender is moved out of the map because it is used by value below.
        let sender = self.sends.remove(&relay).unwrap();
        if let Ok(renewed) = sender.send(()).await() {
            self.sends.insert(relay, renewed);
        } else {
            panic!("wtf");
        }
    }

    // Not germane to this example: the remaining callbacks are stubs.

    /// Timeout callback; not implemented here.
    fn timeout(&mut self, _event_loop: &mut mio::EventLoop<Self>, _msg: Self::Timeout) {
        unimplemented!();
    }

    /// I/O readiness callback; not implemented here.
    fn ready(
        &mut self,
        _event_loop: &mut mio::EventLoop<Self>,
        _token: mio::Token,
        _events: mio::EventSet,
    ) {
        unimplemented!();
    }

    /// Interruption callback; not implemented here.
    fn interrupted(&mut self, _event_loop: &mut mio::EventLoop<Self>) {
        unimplemented!();
    }
}
fn main() {
let threads = 4;
let injections = 4;
let mut sends = vec![];
let mut streams = vec![];
for _ in 0..threads {
let (send, stream) = eventual::Stream::pair();
sends.push(send);
streams.push(stream);
}
let mut event_loop = mio::EventLoop::new().unwrap();
let mut reactor = Reactor::new(sends);
for stream in streams {
let channel = event_loop.channel();
thread::spawn(move || {
stream.each(move |_| {
let incr = COUNTER.fetch_add(1, Ordering::SeqCst);
if incr % 100000 == 0 {
println!("@{}", incr);
}
if incr % 7 == 0 {
let inner_channel = channel.clone();
thread::spawn(move || {
let mut sum = 1;
for i in (1..50000) {
sum = sum * i;
}
if incr % 1000 == 0 {
println!("the sum is {}", sum);
}
inner_channel.send(incr % threads).unwrap();
});
} else {
channel.send(incr % threads).unwrap();
}
}).fire();
});
}
let channel = event_loop.channel();
for start in 0..injections {
channel.send(start % threads).unwrap();
}
event_loop.run(&mut reactor).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment