Skip to content

Instantly share code, notes, and snippets.

@iximeow
Last active December 13, 2024 01:41
Show Gist options
  • Save iximeow/b50b10c26575a4b8be001108bfd8f069 to your computer and use it in GitHub Desktop.
Save iximeow/b50b10c26575a4b8be001108bfd8f069 to your computer and use it in GitHub Desktop.
measure wakeup latencies
// this program tries to measure a producer/consumer latency when
// the consumer thread may be in deep idle states.
//
// illumos only for now, sorry.
//
// run like `./wakeup_lat 20 22 park 200` to measure
// core 20 waking core 22, 200 times
use std::sync::{Arc, Condvar, Mutex};
use std::sync::atomic::Ordering;
use std::ffi;
use std::sync::atomic::{AtomicU8, AtomicU64};
// sys/types.h
// C `id_t`, declared here as a C `int`. used as the type of the `id`
// argument of `processor_bind` below.
#[allow(non_camel_case_types)]
type id_t = ffi::c_int;
// sys/processor.h
// C `processorid_t`, declared here as a C `int`. identifies a cpu; this is the
// type the producer/consumer cpu numbers from the command line are parsed as.
#[allow(non_camel_case_types)]
type processorid_t = ffi::c_int;
// sys/procset.h
// mirror of the C `idtype_t` enum, passed by value to `processor_bind`.
// `#[repr(C)]` plus the declaration order (starting at `P_PID = 0`) fixes the
// numeric value of every variant, so do not reorder or remove variants.
//
// the code visible in this file only ever passes `P_LWPID`; the rest are kept
// so the numbering lines up, hence `allow(dead_code)`.
#[allow(dead_code, non_camel_case_types)]
#[repr(C)]
enum idtype_t {
P_PID = 0,
P_PPID,
P_PGID,
P_SID,
P_CID,
P_UID,
P_GID,
P_ALL,
P_LWPID,
P_TASKID,
P_PROJID,
P_POOLID,
P_ZONEID,
P_CTID,
P_CPUID,
P_PSETID,
}
// foreign functions this program calls directly. the file's header comment
// notes it is illumos-only; these are assumed to be provided by the platform
// libc there.
extern "C" {
// bind the thing named by (`idtype`, `id`) to cpu `processorid`. `obind` is an
// out-pointer (presumably receiving the previous binding -- confirm against
// processor_bind(2)). `bind_to_proc` treats a return of 0 as success.
fn processor_bind(idtype: idtype_t, id: id_t, processorid: processorid_t, obind: *mut processorid_t) -> ffi::c_int;
// timestamp source for both ends of a latency measurement; callers cast the
// result to `u64` and subtract two readings. units are not visible here
// (presumably nanoseconds -- confirm against gethrtime(3C)).
fn gethrtime() -> ffi::c_longlong;
// id of the calling lwp (thread); passed as the `id` for `P_LWPID` binds.
fn _lwp_self() -> ffi::c_int;
}
// bind the calling thread (lwp) to cpu `proc`.
//
// panics if `processor_bind` reports failure, e.g. because `proc` does not
// name a usable cpu. the panic message includes the requested cpu and the OS
// error so a bad command-line argument is easy to diagnose.
pub fn bind_to_proc(proc: processorid_t) {
    // out-parameter for `processor_bind`; the value written back is not used.
    // starts as -1 (the same bit pattern as 0xffffffff).
    let mut prev_binding: processorid_t = -1;
    // SAFETY: `prev_binding` is a live, writable `processorid_t` for the
    // duration of the call, and the remaining arguments are plain integers.
    let res = unsafe {
        processor_bind(idtype_t::P_LWPID, _lwp_self(), proc, &mut prev_binding)
    };
    if res != 0 {
        // read errno immediately, before anything else can overwrite it.
        let err = std::io::Error::last_os_error();
        panic!("processor_bind to cpu {} failed: {}", proc, err);
    }
}
// state shared between the producer (main) thread and the consumer thread for
// one run of the experiment.
struct Measurements {
// wakeup latencies recorded by the consumer, one entry per successful
// measurement. each entry is the difference of two `gethrtime()` readings.
times: Arc<Mutex<Vec<u64>>>,
// mutex/condvar pair the consumer waits on in "park" mode. the producer
// signals the condvar after every poke, whichever mode is in use.
flag: Arc<(Mutex<()>, Condvar)>,
// cpu the producer (main) thread binds itself to.
producer_cpu: processorid_t,
// cpu the consumer thread binds itself to.
consumer_cpu: processorid_t,
// written by a producer thread, always set before waking a consumer.
poke_time: AtomicU64,
// 0 = uninitialized, consumer has not started its loop
// 1 = consumer is about to wait; producer sleeps 10ms, then writes
//     `poke_time` and wakes it
// 2 = wrote `poke_time`, consumer measures the delay when it notices
// 3 = consumer has recorded a sample; it moves back to 1 right after
state: AtomicU8,
}
impl Measurements {
    // values stored in `state`. transitions go 0 -> 1 -> 2 -> 3 -> 1 -> ...
    //
    // consumer has not started its loop yet.
    const STATE_UNINIT: u8 = 0;
    // consumer is about to wait (or is already waiting) for a poke.
    const STATE_PARKING: u8 = 1;
    // producer has stored `poke_time`; consumer measures when it notices.
    const STATE_POKED: u8 = 2;
    // consumer has recorded a latency sample.
    const STATE_SAMPLED: u8 = 3;

    // build the shared state for a run with the given producer/consumer cpus.
    // the sample vector is preallocated so that recording a sample does not
    // normally need to grow it.
    fn new(producer_cpu: processorid_t, consumer_cpu: processorid_t) -> Self {
        Self {
            times: Arc::new(Mutex::new(Vec::with_capacity(10000))),
            flag: Arc::new((Mutex::new(()), Condvar::new())),
            producer_cpu,
            consumer_cpu,
            poke_time: AtomicU64::new(0),
            state: AtomicU8::new(Self::STATE_UNINIT),
        }
    }

    // read the current protocol state. panics if the byte holds anything
    // other than one of the four known states.
    fn get_state(&self) -> u8 {
        let state = self.state.load(Ordering::Acquire);
        if state > Self::STATE_SAMPLED {
            panic!("impossible state: {}", state);
        }
        state
    }

    // move to `state`. every call must be a real transition: setting the
    // state it already has is treated as a protocol bug and panics.
    fn set_state(&self, state: u8) {
        let old_state = self.state.swap(state, Ordering::Release);
        assert!(old_state != state, "old state and new state are same? {}", state);
        assert!(old_state <= Self::STATE_SAMPLED);
    }

    // producer side: spin until the consumer has finished with the previous
    // poke (or, on the first call, until it has started up at all).
    fn wait_for_sample(&self) {
        loop {
            match self.get_state() {
                // "consumer has written a sample". wait for at least this
                // point, because otherwise the producer might check for a
                // sample just before the consumer thread actually wrote it.
                // the sample would still get recorded eventually, but we
                // expect to record exactly zero or one samples per producer
                // loop iteration.
                Self::STATE_SAMPLED => break,
                // "consumer will sleep soon", which implies it has produced a
                // sample as well (or has only just started). this just means
                // we didn't check in the short window when the state was 3.
                Self::STATE_PARKING => break,
                // "consumer has not initialized yet". keep waiting.
                Self::STATE_UNINIT => continue,
                // "consumer should measure now". it's busy, we wait.
                Self::STATE_POKED => continue,
                // `get_state` has already rejected everything else.
                other => panic!("impossible state: {}", other),
            }
        }
    }

    // producer side: spin until the consumer has announced that it is about
    // to wait for a poke.
    fn wait_for_consumer(&self) {
        while self.get_state() != Self::STATE_PARKING {}
    }

    // record that the consumer will be parking "soon". there isn't a very tight
    // bound on how long it will take to park the consumer cpu, or how long it
    // will take from park to actually putting the cpu in a low-power state.
    //
    // parking the thread should be fast (single-digit microseconds to lwp_park
    // after this is called), but putting the processor in a low-power state
    // depends on what power states are enabled, reported to the OS, and if
    // reported to the OS, their reported transition latencies.
    //
    // if processor power states are fully autonomously managed, an amount of
    // time may have to elapse that is otherwise opaque to the OS.
    //
    // in practice, 10ms should be a long enough wait. might be prudent to
    // exclude the worst few observations, given the above sources of noise.
    //
    // must be called exactly once per measurement attempt: calling it again
    // while the state is still "parking" trips the assertion in `set_state`.
    fn state_consumer_parking(&self) {
        self.set_state(Self::STATE_PARKING);
    }

    // consumer side: has the producer published a poke that has not been
    // measured yet?
    fn state_poked(&self) -> bool {
        self.get_state() == Self::STATE_POKED
    }

    // producer side: record an hrtime, then tell the consumer (through the
    // state byte) that it should record a wake delay. `poke_time` is stored
    // before the state changes so a consumer that observes the new state also
    // observes the matching timestamp.
    fn prepare_poke(&self) {
        let poke_time = unsafe { gethrtime() } as u64;
        self.poke_time.store(poke_time, Ordering::Release);
        // "poke is ready, consumer measures delay when woken"
        self.set_state(Self::STATE_POKED);
    }

    // try to collect one wakeup latency measurement.
    //
    // "try", because the consumer thread may be spuriously woken at exactly
    // the wrong time, which disturbs the idle period this attempt measures.
    fn produce(&self) {
        // wait for the consumer to indicate it is going to sleep
        self.wait_for_consumer();
        // ok, consumer is ready. wait as promised..
        // 10ms is an arbitrary choice intended to be high enough to make
        // deepest ACPI C-state sleep palatable.
        std::thread::sleep(std::time::Duration::from_millis(10));
        // consumer CPU should be fully idle at this point.
        self.prepare_poke();
        // the condvar may not actually be waited depending on the consumer
        // mode. no harm in signalling anyway.
        self.flag.1.notify_one();
    }

    // consumer side: compute the time since the producer's poke and record it.
    fn measure_delay(&self) {
        let peek_time = unsafe { gethrtime() } as u64;
        let delay = peek_time - self.poke_time.load(Ordering::Acquire);
        self.times.lock().unwrap().push(delay);
        // "consumer has recorded latency". the consumer will immediately
        // proceed to sleep again, so the producer may see 3 if it checks
        // quickly, or we'll have moved to state 1 ("consumer will sleep soon")
        // if the producer is slower to check.
        self.set_state(Self::STATE_SAMPLED);
    }

    // consumer function that coordinates work by spinning on a shared byte. the
    // consumer thread will never actually be idle, so its cpu will never be in
    // a low-power state. `truss` confirms that `wait_spin` results in
    // exclusively calls to `nanosleep` (as the producer is waiting as if the
    // consumer might sleep).
    //
    // this should roughly establish a baseline for how long it takes to call
    // `hrtime`, and little else. it should measure wakeup delays well under a
    // microsecond.
    //
    // the returned closure binds its thread to `consumer_cpu` and never
    // returns.
    fn wait_spin(self: Arc<Self>) -> Box<dyn Fn() + Send> {
        Box::new(move || {
            bind_to_proc(self.consumer_cpu);
            eprintln!("[+] consumer bound to {}", self.consumer_cpu);
            loop {
                self.state_consumer_parking();
                // deliberately a bare busy-wait: the point of this mode is
                // that the cpu never idles.
                while !self.state_poked() {}
                self.measure_delay();
            }
        })
    }

    // consumer function that coordinates work by waiting on a `Condvar` shared
    // with a producer function. `Condvar` is implemented in terms of
    // `pthread_cond_{wait,signal,broadcast}` for unix targets. `truss`
    // confirms that `wait_park` results in the measurement procedure of
    // exclusively `lwp_unpark`/`lwp_park`/`nanosleep` for waiting, signaling,
    // and waiting-for-consumer-to-idle.
    //
    // the returned closure binds its thread to `consumer_cpu` and never
    // returns.
    fn wait_park(self: Arc<Self>) -> Box<dyn Fn() + Send + 'static> {
        Box::new(move || {
            bind_to_proc(self.consumer_cpu);
            eprintln!("[+] consumer bound to {}", self.consumer_cpu);
            let mut guard = self.flag.0.lock().unwrap();
            loop {
                self.state_consumer_parking();
                // wait until the producer has really poked us. a wake with the
                // state still at "parking" is spurious: go straight back to
                // waiting *without* announcing "parking" a second time, since
                // `set_state` asserts that every call changes the state and
                // would otherwise panic this thread (leaving the producer
                // spinning forever on a poke nobody will measure).
                //
                // the producer does not restart its 10ms sleep after a
                // spurious wake, so the sample that follows one may have had a
                // shorter idle period than usual.
                loop {
                    guard = self.flag.1.wait(guard).unwrap();
                    if self.state_poked() {
                        break;
                    }
                    eprintln!("spurious");
                }
                self.measure_delay();
            }
        })
    }
}
// entry point: parse `[producer_cpu] [consumer_cpu] ["park"/"spin"] [samples]`,
// start the consumer thread, drive the producer loop on this thread until
// `samples` latencies have been recorded, then print min/p50/p90/p99/max.
//
// exits with status 1 (after printing a message to stderr) on missing or
// malformed arguments.
pub fn main() {
    // parse one positional argument, or say what was wrong with it and exit.
    fn parse_or_exit<T: std::str::FromStr>(what: &str, raw: &str) -> T {
        raw.parse::<T>().unwrap_or_else(|_| {
            eprintln!("invalid {}: {:?}", what, raw);
            std::process::exit(1)
        })
    }

    let args: Vec<String> = std::env::args().collect();
    if args.len() < 5 {
        eprintln!(
            "usage: ./wakeup_lat [producer_cpu] [consumer_cpu] \
             [\"park\"/\"spin\"] [samples]"
        );
        eprintln!();
        eprintln!(
            "  choice of \"park\" or \"spin\" selects how the consumer waits \
             to discover it should measure a latency: lwp_park/unpark or spin \
             on a shared variable"
        );
        std::process::exit(1);
    }

    let producer_cpu: processorid_t = parse_or_exit("producer_cpu", &args[1]);
    let consumer_cpu: processorid_t = parse_or_exit("consumer_cpu", &args[2]);
    let consumer_mode = args[3].as_str();
    let samples: usize = parse_or_exit("samples", &args[4]);
    if samples == 0 {
        // with nothing recorded there would be no statistics to print (and
        // indexing the empty sample vector below would panic).
        eprintln!("samples must be at least 1");
        std::process::exit(1);
    }

    let measurements = Arc::new(Measurements::new(producer_cpu, consumer_cpu));
    let consumer = match consumer_mode {
        "park" => Arc::clone(&measurements).wait_park(),
        "spin" => Arc::clone(&measurements).wait_spin(),
        other => {
            eprintln!("unsupported consumer mode: {}", other);
            std::process::exit(1);
        }
    };
    // the consumer loops forever; it is never joined and goes away when the
    // process exits.
    std::thread::spawn(consumer);

    bind_to_proc(measurements.producer_cpu);
    eprintln!("[+] producer bound to {}", producer_cpu);

    loop {
        // make sure the previous attempt (if any) has been fully recorded
        // before counting samples.
        measurements.wait_for_sample();
        let measurement_count = measurements.times.lock().unwrap().len();
        // `>=` rather than `==` so an unexpected extra sample cannot turn
        // this into an endless loop.
        if measurement_count >= samples {
            break;
        }
        if measurement_count % 10 == 0 {
            eprintln!("[i] {}/{}", measurement_count, samples);
        }
        measurements.produce();
    }

    let mut times = measurements.times.lock().unwrap();
    times.sort_unstable();
    // non-empty: `samples >= 1` and the loop above only exits once at least
    // `samples` entries have been recorded.
    let len = times.len();
    // index of the given percentile, computed in integers so it is exact for
    // any sample count, and clamped to the last element.
    let percentile = |pct: usize| times[(len * pct / 100).min(len - 1)];
    println!("min: {}", times[0]);
    println!("p50: {}", times[len / 2]);
    println!("p90: {}", percentile(90));
    println!("p99: {}", percentile(99));
    println!("max: {}", times[len - 1]);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment