Created
January 18, 2019 11:53
-
-
Save Thomasdezeeuw/8ca4df953d00d68f7d1960a6255ccd8c to your computer and use it in GitHub Desktop.
LocalWaker and Waker implementation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use criterion::{Fun, Criterion, criterion_group, criterion_main}; | |
use crossbeam_channel::unbounded; | |
use waker::{self, LocalWaker, Waker, new_local_waker}; | |
fn wake_local(lw: &LocalWaker) { | |
lw.wake() | |
} | |
fn wake(waker: &Waker) { | |
waker.wake() | |
} | |
fn benchmark_wake(c: &mut Criterion) { | |
// Initialisation would happen on the main thread (that doesn't run any | |
// futures). | |
let (sender, receiver) = unbounded(); | |
// Each worker thread (that does run futures) would call init with its id. | |
let thread_id = 0; | |
unsafe { waker::init(thread_id, sender); } | |
let wake_local = Fun::new("Wake local", move |b, lw| b.iter(move || { | |
wake_local(lw); | |
// We'll also clear the events otherwise we would run out of memory. | |
waker::clear_local_events(thread_id); | |
})); | |
let wake = Fun::new("wake", move |b, lw: &LocalWaker| { | |
let waker = lw.clone().try_into_waker().unwrap(); | |
b.iter(|| { | |
wake(&waker); | |
// Again we also clear the send event here. | |
receiver.try_recv().unwrap(); | |
}) | |
}); | |
c.bench_functions("Waker", vec![wake_local, wake], new_local_waker(thread_id, 0)); | |
} | |
criterion_group!(benches, benchmark_wake); | |
criterion_main!(benches); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "waker" | |
version = "0.1.0" | |
authors = ["Thomas de Zeeuw <[email protected]>"] | |
edition = "2018" | |
[dependencies] | |
crossbeam-channel = "0.3.6" | |
[dev-dependencies] | |
criterion = "0.2" | |
[lib] | |
name = "waker" | |
path = "lib.rs" | |
[[bench]] | |
name = "waker" | |
path = "bench.rs" | |
harness = false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::mem; | |
use crossbeam_channel::Sender; | |
/// Identifier passed along when a waker is woken.
///
/// The waker data pointer (`*const ()`) is interpreted as a `u64`: the lower
/// 32 bits hold the `WakeId`, the next 16 bits hold the thread id and the
/// upper 16 bits are unused.
type WakeId = u32;
// The vec (0) is only accessable by the LocalWaker implementation, the sender | |
// (1) by the Waker. | |
// | |
// TODO: need to add a way to waken other threads currently polling. | |
type WakerImpl = Option<(Vec<WakeId>, Sender<WakeId>)>; | |
// Waker data per thread, see `init`. | |
const MAX_THREADS: usize = 2; | |
static mut THREAD_WAKERS: [WakerImpl; MAX_THREADS] = [None, None]; | |
// This would run on each worker thread that actually run futures. `thread_id` | |
// is a unique index into `THREAD_WAKERS`. | |
pub unsafe fn init(thread_id: u16, sender: Sender<WakeId>) { | |
THREAD_WAKERS[thread_id as usize] = Some((Vec::new(), sender)); | |
} | |
// Only here for the benchmark. | |
pub fn clear_local_events(thread_id: u16) { | |
unsafe { | |
THREAD_WAKERS[thread_id as usize].as_mut().unwrap().0.clear(); | |
} | |
} | |
// Create a new LocalWaker for a future with `id` running on thread with | |
// `thread_id`. | |
pub fn new_local_waker(thread_id: u16, id: WakeId) -> LocalWaker { | |
let data: u64 = (thread_id as u64) << 32 + (id as u64); | |
LocalWaker { | |
waker: RawWaker { | |
data: data as *const (), | |
vtable: LOCAL_WAKER_VTABLE, | |
}, | |
} | |
} | |
static LOCAL_WAKER_VTABLE: &RawWakerVTable = &RawWakerVTable { | |
clone: local_clone, | |
into_waker: local_into_waker, | |
wake: local_wake, | |
drop_fn: local_drop, | |
}; | |
/// `clone` entry of the local waker vtable.
///
/// The data is returned unchanged, so the clone carries the exact same value.
//
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the
// Clone implementation.
unsafe fn local_clone(data: *const ()) -> *const () {
    data
}
unsafe fn local_into_waker(data: *const ()) -> Option<RawWaker> { | |
Some(RawWaker { | |
data: data, | |
vtable: WAKER_VTABLE, | |
}) | |
} | |
unsafe fn local_wake(data: *const ()) { | |
let data: u64 = data as usize as u64; | |
let thread_id: u16 = (data >> 32) as u16; | |
let wake_id: u32 = (data & ((1 << 32) - 1)) as u32; | |
THREAD_WAKERS[thread_id as usize].as_mut().unwrap().0.push(wake_id) | |
} | |
/// `drop_fn` entry of the local waker vtable.
///
/// Intentionally a no-op: the argument is ignored and nothing is released.
unsafe fn local_drop(_data: *const ()) {}
static WAKER_VTABLE: &RawWakerVTable = &RawWakerVTable { | |
clone: wake_clone, | |
into_waker: wake_into_waker, | |
wake: wake_wake, | |
drop_fn: wake_drop, | |
}; | |
/// `clone` entry of the waker vtable.
///
/// The data is returned unchanged, so the clone carries the exact same value.
//
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the
// Clone implementation.
unsafe fn wake_clone(data: *const ()) -> *const () {
    data
}
unsafe fn wake_into_waker(data: *const ()) -> Option<RawWaker> { | |
Some(RawWaker { | |
data: data, | |
vtable: &WAKER_VTABLE, | |
}) | |
} | |
unsafe fn wake_wake(data: *const ()) { | |
let data: u64 = data as usize as u64; | |
let thread_id: u16 = (data >> 32) as u16; | |
let wake_id: u32 = (data & ((1 << 32) - 1)) as u32; | |
let _ = THREAD_WAKERS[thread_id as usize].as_mut().unwrap().1.try_send(wake_id); | |
// TODO: we still need to wake the other thread if it's currently polling. | |
// This could be done by using `eventedfd` on Linux or a pipe on any other | |
// unix platform. This means that another `write` system call is required. | |
} | |
/// `drop_fn` entry of the waker vtable.
///
/// Intentionally a no-op: the argument is ignored and nothing is released.
unsafe fn wake_drop(_data: *const ()) {}
// The LocalWaker and Waker struct as proposed in RFC 2592, as of January 18th | |
// 2019. Note however that the `RawWakerVTable.clone` was incorrect and here | |
// returns `*const ()`, rather then `RawWaker`. | |
#[derive(Debug)] | |
#[derive(PartialEq)] | |
pub struct RawWaker { | |
pub data: *const (), | |
pub vtable: &'static RawWakerVTable, | |
} | |
#[derive(Debug)] | |
#[derive(PartialEq, Copy, Clone)] | |
pub struct RawWakerVTable { | |
// FIXME: this returns `RawWaker` in the RFC, but that is incorrect with the | |
// Clone implementation. | |
pub clone: unsafe fn(*const ()) -> *const (), | |
pub into_waker: unsafe fn(*const ()) -> Option<RawWaker>, | |
pub wake: unsafe fn(*const ()), | |
pub drop_fn: unsafe fn(*const ()), | |
} | |
#[derive(Debug)] | |
pub struct Waker { | |
waker: RawWaker, | |
} | |
impl Waker { | |
pub fn wake(&self) { | |
unsafe { (self.waker.vtable.wake)(self.waker.data) } | |
} | |
pub unsafe fn new_unchecked(waker: RawWaker) -> Waker { | |
Waker { | |
waker: waker, | |
} | |
} | |
} | |
#[derive(Debug)] | |
pub struct LocalWaker { | |
waker: RawWaker, | |
} | |
impl LocalWaker { | |
pub fn wake(&self) { | |
unsafe { (self.waker.vtable.wake)(self.waker.data) } | |
} | |
pub fn try_into_waker(self) -> Option<Waker> { | |
unsafe { | |
let maybe_raw_waker = (self.waker.vtable.into_waker)(self.waker.data); | |
mem::forget(self); | |
match maybe_raw_waker { | |
Some(rw) => Some(Waker::new_unchecked(rw)), | |
None => None, | |
} | |
} | |
} | |
} | |
impl Clone for LocalWaker { | |
fn clone(&self) -> Self { | |
LocalWaker { | |
waker: RawWaker { | |
data: unsafe { (self.waker.vtable.clone)(self.waker.data) }, | |
vtable: self.waker.vtable, | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Waker/Wake local time: [5.3191 ns 5.3286 ns 5.3399 ns] | |
Found 14 outliers among 100 measurements (14.00%) | |
7 (7.00%) high mild | |
7 (7.00%) high severe | |
Waker/wake time: [53.726 ns 53.834 ns 53.955 ns] | |
Found 22 outliers among 100 measurements (22.00%) | |
12 (12.00%) low mild | |
2 (2.00%) high mild | |
8 (8.00%) high severe |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment