Created
June 4, 2026 08:54
-
-
Save julien-c/12f02a2f87aa7028a49f4dadf0b6bc10 to your computer and use it in GitHub Desktop.
hammer.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Traffic hammer + canary for reproducing llama-server output corruption. | |
| Mimics the production traffic pattern that preceded the 2026-06-03 '/////' | |
| corruption: small chats with a growing shared prefix (checkpoint restores), | |
| long generations cancelled mid-stream, 4-way parallel bursts, slot swapping, | |
| and >10s idle gaps (CUDA/HIP graph-cache eviction boundary). | |
| After every round, a temperature-0 canary probe checks output sanity. | |
| On canary failure the harness logs the round + action history and exits 42. | |
| """ | |
| import http.client | |
| import json | |
| import sys | |
| import threading | |
| import time | |
| PORT = int(sys.argv[1]) | |
| LABEL = sys.argv[2] if len(sys.argv) > 2 else str(PORT) | |
| MAX_ROUNDS = int(sys.argv[3]) if len(sys.argv) > 3 else 200 | |
| # comma-separated action kinds to skip: small-chat,parallel,cancel,slot-swap,idle,followup | |
| SKIP = set((sys.argv[4] if len(sys.argv) > 4 else "").split(",")) - {""} | |
| LOG = f"/home/gibbon/llama-debug/hammer-{LABEL}.log" | |
| def log(msg): | |
| line = f"{time.strftime('%H:%M:%S')} [{LABEL}] {msg}" | |
| with open(LOG, "a") as f: | |
| f.write(line + "\n") | |
| def post(path, payload, timeout=600, read_body=True): | |
| conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=timeout) | |
| try: | |
| conn.request("POST", path, json.dumps(payload), | |
| {"Content-Type": "application/json"}) | |
| resp = conn.getresponse() | |
| body = resp.read() if read_body else b"" | |
| return resp.status, body | |
| finally: | |
| conn.close() | |
| def chat(messages, max_tokens, temperature=0.8): | |
| status, body = post("/v1/chat/completions", { | |
| "messages": messages, "max_tokens": max_tokens, | |
| "temperature": temperature, | |
| }) | |
| d = json.loads(body) | |
| m = d["choices"][0]["message"] | |
| return (m.get("reasoning_content") or "") + (m.get("content") or "") | |
| def cancelled_stream(messages, kill_after_bytes=2000): | |
| """Start a streaming request and slam the connection shut mid-generation.""" | |
| conn = http.client.HTTPConnection("127.0.0.1", PORT, timeout=600) | |
| conn.request("POST", "/v1/chat/completions", json.dumps({ | |
| "messages": messages, "max_tokens": 4096, "temperature": 0.8, | |
| "stream": True, | |
| }), {"Content-Type": "application/json"}) | |
| resp = conn.getresponse() | |
| got = 0 | |
| while got < kill_after_bytes: | |
| chunk = resp.read(256) | |
| if not chunk: | |
| break | |
| got += len(chunk) | |
| conn.sock.close() # abrupt disconnect -> server-side task cancel | |
| return got | |
| CANARY_MODE = "chat" if len(sys.argv) > 5 and sys.argv[5] == "chat" else "raw" | |
| def canary(): | |
| """Temp-0 probe with a known-good expected answer.""" | |
| if CANARY_MODE == "chat": | |
| status, body = post("/v1/chat/completions", { | |
| "messages": [{"role": "user", | |
| "content": "What is the capital of France? Answer with just the city name."}], | |
| "max_tokens": 80, "temperature": 0, | |
| }) | |
| m = json.loads(body)["choices"][0]["message"] | |
| content = (m.get("reasoning_content") or "") + (m.get("content") or "") | |
| else: | |
| status, body = post("/completion", { | |
| "prompt": "The capital of France is", "n_predict": 4, | |
| "temperature": 0, | |
| }) | |
| content = json.loads(body).get("content", "") | |
| ok = "Paris" in content | |
| return ok, content[-60:] | |
| CONVO_TOPICS = [ | |
| "Give me a one-line fun fact about octopuses.", | |
| "And about volcanoes?", | |
| "Now one about Roman history.", | |
| "One about deep sea fish.", | |
| "One about the Moon.", | |
| "One about honey bees.", | |
| ] | |
| KINDS = ["small-chat", "parallel", "cancel", "slot-swap", "idle", "followup"] | |
| def round_actions(r, convo): | |
| """Deterministic-ish schedule cycling through production-like events.""" | |
| k = r % 6 | |
| if KINDS[k] in SKIP: | |
| return f"skipped {KINDS[k]}" | |
| if k == 0: | |
| # growing-prefix small chat (mimics the periodic production client) | |
| topic = CONVO_TOPICS[(r // 6) % len(CONVO_TOPICS)] | |
| convo.append({"role": "user", "content": topic}) | |
| out = chat(convo, 48) | |
| convo.append({"role": "assistant", "content": out[:200] or "ok"}) | |
| if len(convo) > 9: | |
| del convo[1:3] # keep prefix shared but bounded | |
| return f"small-chat ({len(out)} chars)" | |
| if k == 1: | |
| # 4-way parallel burst, distinct prompts -> all slots active | |
| outs = [None] * 4 | |
| def worker(i): | |
| outs[i] = chat([{"role": "user", | |
| "content": f"Reply with one word: color #{i}"}], 24) | |
| ts = [threading.Thread(target=worker, args=(i,)) for i in range(4)] | |
| [t.start() for t in ts] | |
| [t.join() for t in ts] | |
| return "parallel-burst " + repr([0 if o is None else len(o) for o in outs]) | |
| if k == 2: | |
| # long generation, cancelled mid-stream (production task 74 pattern) | |
| got = cancelled_stream([{"role": "user", | |
| "content": "Write a very long detailed essay about glaciers."}]) | |
| return f"cancelled-stream ({got} bytes)" | |
| if k == 3: | |
| # slot-swap pressure: 5 distinct prompts > 4 slots -> prompt_save/clear | |
| for i in range(5): | |
| chat([{"role": "user", "content": f"Define term number {r}-{i} of thermodynamics briefly."}], 32) | |
| return "slot-swap x5" | |
| if k == 4: | |
| # idle gap > 10s: crosses the cuda-graph eviction sweep boundary | |
| time.sleep(12) | |
| return "idle-12s" | |
| # follow-up on existing convo (checkpoint restore path) | |
| out = chat(convo + [{"role": "user", "content": "Summarize our chat in 5 words."}], 32) | |
| return f"followup ({len(out)} chars)" | |
| def main(): | |
| log(f"start hammer port={PORT} max_rounds={MAX_ROUNDS}") | |
| ok, content = canary() | |
| log(f"baseline canary: ok={ok} content={content!r}") | |
| if not ok: | |
| log("BASELINE ALREADY BAD - aborting") | |
| sys.exit(43) | |
| convo = [{"role": "system", "content": "You are a helpful assistant."}] | |
| for r in range(MAX_ROUNDS): | |
| try: | |
| desc = round_actions(r, convo) | |
| except Exception as e: | |
| log(f"round {r}: action error: {e!r}") | |
| time.sleep(2) | |
| continue | |
| try: | |
| ok, content = canary() | |
| except Exception as e: | |
| log(f"round {r}: canary error: {e!r}") | |
| time.sleep(2) | |
| continue | |
| log(f"round {r}: {desc} | canary ok={ok} {content!r}") | |
| if not ok: | |
| log(f"*** CORRUPTION DETECTED at round {r} after: {desc} ***") | |
| sys.exit(42) | |
| log("finished all rounds, no corruption") | |
| sys.exit(0) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment