Created
May 13, 2015 03:16
-
-
Save zonyitoo/a68c6331d88f37745b59 to your computer and use it in GitHub Desktop.
Rust Coroutine with work-stealing scheduler demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(scoped)] | |
extern crate coroutine; | |
extern crate num_cpus; | |
extern crate deque; | |
#[macro_use] extern crate log; | |
extern crate env_logger; | |
use std::thread; | |
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError}; | |
use std::sync::{Mutex, Once, ONCE_INIT}; | |
use std::mem; | |
use std::cell::UnsafeCell; | |
use coroutine::{spawn, sched}; | |
use coroutine::coroutine::{State, Handle}; | |
use deque::{BufferPool, Stealer, Worker, Stolen}; | |
static mut THREAD_HANDLES: *const Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>> = | |
0 as *const Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>>; | |
static THREAD_HANDLES_ONCE: Once = ONCE_INIT; | |
fn schedulers() -> &'static Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>> { | |
unsafe { | |
THREAD_HANDLES_ONCE.call_once(|| { | |
let handles: Box<Mutex<Vec<(Sender<SchedMessage>, Stealer<Handle>)>>> = | |
Box::new(Mutex::new(Vec::new())); | |
THREAD_HANDLES = mem::transmute(handles); | |
}); | |
& *THREAD_HANDLES | |
} | |
} | |
thread_local!(static SCHEDULER: UnsafeCell<Scheduler> = UnsafeCell::new(Scheduler::new())); | |
enum SchedMessage { | |
NewNeighbor(Sender<SchedMessage>, Stealer<Handle>), | |
} | |
struct Scheduler { | |
workqueue: Worker<Handle>, | |
workstealer: Stealer<Handle>, | |
commchannel: Receiver<SchedMessage>, | |
neighbors: Vec<(Sender<SchedMessage>, Stealer<Handle>)>, | |
} | |
impl Scheduler { | |
fn new() -> Scheduler { | |
let bufpool = BufferPool::new(); | |
let (worker, stealer) = bufpool.deque(); | |
let (tx, rx) = channel(); | |
let scheds = schedulers(); | |
let mut guard = scheds.lock().unwrap(); | |
for &(ref rtx, _) in guard.iter() { | |
let _ = rtx.send(SchedMessage::NewNeighbor(tx.clone(), stealer.clone())); | |
} | |
guard.push((tx, stealer.clone())); | |
Scheduler { | |
workqueue: worker, | |
workstealer: stealer, | |
commchannel: rx, | |
neighbors: guard.clone(), | |
} | |
} | |
fn current() -> &'static mut Scheduler { | |
SCHEDULER.with(|s| unsafe { | |
&mut *s.get() | |
}) | |
} | |
fn spawn<F>(f: F) | |
where F: FnOnce() + Send + 'static { | |
let coro = spawn(f); | |
let sc = Scheduler::current(); | |
sc.workqueue.push(coro); | |
} | |
fn schedule(&mut self) { | |
loop { | |
match self.commchannel.try_recv() { | |
Ok(SchedMessage::NewNeighbor(tx, st)) => { | |
self.neighbors.push((tx, st)); | |
}, | |
Err(TryRecvError::Empty) => {}, | |
_ => panic!("Receiving from channel: Unknown message") | |
} | |
match self.workstealer.steal() { | |
Stolen::Data(work) => { | |
if let Err(msg) = work.resume() { | |
error!("Coroutine panicked! {:?}", msg); | |
} | |
match work.state() { | |
State::Suspended => self.workqueue.push(work), | |
_ => {} | |
} | |
continue; | |
}, | |
Stolen::Empty => { | |
debug!("Nothing to do, try to steal from neighbors"); | |
}, | |
Stolen::Abort => { | |
error!("Abort!?"); | |
} | |
} | |
for &(_, ref st) in self.neighbors.iter() { | |
match st.steal() { | |
Stolen::Empty => {}, | |
Stolen::Data(coro) => { | |
if let Err(msg) = coro.resume() { | |
error!("Coroutine panicked! {:?}", msg); | |
} | |
match coro.state() { | |
State::Suspended => self.workqueue.push(coro), | |
_ => {} | |
} | |
break; | |
}, | |
Stolen::Abort => {} | |
} | |
} | |
} | |
} | |
} | |
fn main() { | |
env_logger::init().unwrap(); | |
Scheduler::spawn(|| { | |
loop { | |
println!("A in {}", thread::current().name().unwrap()); | |
sched(); | |
} | |
}); | |
Scheduler::spawn(|| { | |
loop { | |
println!("B in {}", thread::current().name().unwrap()); | |
sched(); | |
} | |
}); | |
Scheduler::spawn(|| { | |
loop { | |
println!("C in {}", thread::current().name().unwrap()); | |
sched(); | |
} | |
}); | |
Scheduler::spawn(|| { | |
loop { | |
println!("D in {}", thread::current().name().unwrap()); | |
sched(); | |
} | |
}); | |
let mut threads = Vec::new(); | |
for tid in 0..num_cpus::get() { | |
let fut = thread::Builder::new().name(format!("Thread {}", tid)).scoped(|| { | |
Scheduler::current().schedule(); | |
}).unwrap(); | |
threads.push(fut); | |
} | |
for fut in threads.into_iter() { | |
fut.join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment