Skip to content

Instantly share code, notes, and snippets.

@julien-c
Created June 4, 2026 08:54
Show Gist options
  • Select an option

  • Save julien-c/12f02a2f87aa7028a49f4dadf0b6bc10 to your computer and use it in GitHub Desktop.

Select an option

Save julien-c/12f02a2f87aa7028a49f4dadf0b6bc10 to your computer and use it in GitHub Desktop.
hammer.py
#!/usr/bin/env python3
"""Traffic hammer + canary for reproducing llama-server output corruption.
Mimics the production traffic pattern that preceded the 2026-06-03 '/////'
corruption: small chats with a growing shared prefix (checkpoint restores),
long generations cancelled mid-stream, 4-way parallel bursts, slot swapping,
and >10s idle gaps (CUDA/HIP graph-cache eviction boundary).
After every round, a temperature-0 canary probe checks output sanity.
On canary failure the harness logs the round + action history and exits 42.
"""
import http.client
import json
import sys
import threading
import time
# Command line: hammer.py PORT [LABEL] [MAX_ROUNDS] [SKIP_KINDS] [chat|raw]
# (the 5th argument, the canary mode, is read further down as CANARY_MODE).
if len(sys.argv) < 2:
    # Without a port there is nothing to hammer: print usage to stderr and
    # exit with status 1 instead of dying on an IndexError traceback.
    sys.exit("usage: hammer.py PORT [LABEL] [MAX_ROUNDS] [SKIP_KINDS] [chat|raw]")
PORT = int(sys.argv[1])  # server port; all requests go to 127.0.0.1:PORT
# Tag used in every log line and in the log filename; defaults to the port.
LABEL = sys.argv[2] if len(sys.argv) > 2 else str(PORT)
# Number of action + canary rounds to run.
MAX_ROUNDS = int(sys.argv[3]) if len(sys.argv) > 3 else 200
# comma-separated action kinds to skip: small-chat,parallel,cancel,slot-swap,idle,followup
SKIP = set((sys.argv[4] if len(sys.argv) > 4 else "").split(",")) - {""}
# NOTE(review): hard-coded to one machine's home directory; open(..., "a")
# does not create missing directories, so this directory must already exist.
LOG = f"/home/gibbon/llama-debug/hammer-{LABEL}.log"
def log(msg):
    """Append one line, ``HH:MM:SS [LABEL] msg``, to the run's log file ``LOG``.

    The file is reopened for every call, so each line is handed to the OS as
    soon as ``log`` returns. This matters when the harness is killed mid-run.
    The encoding is pinned to UTF-8 because messages embed ``repr()`` of model
    output. ``repr`` keeps non-ASCII characters, which the locale's default
    encoding may not be able to represent.
    """
    line = f"{time.strftime('%H:%M:%S')} [{LABEL}] {msg}"
    with open(LOG, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def post(path, payload, timeout=600, read_body=True):
    """Send ``payload`` as a JSON POST to ``path`` on 127.0.0.1:PORT.

    A fresh connection is opened for every call and is always closed before
    returning, including when the request raises.

    Args:
        path: request path, e.g. ``"/completion"``.
        payload: JSON-serialisable request body.
        timeout: socket timeout in seconds.
        read_body: when False the response body is not read and ``b""`` is
            returned in its place.

    Returns:
        ``(status, body)``: the HTTP status code and the raw body bytes.
    """
    headers = {"Content-Type": "application/json"}
    connection = http.client.HTTPConnection("127.0.0.1", PORT, timeout=timeout)
    try:
        connection.request("POST", path, json.dumps(payload), headers)
        response = connection.getresponse()
        return response.status, (response.read() if read_body else b"")
    finally:
        connection.close()
def chat(messages, max_tokens, temperature=0.8):
    """Run one non-streaming chat completion and return the generated text.

    Args:
        messages: chat message list, sent unchanged as ``"messages"``.
        max_tokens: generation cap for the request.
        temperature: sampling temperature (default 0.8).

    Returns:
        The first choice's ``reasoning_content`` followed by its ``content``.
        A missing or null field counts as "".

    Raises:
        RuntimeError: the server answered with a non-200 status. The message
            carries the status and the start of the body, so the caller's log
            shows what the server said instead of a bare JSON/KeyError.
    """
    status, body = post("/v1/chat/completions", {
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    })
    if status != 200:
        raise RuntimeError(
            f"chat HTTP {status} from /v1/chat/completions: {body[:200]!r}")
    message = json.loads(body)["choices"][0]["message"]
    return (message.get("reasoning_content") or "") + (message.get("content") or "")
def cancelled_stream(messages, kill_after_bytes=2000):
    """Start a streaming request and slam the connection shut mid-generation.

    Reads the response body in 256-byte pieces until at least
    ``kill_after_bytes`` bytes have arrived, or the stream ends. It then tears
    down the TCP connection while the server is presumably still generating.

    Args:
        messages: chat messages for the request, sent with
            ``max_tokens=4096``, ``temperature=0.8``, ``stream=True``.
        kill_after_bytes: response-body bytes to read before disconnecting.

    Returns:
        The number of response-body bytes read before disconnecting.
    """
    import socket

    conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=600)
    resp = None
    try:
        conn.request("POST", "/v1/chat/completions", json.dumps({
            "messages": messages, "max_tokens": 4096, "temperature": 0.8,
            "stream": True,
        }), {"Content-Type": "application/json"})
        resp = conn.getresponse()
        got = 0
        while got < kill_after_bytes:
            chunk = resp.read(256)
            if not chunk:
                break
            got += len(chunk)
        return got
    finally:
        # Abrupt disconnect -> server-side task cancel.
        # Two http.client/socket quirks matter here:
        #  - conn.sock is None when the response is marked will-close, because
        #    getresponse() then hands the socket over to the response.
        #  - socket.close() alone does not release the descriptor while the
        #    response's file object still references it.
        # shutdown() sends the FIN immediately regardless of those references.
        sock = conn.sock
        if sock is not None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # peer already disconnected; nothing left to tear down
        if resp is not None:
            resp.close()
        conn.close()
# Canary flavour, chosen by the optional 5th CLI argument. "chat" probes
# /v1/chat/completions; anything else, or no argument, uses the raw
# /completion endpoint.
CANARY_MODE = "chat" if len(sys.argv) > 5 and sys.argv[5] == "chat" else "raw"


def canary():
    """Temp-0 probe with a known-good expected answer ("Paris").

    Returns:
        ``(ok, tail)``. ``ok`` is True when the generated text contains
        "Paris"; ``tail`` is the last 60 characters of that text. In chat mode
        the text is ``reasoning_content`` followed by ``content``.

    Raises:
        RuntimeError: the server answered with a non-200 status. Such a reply
            carries no generated text. Scoring it would produce ok=False,
            which the caller reports as corruption. Raising lets the caller
            log it as a canary error instead.
    """
    if CANARY_MODE == "chat":
        path = "/v1/chat/completions"
        payload = {
            "messages": [{"role": "user",
                          "content": "What is the capital of France? Answer with just the city name."}],
            "max_tokens": 80, "temperature": 0,
        }
    else:
        path = "/completion"
        payload = {
            "prompt": "The capital of France is", "n_predict": 4,
            "temperature": 0,
        }
    status, body = post(path, payload)
    if status != 200:
        raise RuntimeError(f"canary HTTP {status} from {path}: {body[:200]!r}")
    data = json.loads(body)
    if CANARY_MODE == "chat":
        m = data["choices"][0]["message"]
        content = (m.get("reasoning_content") or "") + (m.get("content") or "")
    else:
        # `or ""` also covers an explicit JSON null.
        content = data.get("content") or ""
    ok = "Paris" in content
    return ok, content[-60:]
# User turns for the "small-chat" rounds, taken in order and wrapping around.
# Each one is appended to the long-lived conversation, so the prompt prefix
# that later requests share keeps growing.
CONVO_TOPICS = [
    "Give me a one-line fun fact about octopuses.",
    "And about volcanoes?",
    "Now one about Roman history.",
    "One about deep sea fish.",
    "One about the Moon.",
    "One about honey bees.",
]
# Action kinds in schedule order: round r runs KINDS[r % 6] (6 == len(KINDS)).
# These names are also the values accepted in the SKIP command-line argument.
KINDS = ["small-chat", "parallel", "cancel", "slot-swap", "idle", "followup"]
def round_actions(r, convo):
    """Deterministic-ish schedule cycling through production-like events.

    Round ``r`` runs ``KINDS[r % len(KINDS)]``, unless that kind is in ``SKIP``.

    Args:
        r: zero-based round number.
        convo: the long-lived conversation, a list of chat message dicts
            starting with the system prompt.
            - "small-chat" rounds extend and trim it in place.
            - "followup" rounds send it plus one extra question without
              modifying it.

    Returns:
        A short description of what was done, used in the round's log line.

    Raises:
        Whatever the HTTP helpers raise; the caller logs it. A failed
        small-chat leaves ``convo`` unchanged.
    """
    n_kinds = len(KINDS)
    kind = KINDS[r % n_kinds]
    if kind in SKIP:
        return f"skipped {kind}"

    if kind == "small-chat":
        # growing-prefix small chat (mimics the periodic production client)
        topic = CONVO_TOPICS[(r // n_kinds) % len(CONVO_TOPICS)]
        user_msg = {"role": "user", "content": topic}
        # Commit the user turn only together with its reply. If chat() raised
        # after the user turn was appended, the next round would send two
        # user messages in a row. Chat templates that enforce strict
        # user/assistant alternation may reject that, which would break every
        # later small-chat/followup.
        out = chat(convo + [user_msg], 48)
        convo.append(user_msg)
        convo.append({"role": "assistant", "content": out[:200] or "ok"})
        if len(convo) > 9:
            # Drop the oldest user/assistant pair; index 0 (system) stays.
            del convo[1:3]  # keep prefix shared but bounded
        return f"small-chat ({len(out)} chars)"

    if kind == "parallel":
        # 4-way parallel burst, distinct prompts -> all slots active
        outs = [None] * 4

        def worker(i):
            outs[i] = chat([{"role": "user",
                             "content": f"Reply with one word: color #{i}"}], 24)

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # A worker whose request raised leaves its slot as None (the exception
        # goes to threading's excepthook); that slot is reported as length 0.
        return "parallel-burst " + repr([0 if o is None else len(o) for o in outs])

    if kind == "cancel":
        # long generation, cancelled mid-stream (production task 74 pattern)
        got = cancelled_stream([{"role": "user",
                                 "content": "Write a very long detailed essay about glaciers."}])
        return f"cancelled-stream ({got} bytes)"

    if kind == "slot-swap":
        # slot-swap pressure: 5 distinct prompts > 4 slots -> prompt_save/clear
        for i in range(5):
            chat([{"role": "user", "content": f"Define term number {r}-{i} of thermodynamics briefly."}], 32)
        return "slot-swap x5"

    if kind == "idle":
        # idle gap > 10s: crosses the cuda-graph eviction sweep boundary
        time.sleep(12)
        return "idle-12s"

    # kind == "followup": follow-up on existing convo (checkpoint restore path)
    out = chat(convo + [{"role": "user", "content": "Summarize our chat in 5 words."}], 32)
    return f"followup ({len(out)} chars)"
def main():
    """Run the hammer: a baseline canary, then MAX_ROUNDS of action + canary.

    Exit status:
      - 42: a post-round canary failed (corruption detected).
      - 43: the baseline canary answered wrongly before any traffic.
      - 0: every round finished without a failed canary. The final log line
        also says how many rounds never got a canary verdict.
      - An exception from the baseline canary is logged and re-raised.
    """
    log(f"start hammer port={PORT} max_rounds={MAX_ROUNDS}")
    try:
        ok, content = canary()
    except Exception as e:
        # The log file is the harness's only output channel; record why the
        # run is ending before the traceback terminates it.
        log(f"baseline canary error: {e!r}")
        raise
    log(f"baseline canary: ok={ok} content={content!r}")
    if not ok:
        log("BASELINE ALREADY BAD - aborting")
        sys.exit(43)

    convo = [{"role": "system", "content": "You are a helpful assistant."}]
    unchecked = 0  # rounds that ended without a canary verdict
    for r in range(MAX_ROUNDS):
        try:
            desc = round_actions(r, convo)
        except Exception as e:
            # NOTE(review): a failed action skips this round's canary, so any
            # corruption it causes is attributed to the next round's action.
            log(f"round {r}: action error: {e!r}")
            unchecked += 1
            time.sleep(2)
            continue
        try:
            ok, content = canary()
        except Exception as e:
            log(f"round {r}: canary error: {e!r}")
            unchecked += 1
            time.sleep(2)
            continue
        log(f"round {r}: {desc} | canary ok={ok} {content!r}")
        if not ok:
            log(f"*** CORRUPTION DETECTED at round {r} after: {desc} ***")
            sys.exit(42)

    if unchecked:
        # Don't let rounds that never reached a verdict (e.g. a dead server)
        # pass silently as "no corruption".
        log(f"finished all rounds, no corruption detected; "
            f"{unchecked} of {MAX_ROUNDS} rounds had no canary verdict "
            f"(action/canary errors)")
    else:
        log("finished all rounds, no corruption")
    sys.exit(0)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment