Skip to content

Instantly share code, notes, and snippets.

@MohamedGouaouri
Created September 30, 2025 15:16
Show Gist options
  • Save MohamedGouaouri/93f535ec18fd347b17985a416630969c to your computer and use it in GitHub Desktop.
Save MohamedGouaouri/93f535ec18fd347b17985a416630969c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import json
import math
import os
import shlex
import signal
import subprocess
import sys
import time
from datetime import datetime, timezone
from statistics import mean, pstdev

import requests
###############################################################################
# Env / defaults
###############################################################################
# Name of the node to profile; substituted for %NODE% in the PromQL templates.
# Required: profiler() exits with status 2 when it is empty.
NODE_NAME = os.getenv("NODE_NAME", "")
PROM_URL = os.getenv("PROM_URL", "http://prometheus.monitoring.svc:9090")

# Candidate PromQL expressions that compute Watts from energy counters (J/s).
# They are tried in order; the first that returns a finite number is used.
# Adjust the metric names / labels to match your Kepler deployment.
PROM_POWER_EXPR_LIST = [
    # Kepler classic (energy per node)
    'sum by (node) (rate(node_platform_joules_total{node="%NODE%"}[30s]))',
    # Package + DRAM (a single expression, split over two literals)
    (
        'sum by (node) (rate(node_package_joules_total{node="%NODE%"}[30s])) + '
        'sum by (node) (rate(node_dram_joules_total{node="%NODE%"}[30s]))'
    ),
    # Kepler prefixed
    'sum by (node) (rate(kepler_node_platform_joules_total{node="%NODE%"}[30s]))',
]

PROM_TLS_SKIP_VERIFY = os.getenv("PROM_TLS_SKIP_VERIFY", "false").lower() == "true"
PROM_QUERY_TIMEOUT_S = int(os.getenv("PROM_QUERY_TIMEOUT_S", "10"))

STEP_PERCENT = float(os.getenv("STEP_PERCENT", "0.5"))  # e.g., 0.5 (%)
HOLD_SECONDS = int(os.getenv("HOLD_SECONDS", "60"))  # per step
REPEATS = int(os.getenv("REPEATS", "3"))
WARMUP_SECONDS = int(os.getenv("WARMUP_SECONDS", "10"))
SAMPLE_INTERVAL_SECONDS = int(os.getenv("SAMPLE_INTERVAL_SECONDS", "5"))

# Number of stress-ng CPU workers, kept as a string because it is only ever
# interpolated into the command line and the CSV. Default: all logical CPUs.
CPU_WORKERS = os.getenv("CPU_WORKERS")
if not CPU_WORKERS:
    # os.cpu_count() returns None (it does not raise) when the count cannot
    # be determined, so fall back to a single worker instead of "None".
    CPU_WORKERS = str(os.cpu_count() or 1)

RESULTS_PATH = os.getenv("RESULTS_PATH", "/results/results.csv")
###############################################################################
# Helpers
###############################################################################
def log(msg):
    """Print *msg* to stdout prefixed with the current UTC time in brackets.

    The line has the form ``[<ISO-8601 timestamp>Z] <msg>`` and stdout is
    flushed on every call.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so isoformat() keeps the original layout (no
    # "+00:00" suffix) before the literal "Z" is appended.
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    print(f"[{stamp}Z] {msg}", flush=True)
def _prom_query(expr):
    """Run an instant PromQL query and return the first sample as a float.

    Sends *expr* to ``<PROM_URL>/api/v1/query``. Returns None when the
    response status is not "success" or the result list is empty. Any
    exception (HTTP error, timeout, unexpected payload shape, unparsable
    number) is logged and also turns into None.
    """
    try:
        response = requests.get(
            PROM_URL.rstrip("/") + "/api/v1/query",
            params={"query": expr},
            timeout=PROM_QUERY_TIMEOUT_S,
            verify=not PROM_TLS_SKIP_VERIFY,
        )
        response.raise_for_status()
        payload = response.json()
        if payload.get("status") != "success":
            return None
        series = payload.get("data", {}).get("result", [])
        # A vector result is expected; only the first series is used, and
        # its "value" entry is read as [timestamp, "<number>"].
        return float(series[0]["value"][1]) if series else None
    except Exception as exc:
        log(f"PromQL error: {exc}")
        return None
def query_power_watts():
    """Return the current power reading, or None if no expression works.

    Each template in PROM_POWER_EXPR_LIST has ``%NODE%`` replaced by
    NODE_NAME and is queried in order; the first finite result wins.
    """
    for template in PROM_POWER_EXPR_LIST:
        reading = _prom_query(template.replace("%NODE%", NODE_NAME))
        if reading is None:
            continue
        # isfinite() is False for NaN as well as for +/-inf.
        if math.isfinite(reading):
            return reading
    return None
def run_stress(load_percent, duration_sec):
    """
    Start stress-ng with the given cpu-load% across CPU_WORKERS workers.

    load_percent: value passed to ``--cpu-load``.
    duration_sec: value passed to ``--timeout`` (with an ``s`` suffix).

    Returns the Popen handle; the caller stops it. stderr is merged into
    stdout, which is attached to a pipe available as ``p.stdout``.
    """
    # Build the argument vector directly instead of formatting a string and
    # re-splitting it, so a value containing whitespace (CPU_WORKERS comes
    # from the environment) cannot turn into extra stress-ng arguments.
    argv = [
        "stress-ng",
        "--cpu", str(CPU_WORKERS),
        "--cpu-load", str(load_percent),
        "--timeout", f"{duration_sec}s",
        "--metrics-brief",
    ]
    log("Starting: " + " ".join(argv))
    # Popen (not run) so the caller can sample while stress-ng executes.
    return subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
def stop_process(p):
    """Stop the subprocess *p* if it is still running.

    Calls ``terminate()`` and waits up to 5 seconds; if the process has not
    exited by then it is killed and waited for again so that it is reaped.
    Does nothing when *p* is falsy or has already exited. Never raises for
    a process that disappears on its own.
    """
    if not p or p.poll() is not None:
        return
    try:
        p.terminate()
    except OSError:
        # The process went away between poll() and terminate().
        pass
    try:
        p.wait(timeout=5)
    except subprocess.TimeoutExpired:
        try:
            p.kill()
        except OSError:
            pass
        # Reap the killed child so it does not linger as a zombie; bounded
        # so a stuck process cannot hang the caller.
        try:
            p.wait(timeout=5)
        except subprocess.TimeoutExpired:
            pass
def sweep_steps():
    """Return the load percentages to sweep: 0 .. 100 in STEP_PERCENT steps.

    Values are rounded to 3 decimals. 100.0 is always the last element,
    even when STEP_PERCENT does not divide 100 evenly.

    Raises:
        ValueError: if STEP_PERCENT is not a positive number (a zero or
            negative step would otherwise never reach 100).
    """
    if not STEP_PERCENT > 0:  # also rejects NaN
        raise ValueError(f"STEP_PERCENT must be > 0, got {STEP_PERCENT!r}")

    steps = []
    i = 0
    while True:
        # Multiply instead of accumulating so rounding error does not build
        # up over many steps. Index 0 is special-cased so the sweep always
        # starts at exactly 0.0.
        s = i * STEP_PERCENT if i else 0.0
        if s > 100.0000001:  # small tolerance for float error at the top end
            break
        steps.append(round(s, 3))
        i += 1

    # Guarantee 100% is included
    if steps[-1] != 100.0:
        steps.append(100.0)
    return steps
def ensure_results_header():
    """Make sure RESULTS_PATH exists and starts with the CSV header row.

    Creates the parent directory if needed. The header is written only when
    the file is missing or empty; an existing non-empty file is left alone.
    """
    parent = os.path.dirname(RESULTS_PATH)
    if parent:
        # Without this, open() fails when the results directory is absent.
        os.makedirs(parent, exist_ok=True)
    if os.path.exists(RESULTS_PATH) and os.path.getsize(RESULTS_PATH) > 0:
        return
    with open(RESULTS_PATH, "w") as f:
        f.write("timestamp_start,timestamp_end,node,repeat,step_percent,workers,watts_mean,watts_std,samples\n")
###############################################################################
# Main
###############################################################################
def _iso_utc(ts):
    """Format epoch seconds *ts* as an ISO-8601 UTC timestamp ending in "Z"."""
    # utcfromtimestamp() is deprecated since Python 3.12; dropping tzinfo
    # keeps the original text layout (no "+00:00" before the "Z").
    return datetime.fromtimestamp(ts, timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _sample_power(sp, hold_seconds):
    """Poll query_power_watts() for *hold_seconds*; return the good readings."""
    samples = []
    end_at = time.time() + hold_seconds
    while time.time() < end_at:
        val = query_power_watts()
        if val is not None:
            samples.append(val)
            log(f"step={sp:.2f}% sample={len(samples)} power={val:.3f} W")
        else:
            log(f"step={sp:.2f}% sample failed (PromQL returned None)")
        time.sleep(SAMPLE_INTERVAL_SECONDS)
    return samples


def profiler():
    """Run the full load sweep and append one CSV row per (repeat, step).

    For every repeat and every step from sweep_steps(): start stress-ng,
    wait WARMUP_SECONDS, sample power for HOLD_SECONDS, stop stress-ng, then
    append the mean / population std-dev / sample count to RESULTS_PATH.
    A step with no successful samples is recorded with NaN statistics.

    Exits the process with status 2 when NODE_NAME is not set.
    """
    if not NODE_NAME:
        log("ERROR: NODE_NAME env is required")
        sys.exit(2)

    ensure_results_header()
    steps = sweep_steps()
    log(f"Node={NODE_NAME} workers={CPU_WORKERS} repeats={REPEATS} steps={len(steps)} "
        f"(step={STEP_PERCENT}%, hold={HOLD_SECONDS}s, warmup={WARMUP_SECONDS}s, sample_interval={SAMPLE_INTERVAL_SECONDS}s)")

    for r in range(1, REPEATS + 1):
        log(f"=== Repeat {r}/{REPEATS} ===")
        for sp in steps:
            t0 = time.time()
            # stress-ng gets 5 s of slack beyond warmup + hold so it is
            # still running for the whole sampling window.
            p = run_stress(load_percent=sp, duration_sec=HOLD_SECONDS + WARMUP_SECONDS + 5)
            try:
                time.sleep(WARMUP_SECONDS)
                samples = _sample_power(sp, HOLD_SECONDS)
            finally:
                # Also runs when sampling raises or the signal handler's
                # SystemExit fires, so stress-ng is not left running.
                stop_process(p)
            t1 = time.time()

            if samples:
                mu = mean(samples)
                sd = pstdev(samples) if len(samples) > 1 else 0.0
            else:
                mu = float("nan")
                sd = float("nan")

            with open(RESULTS_PATH, "a") as f:
                f.write(f"{_iso_utc(t0)},"
                        f"{_iso_utc(t1)},"
                        f"{NODE_NAME},{r},{sp},{CPU_WORKERS},{mu},{sd},{len(samples)}\n")
            log(f"[RESULT] repeat={r} step={sp:.2f}% watts_mean={mu:.3f} watts_std={sd:.3f} samples={len(samples)}")

    log(f"Done. CSV at {RESULTS_PATH}")
def on_sigterm(signum, frame):
    """Signal handler: log a shutdown notice and exit with status 0.

    *signum* and *frame* are required by the handler signature and unused.
    """
    log("SIGTERM received, exiting…")
    raise SystemExit(0)
# Install the handler before the sweep starts so a SIGTERM delivered at any
# point during profiling goes through on_sigterm().
signal.signal(signal.SIGTERM, on_sigterm)

try:
    profiler()
except KeyboardInterrupt:
    # Ctrl-C takes the same exit path as SIGTERM.
    on_sigterm(None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment