Skip to content

Instantly share code, notes, and snippets.

@Thomasdezeeuw
Created January 18, 2019 11:53
Show Gist options
  • Save Thomasdezeeuw/8ca4df953d00d68f7d1960a6255ccd8c to your computer and use it in GitHub Desktop.
Save Thomasdezeeuw/8ca4df953d00d68f7d1960a6255ccd8c to your computer and use it in GitHub Desktop.
LocalWaker and Waker implementation.
use criterion::{Fun, Criterion, criterion_group, criterion_main};
use crossbeam_channel::unbounded;
use waker::{self, LocalWaker, Waker, new_local_waker};
fn wake_local(lw: &LocalWaker) {
lw.wake()
}
fn wake(waker: &Waker) {
waker.wake()
}
fn benchmark_wake(c: &mut Criterion) {
// Initialisation would happen on the main thread (that doesn't run any
// futures).
let (sender, receiver) = unbounded();
// Each worker thread (that does run futures) would call init with its id.
let thread_id = 0;
unsafe { waker::init(thread_id, sender); }
let wake_local = Fun::new("Wake local", move |b, lw| b.iter(move || {
wake_local(lw);
// We'll also clear the events otherwise we would run out of memory.
waker::clear_local_events(thread_id);
}));
let wake = Fun::new("wake", move |b, lw: &LocalWaker| {
let waker = lw.clone().try_into_waker().unwrap();
b.iter(|| {
wake(&waker);
// Again we also clear the send event here.
receiver.try_recv().unwrap();
})
});
c.bench_functions("Waker", vec![wake_local, wake], new_local_waker(thread_id, 0));
}
criterion_group!(benches, benchmark_wake);
criterion_main!(benches);
[package]
name = "waker"
version = "0.1.0"
authors = ["Thomas de Zeeuw <[email protected]>"]
edition = "2018"
[dependencies]
crossbeam-channel = "0.3.6"
[dev-dependencies]
criterion = "0.2"
[lib]
name = "waker"
path = "lib.rs"
[[bench]]
name = "waker"
path = "bench.rs"
harness = false
use std::mem;
use crossbeam_channel::Sender;
// The waker data `*const ()` is interpreted as `u64`, lower 32 bits are
// `WakeId`, next 16 are the thread id and the last 16 are unused.
type WakeId = u32;
// The vec (0) is only accessable by the LocalWaker implementation, the sender
// (1) by the Waker.
//
// TODO: need to add a way to waken other threads currently polling.
type WakerImpl = Option<(Vec<WakeId>, Sender<WakeId>)>;
// Waker data per thread, see `init`.
const MAX_THREADS: usize = 2;
static mut THREAD_WAKERS: [WakerImpl; MAX_THREADS] = [None, None];
// This would run on each worker thread that actually run futures. `thread_id`
// is a unique index into `THREAD_WAKERS`.
pub unsafe fn init(thread_id: u16, sender: Sender<WakeId>) {
THREAD_WAKERS[thread_id as usize] = Some((Vec::new(), sender));
}
// Only here for the benchmark.
pub fn clear_local_events(thread_id: u16) {
unsafe {
THREAD_WAKERS[thread_id as usize].as_mut().unwrap().0.clear();
}
}
// Create a new LocalWaker for a future with `id` running on thread with
// `thread_id`.
pub fn new_local_waker(thread_id: u16, id: WakeId) -> LocalWaker {
let data: u64 = (thread_id as u64) << 32 + (id as u64);
LocalWaker {
waker: RawWaker {
data: data as *const (),
vtable: LOCAL_WAKER_VTABLE,
},
}
}
static LOCAL_WAKER_VTABLE: &RawWakerVTable = &RawWakerVTable {
clone: local_clone,
into_waker: local_into_waker,
wake: local_wake,
drop_fn: local_drop,
};
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the
// Clone implementation.
unsafe fn local_clone(data: *const ()) -> *const () {
data
}
unsafe fn local_into_waker(data: *const ()) -> Option<RawWaker> {
Some(RawWaker {
data: data,
vtable: WAKER_VTABLE,
})
}
unsafe fn local_wake(data: *const ()) {
let data: u64 = data as usize as u64;
let thread_id: u16 = (data >> 32) as u16;
let wake_id: u32 = (data & ((1 << 32) - 1)) as u32;
THREAD_WAKERS[thread_id as usize].as_mut().unwrap().0.push(wake_id)
}
unsafe fn local_drop(_data: *const ()) {
// Do nothing.
}
static WAKER_VTABLE: &RawWakerVTable = &RawWakerVTable {
clone: wake_clone,
into_waker: wake_into_waker,
wake: wake_wake,
drop_fn: wake_drop,
};
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the
// Clone implementation.
unsafe fn wake_clone(data: *const ()) -> *const () {
data
}
unsafe fn wake_into_waker(data: *const ()) -> Option<RawWaker> {
Some(RawWaker {
data: data,
vtable: &WAKER_VTABLE,
})
}
unsafe fn wake_wake(data: *const ()) {
let data: u64 = data as usize as u64;
let thread_id: u16 = (data >> 32) as u16;
let wake_id: u32 = (data & ((1 << 32) - 1)) as u32;
let _ = THREAD_WAKERS[thread_id as usize].as_mut().unwrap().1.try_send(wake_id);
// TODO: we still need to wake the other thread if it's currently polling.
// This could be done by using `eventedfd` on Linux or a pipe on any other
// unix platform. This means that another `write` system call is required.
}
unsafe fn wake_drop(_data: *const ()) {
// Do nothing.
}
// The LocalWaker and Waker struct as proposed in RFC 2592, as of January 18th
// 2019. Note however that the `RawWakerVTable.clone` was incorrect and here
// returns `*const ()`, rather then `RawWaker`.
#[derive(Debug)]
#[derive(PartialEq)]
pub struct RawWaker {
pub data: *const (),
pub vtable: &'static RawWakerVTable,
}
#[derive(Debug)]
#[derive(PartialEq, Copy, Clone)]
pub struct RawWakerVTable {
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the
// Clone implementation.
pub clone: unsafe fn(*const ()) -> *const (),
pub into_waker: unsafe fn(*const ()) -> Option<RawWaker>,
pub wake: unsafe fn(*const ()),
pub drop_fn: unsafe fn(*const ()),
}
#[derive(Debug)]
pub struct Waker {
waker: RawWaker,
}
impl Waker {
pub fn wake(&self) {
unsafe { (self.waker.vtable.wake)(self.waker.data) }
}
pub unsafe fn new_unchecked(waker: RawWaker) -> Waker {
Waker {
waker: waker,
}
}
}
#[derive(Debug)]
pub struct LocalWaker {
waker: RawWaker,
}
impl LocalWaker {
pub fn wake(&self) {
unsafe { (self.waker.vtable.wake)(self.waker.data) }
}
pub fn try_into_waker(self) -> Option<Waker> {
unsafe {
let maybe_raw_waker = (self.waker.vtable.into_waker)(self.waker.data);
mem::forget(self);
match maybe_raw_waker {
Some(rw) => Some(Waker::new_unchecked(rw)),
None => None,
}
}
}
}
impl Clone for LocalWaker {
fn clone(&self) -> Self {
LocalWaker {
waker: RawWaker {
data: unsafe { (self.waker.vtable.clone)(self.waker.data) },
vtable: self.waker.vtable,
}
}
}
}
Waker/Wake local time: [5.3191 ns 5.3286 ns 5.3399 ns]
Found 14 outliers among 100 measurements (14.00%)
7 (7.00%) high mild
7 (7.00%) high severe
Waker/wake time: [53.726 ns 53.834 ns 53.955 ns]
Found 22 outliers among 100 measurements (22.00%)
12 (12.00%) low mild
2 (2.00%) high mild
8 (8.00%) high severe
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment