Created
June 9, 2026 19:52
-
-
Save Micky774/55de015bbf9fb068f9d044446ec85037 to your computer and use it in GitHub Desktop.
Transient Noise Simulation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Simulation: does transient (time-correlated) noise bias comparative microbenchmarks, | |
| and does interleaving fix it? | |
| We compare two "kernels" A and B by timing N blocks of each, then running the | |
| Brunner-Munzel test (benchstats / PR #614 default) to decide "is one faster?". | |
| Two scheduling policies over 2N wall-clock measurement slots: | |
| - sequential : [A]*N then [B]*N (A-A-A..., B-B-B...) | |
| - interleaved: A,B,A,B,... (round-robin) | |
| Noise model per slot: | |
| measured[slot] = true_time * transient(slot) * white_jitter | |
| - white_jitter : IID lognormal, benign per-sample noise | |
| - transient(slot) : depends on WALL-CLOCK slot index, not on kernel id | |
| (models thermal ramp / DVFS throttle / neighbor load) | |
| Key idea: transient noise is correlated with TIME. Sequential layout makes time | |
| a proxy for kernel identity, so the transient becomes a between-kernel BIAS. | |
| Interleaving makes both kernels span the same time, so it becomes shared variance. | |
| Experiments: | |
| 0. CONTROL : no transient -> both policies must sit at nominal false-positive | |
| rate (proves the test/harness is calibrated; the inflation below | |
| is caused by the transient, not a broken test). | |
| 1. RAMP : monotonic warm-up drift -> DIRECTIONAL bias (worst case). | |
| 2. THROTTLE : random-onset step throttle -> inflated false-positive rate. | |
| 3. POWER : a REAL 3% effect; how often does sequential report the WRONG | |
| direction with significance? | |
| """ | |
| import warnings | |
| import numpy as np | |
| from scipy.stats import brunnermunzel | |
| warnings.filterwarnings("ignore") # BM warns on ties/degenerate inputs; we guard p=nan | |
| RNG = np.random.default_rng(0) | |
| N = 30 # timed blocks per kernel | |
| SIGMA_WHITE = 0.03 # 3% IID per-block jitter (realistic for torch autorange blocks) | |
| N_TRIALS = 4000 | |
| ALPHAS = (0.05, 0.001) # 0.001 is PR #614's default | |
| # --------------------------------------------------------------------------- | |
| # helpers | |
| # --------------------------------------------------------------------------- | |
| def white(n, rng): | |
| return np.exp(rng.normal(0.0, SIGMA_WHITE, n)) | |
| def split(meas, policy): | |
| """Return (samples_A, samples_B) given a length-2N wall-clock measurement array.""" | |
| if policy == "sequential": | |
| return meas[:N], meas[N:] | |
| # interleaved | |
| return meas[0::2], meas[1::2] | |
| def bm_p(xa, xb): | |
| """Two-sided Brunner-Munzel p-value; nan -> 1.0 (non-significant).""" | |
| try: | |
| p = brunnermunzel(xa, xb, alternative="two-sided").pvalue | |
| except Exception: | |
| return 1.0 | |
| return 1.0 if np.isnan(p) else p | |
| def ascii_hist(vals, lo, hi, bins=21, width=46, label=""): | |
| vals = np.asarray(vals) | |
| edges = np.linspace(lo, hi, bins + 1) | |
| counts, _ = np.histogram(np.clip(vals, lo, hi), bins=edges) | |
| peak = max(counts.max(), 1) | |
| mid = (edges[:-1] + edges[1:]) / 2 | |
| print(f" {label}") | |
| for c, m in zip(counts, mid): | |
| bar = "#" * int(round(width * c / peak)) | |
| print(f" {m:+.3f} | {bar}") | |
| # --------------------------------------------------------------------------- | |
| # transient generators (functions of wall-clock slot index, NOT kernel id) | |
| # --------------------------------------------------------------------------- | |
| def transient_none(n_slots, rng): | |
| return np.ones(n_slots) | |
| def transient_ramp(n_slots, rng, amp): | |
| """Monotonic warm-up: 1.0 at start -> 1+amp at end (deterministic shape).""" | |
| return 1.0 + amp * np.linspace(0.0, 1.0, n_slots) | |
| def transient_throttle(n_slots, rng, amp, frac=0.5): | |
| """Step throttle of duration frac*n_slots, onset uniformly random.""" | |
| mult = np.ones(n_slots) | |
| dur = int(frac * n_slots) | |
| onset = rng.integers(0, n_slots - dur + 1) | |
| mult[onset:onset + dur] *= (1.0 + amp) | |
| return mult | |
| # --------------------------------------------------------------------------- | |
| # core trial | |
| # --------------------------------------------------------------------------- | |
| def trial(transient_fn, rng, true_ratio=1.0, **kw): | |
| """One A-vs-B comparison. true_ratio = true_time_B / true_time_A. | |
| Returns dict of per-policy (pvalue, measured_ratio = median_B/median_A). | |
| """ | |
| n_slots = 2 * N | |
| base = np.empty(n_slots) | |
| # assign true per-slot kernel time AFTER we know the policy, so do per-policy | |
| out = {} | |
| for policy in ("sequential", "interleaved"): | |
| mult = transient_fn(n_slots, rng, **kw) if kw else transient_fn(n_slots, rng) | |
| # true time at each slot depends on which kernel occupies it under policy | |
| true = np.ones(n_slots) | |
| if policy == "sequential": | |
| true[N:] = true_ratio # B slots | |
| else: | |
| true[1::2] = true_ratio # B slots (odd) | |
| meas = true * mult * white(n_slots, rng) | |
| xa, xb = split(meas, policy) | |
| out[policy] = (bm_p(xa, xb), np.median(xb) / np.median(xa)) | |
| return out | |
| def monte_carlo(transient_fn, label, amps, true_ratio=1.0, **fixed_kw): | |
| print(f"\n{'='*78}\n{label} (true_ratio B/A = {true_ratio:.3f}, " | |
| f"N={N}/kernel, trials={N_TRIALS})\n{'='*78}") | |
| hdr = (f"{'amp':>6} | {'policy':<11} | {'FPR@.05':>8} {'FPR@.001':>9} | " | |
| f"{'median ratio':>12} {'ratio p5..p95':>16}") | |
| print(hdr); print("-" * len(hdr)) | |
| captured = {} | |
| for amp in amps: | |
| kw = dict(fixed_kw) | |
| if amp is not None: | |
| kw["amp"] = amp | |
| per = {pol: {"p": [], "r": []} for pol in ("sequential", "interleaved")} | |
| for _ in range(N_TRIALS): | |
| res = trial(transient_fn, RNG, true_ratio=true_ratio, **kw) | |
| for pol, (p, r) in res.items(): | |
| per[pol]["p"].append(p) | |
| per[pol]["r"].append(r) | |
| for pol in ("sequential", "interleaved"): | |
| p = np.array(per[pol]["p"]); r = np.array(per[pol]["r"]) | |
| # "false positive" under the null (true_ratio==1) = any significant call. | |
| # under a real effect we instead report WRONG-direction significant calls. | |
| if true_ratio == 1.0: | |
| rate05 = np.mean(p < 0.05); rate001 = np.mean(p < 0.001) | |
| else: | |
| faster_is_b = true_ratio < 1.0 # B truly faster -> ratio<1 is "right" | |
| wrong = (r > 1.0) if faster_is_b else (r < 1.0) | |
| rate05 = np.mean((p < 0.05) & wrong) | |
| rate001 = np.mean((p < 0.001) & wrong) | |
| amp_s = "ctrl" if (amp in (None, 0.0)) else f"{amp:.2f}" | |
| print(f"{amp_s:>6} | {pol:<11} | {rate05:>8.3f} {rate001:>9.3f} | " | |
| f"{np.median(r):>12.4f} " | |
| f"{np.percentile(r,5):>7.4f}..{np.percentile(r,95):<.4f}") | |
| captured[(amp, pol)] = r | |
| print("-" * len(hdr)) | |
| return captured | |
| # --------------------------------------------------------------------------- | |
| def main(): | |
| print("Transient-noise bias in comparative microbenchmarking") | |
| print(f"white jitter sigma={SIGMA_WHITE}, test=Brunner-Munzel (two-sided)") | |
| # 0. CONTROL: no transient. Both policies should sit at ~alpha. This is the | |
| # falsification control: if these aren't ~0.05/~0.001 the harness is rigged. | |
| monte_carlo(transient_none, "EXPT 0 CONTROL (no transient)", amps=[None]) | |
| # 1. RAMP: monotonic warm-up drift -> directional bias in sequential. | |
| ramp = monte_carlo(transient_ramp, "EXPT 1 WARM-UP RAMP (monotonic drift)", | |
| amps=[0.0, 0.05, 0.10, 0.20]) | |
| # 2. THROTTLE: random-onset step -> non-directional FPR inflation. | |
| monte_carlo(transient_throttle, "EXPT 2 STEP THROTTLE (random onset, 50% duration)", | |
| amps=[0.0, 0.05, 0.10, 0.20, 0.40], frac=0.5) | |
| # 3. SIGN-FLIP: a REAL 5% B-SPEEDUP (genuine optimization). Under a warm-up | |
| # ramp, B runs in the hotter later slots in sequential, which fights the | |
| # speedup. How often does each policy confidently report B as SLOWER | |
| # (a false REGRESSION that would reject a good PR)? | |
| monte_carlo(transient_ramp, | |
| "EXPT 3 REAL 5% SPEEDUP (B truly 5% FASTER) under warm-up ramp\n" | |
| " rate = fraction calling B SLOWER *with significance* (false regression)", | |
| amps=[0.0, 0.05, 0.10, 0.20], true_ratio=0.95) | |
| # Visualize bias-vs-variance: distribution of measured ratio under the ramp. | |
| print(f"\n{'='*78}\nDISTRIBUTION OF MEASURED RATIO (median_B/median_A), " | |
| f"warm-up ramp amp=0.20, null (true=1.0)\n{'='*78}") | |
| print(" Truth = 1.000. Sequential is shifted (BIAS); interleaved is centered.") | |
| ascii_hist(ramp[(0.20, "sequential")], 0.85, 1.25, | |
| label="sequential (biased: B always runs hotter)") | |
| ascii_hist(ramp[(0.20, "interleaved")], 0.85, 1.25, | |
| label="interleaved (centered on truth)") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment