|
#!/usr/bin/env python3 |
|
"""Sequential llama-server prompt benchmark for MoE CPU-offload experiments.""" |
|
|
|
from __future__ import annotations |
|
|
|
import argparse |
|
import json |
|
import os |
|
import signal |
|
import subprocess |
|
import sys |
|
import time |
|
import urllib.error |
|
import urllib.request |
|
from pathlib import Path |
|
|
|
|
|
# Fixed prompt suites, keyed by the value accepted for --suite. The "long" and
# "medium-turns" suites are not listed here; their prompts are generated at
# runtime by the builder functions below.
PROMPTS = dict(
    # Single short prompt.
    base=[
        "Return exactly one concise sentence explaining what CPU-to-GPU expert offload means.",
    ],
    # Four longer engineering prompts; the first two have two paragraphs
    # separated by one blank line.
    multi=[
        (
            "You are reviewing a llama.cpp change. The current code receives a bitset of MoE experts used by the router, copies contiguous selected expert ranges from host memory to a GPU-side buffer, and repeats that work every decode step."
            "\n\n"
            "List the two most likely performance bottlenecks, the safest minimal optimization, and one correctness risk to test."
        ),
        (
            "Hardware: RTX 5090 Laptop GPU with 24 GB VRAM, 60 GB RAM, Intel Ultra 9. Model: Q4 MoE with experts on CPU and dense weights on GPU."
            "\n\n"
            "Create a compact benchmark matrix that would convince a maintainer the optimization helps local users without overfitting to one prompt."
        ),
        "Explain a persistent valid-expert bitmap design for MoE expert offload. Include pseudocode for used_ids, valid_ids, missing_ids, and contiguous range copy planning. Keep it implementation-focused.",
        "Draft a short maintainer-facing PR note for an optimization that skips re-copying MoE expert slices already present in the GPU copy buffer. Include scope, expected benefit, and why it should be low risk.",
    ],
)
|
|
|
|
|
def long_prompt_blocks(n_blocks: int) -> str:
    """Render the synthetic long-suite prompt with ``n_blocks`` trace lines.

    The prompt is a fixed four-line header, then one generated trace line per
    block, then a fixed three-line task footer, all joined with newlines.
    Trace lines cycle through four templates; the numeric fields are derived
    from the block index alone, so the output is deterministic for a given
    ``n_blocks``. A non-positive ``n_blocks`` yields header and footer only.
    """
    header = (
        "You are auditing a long local-inference trace for a llama.cpp MoE CPU-offload change.",
        "Use the whole trace, cite concrete recurring patterns, and finish with a compact engineering recommendation.",
        "",
        "TRACE:",
    )
    footer = (
        "",
        "TASK:",
        "Summarize the routing and copy behavior. Identify whether persistent expert residency helps more during prefill or decode, and name one correctness invariant.",
    )
    line_templates = (
        "layer={i:05d} route_a={a:02d} route_b={b:02d} stage=decode copy_policy=contiguous_ranges note=expert reuse is possible when the resident bitmap already contains both routed experts.",
        "request={i:05d} ctx_window=long prefill_chunk={c:04d} observation=CPU experts remain host-resident while selected slices are staged to the active GPU buffer before MUL_MAT_ID.",
        "sample={i:05d} bottleneck=h2d_transfer risk=stale_residency_guard mitigation=compare source pointer destination pointer and expert count before skipping a copy.",
        "case={i:05d} hardware=rtx5090_laptop memory=24gb_vram_60gb_ram target=interactive_long_context expected=avoid repeated uploads for experts already present in the split copy.",
    )
    n_templates = len(line_templates)

    # Every template receives all four fields; str.format ignores the ones a
    # given template does not reference.
    trace = [
        line_templates[idx % n_templates].format(
            i=idx,
            a=(idx * 7 + 3) % 64,
            b=(idx * 11 + 5) % 64,
            c=(idx * 13) % 4096,
        )
        for idx in range(n_blocks)
    ]
    return "\n".join([*header, *trace, *footer])
|
|
|
|
|
def medium_turn_blocks(turn: int, n_blocks: int) -> str:
    """Render the user message for one medium-turns conversation turn.

    The message is a four-line header naming ``turn``, then ``n_blocks``
    generated trace rows, then a blank line and a fixed instruction line, all
    joined with newlines. Rows cycle through four templates starting at an
    offset of ``turn``; the expert ids are derived from ``turn`` and the row
    index, so the text is deterministic for a given ``(turn, n_blocks)``.
    """
    row_templates = (
        "turn={turn:02d} row={i:05d} experts=({a:02d},{b:02d}) phase=medium_prefill observation=selected CPU MoE experts are staged to the active GPU copy buffer.",
        "turn={turn:02d} row={i:05d} reuse_key=layer_tensor_backend_copy invariant=skip only when source pointer destination pointer expert count and resident bit agree.",
        "turn={turn:02d} row={i:05d} local_user_pattern=long_chat chunk_tokens=medium expectation=do not recopy experts already present from earlier tokens.",
        "turn={turn:02d} row={i:05d} maintainer_note=small_scheduler_patch risk=stale_bitmap mitigation=data_pointer_guard and contiguous missing range copies.",
    )
    n_templates = len(row_templates)

    lines = [
        f"Turn {turn}: medium-size trace chunk for a llama.cpp MoE CPU-offload investigation.",
        "Keep this chunk as part of the running conversation context.",
        "",
        "TRACE CHUNK:",
    ]
    # Only the first template uses the expert ids; str.format ignores unused
    # keyword arguments for the others.
    lines += [
        row_templates[(turn + row) % n_templates].format(
            turn=turn,
            i=row,
            a=(turn * 17 + row * 7 + 3) % 64,
            b=(turn * 19 + row * 11 + 5) % 64,
        )
        for row in range(n_blocks)
    ]
    lines.append("")
    lines.append(
        "Instruction for this turn: acknowledge the chunk briefly and preserve the important routing/copy details for the next turn."
    )
    return "\n".join(lines)
|
|
|
|
|
def tokenize_len(base_url: str, text: str) -> int:
    """Return how many tokens the server reports for ``text``.

    POSTs ``text`` to ``<base_url>/tokenize`` with ``add_special`` false and
    ``parse_special`` true, then counts the entries of the ``tokens`` field in
    the JSON reply. A reply without a ``tokens`` field counts as zero tokens.
    """
    request_body = {
        "content": text,
        "add_special": False,
        "parse_special": True,
    }
    reply = http_json("POST", base_url + "/tokenize", request_body)
    tokens = reply.get("tokens", [])
    return len(tokens)
|
|
|
|
|
def _fit_prompt_to_tokens(base_url: str, render, target_tokens: int) -> tuple[str, int]:
    """Find the smallest block count whose rendered prompt reaches the target.

    ``render`` is a callable taking a block count (int >= 1) and returning the
    prompt text. The search assumes the tokenized length does not decrease as
    the block count grows: it doubles the count until the target is reached,
    then binary-searches between the last failing and first passing counts.

    Every candidate is tokenized exactly once, because each measurement is an
    HTTP round trip carrying the full prompt text.

    Returns:
        ``(prompt, n_tokens)`` for the smallest passing block count.

    Raises:
        RuntimeError: the server reported zero tokens for a rendered prompt,
            in which case doubling could never terminate.
    """
    lo = 1
    hi = 1
    prompt = render(hi)
    n_tokens = tokenize_len(base_url, prompt)
    while n_tokens < target_tokens:
        if n_tokens <= 0:
            raise RuntimeError(
                "tokenizer reported no tokens for a generated prompt; "
                "cannot size the prompt to the requested token count"
            )
        # ``hi`` is known to fall short, so the answer lies strictly above it.
        lo = hi + 1
        hi *= 2
        prompt = render(hi)
        n_tokens = tokenize_len(base_url, prompt)

    # ``hi`` is known to satisfy the target; look for a smaller count below it.
    best_prompt, best_tokens = prompt, n_tokens
    hi -= 1
    while lo <= hi:
        mid = (lo + hi) // 2
        prompt = render(mid)
        n_tokens = tokenize_len(base_url, prompt)
        if n_tokens < target_tokens:
            lo = mid + 1
        else:
            best_prompt, best_tokens = prompt, n_tokens
            hi = mid - 1

    return best_prompt, best_tokens


def build_long_prompt(base_url: str, target_tokens: int) -> tuple[str, int]:
    """Build the long-suite prompt sized to at least ``target_tokens`` tokens.

    Token counts come from the server's tokenizer via ``tokenize_len``.

    Returns:
        ``(prompt, n_tokens)`` for the shortest ``long_prompt_blocks`` output
        whose tokenized length is >= ``target_tokens``.

    Raises:
        ValueError: ``target_tokens`` is not positive.
        RuntimeError: the tokenizer reports zero tokens for a prompt.
    """
    if target_tokens <= 0:
        raise ValueError("--long-prompt-tokens must be positive for the long suite")

    return _fit_prompt_to_tokens(base_url, long_prompt_blocks, target_tokens)


def build_medium_chunk(base_url: str, turn: int, target_tokens: int) -> tuple[str, int]:
    """Build the user message for ``turn`` sized to at least ``target_tokens``.

    Token counts come from the server's tokenizer via ``tokenize_len``.

    Returns:
        ``(chunk, n_tokens)`` for the shortest ``medium_turn_blocks(turn, n)``
        output whose tokenized length is >= ``target_tokens``.

    Raises:
        ValueError: ``target_tokens`` is not positive.
        RuntimeError: the tokenizer reports zero tokens for a chunk.
    """
    if target_tokens <= 0:
        raise ValueError("--medium-turn-tokens must be positive for the medium-turns suite")

    return _fit_prompt_to_tokens(
        base_url,
        lambda n_blocks: medium_turn_blocks(turn, n_blocks),
        target_tokens,
    )
|
|
|
|
|
def get_prompts(suite: str, base_url: str, long_prompt_tokens: int) -> tuple[list[str], dict]:
    """Return the prompt list for ``suite`` plus extra fields for the summary.

    For the ``"long"`` suite a single prompt is generated with
    ``build_long_prompt`` and the extra fields record the requested and the
    actual tokenized length. Any other suite name is looked up in ``PROMPTS``
    (raising ``KeyError`` if absent) and returned with no extra fields.
    """
    if suite == "long":
        generated, token_count = build_long_prompt(base_url, long_prompt_tokens)
        size_info = {
            "long_prompt_target_tokens": long_prompt_tokens,
            "long_prompt_tokenized_tokens": token_count,
        }
        return [generated], size_info

    return PROMPTS[suite], {}
|
|
|
|
|
def http_json(method: str, url: str, payload: dict | None = None, timeout: int = 600) -> dict:
    """Send one HTTP request and return the decoded JSON response body.

    Args:
        method: HTTP method name passed to ``urllib.request.Request``.
        url: Full request URL.
        payload: Optional object sent as a UTF-8 JSON body with a
            ``Content-Type: application/json`` header; ``None`` sends no body.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The parsed JSON value, or ``{}`` when the response body is empty.

    Errors from ``urlopen`` and from JSON decoding propagate to the caller.
    """
    if payload is None:
        body = None
        request_headers = {}
    else:
        body = json.dumps(payload).encode("utf-8")
        request_headers = {"Content-Type": "application/json"}

    request = urllib.request.Request(url, data=body, headers=request_headers, method=method)
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        text = resp.read().decode("utf-8")

    if not text:
        return {}
    return json.loads(text)
|
|
|
|
|
def wait_ready(base_url: str, proc: subprocess.Popen, timeout_s: int) -> None:
    """Block until ``<base_url>/health`` answers HTTP 200.

    The endpoint is polled about once per second. Before each probe the server
    process is checked so that a crashed server fails fast instead of waiting
    out the whole timeout.

    Args:
        base_url: Server base URL without a trailing slash.
        proc: The launched server process.
        timeout_s: Maximum number of seconds to keep polling.

    Raises:
        RuntimeError: the server process exited before becoming healthy.
        TimeoutError: no 200 response arrived within ``timeout_s`` seconds;
            the message includes the last probe error, if any.
    """
    # Imported locally because the file's import block does not bind ``http``.
    import http.client

    deadline = time.monotonic() + timeout_s
    last_error = None
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            raise RuntimeError(f"server exited early with code {proc.returncode}")
        try:
            with urllib.request.urlopen(base_url + "/health", timeout=5) as resp:
                # urlopen raises HTTPError for non-2xx replies (503 included),
                # so 200 is the only status that needs checking here.
                if resp.status == 200:
                    return
        except (OSError, http.client.HTTPException) as exc:
            # OSError covers URLError/HTTPError and TimeoutError, plus the raw
            # socket errors (connection reset, remote disconnect, read timeout)
            # that urlopen does not wrap while the server is still starting.
            last_error = exc
        time.sleep(1)
    raise TimeoutError(f"server did not become healthy within {timeout_s}s: {last_error}")
|
|
|
|
|
def chat_messages_once(base_url: str, alias: str, messages: list[dict], max_tokens: int, cache_prompt: bool) -> tuple[dict, str]:
    """Send one non-streaming chat completion and return its metrics and text.

    The request goes to ``<base_url>/v1/chat/completions`` with temperature 0
    and ``top_k`` 1. Latency is the wall-clock time around the HTTP call.

    Missing or ``null`` fields in the response (``usage``, ``choices``,
    ``message``, ``content``, ``prompt_tokens_details``, token counts) are
    treated as empty or zero, so a sparse reply produces zeroed metrics
    instead of an exception.

    Returns:
        ``(metrics, content)``. ``metrics`` holds ``latency_s``, the token
        counts from ``usage``, ``processed_prompt_tokens`` (prompt tokens
        minus cached tokens, floored at 0) and ``content_chars``. ``content``
        is the assistant message text, always a string.
    """
    payload = {
        "model": alias,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": 0,
        "top_k": 1,
        "stream": False,
        "cache_prompt": cache_prompt,
    }
    started = time.perf_counter()
    response = http_json("POST", base_url + "/v1/chat/completions", payload)
    ended = time.perf_counter()

    # ``or`` fallbacks cover both absent keys and explicit JSON nulls.
    usage = response.get("usage") or {}
    choices = response.get("choices") or [{}]
    message = (choices[0] or {}).get("message") or {}
    # Callers append ``content`` to the conversation, so keep it a string.
    content = message.get("content") or ""
    prompt_tokens = usage.get("prompt_tokens") or 0
    cached_tokens = (usage.get("prompt_tokens_details") or {}).get("cached_tokens") or 0
    return {
        "latency_s": ended - started,
        "prompt_tokens": prompt_tokens,
        "cached_prompt_tokens": cached_tokens,
        "processed_prompt_tokens": max(prompt_tokens - cached_tokens, 0),
        "completion_tokens": usage.get("completion_tokens") or 0,
        "total_tokens": usage.get("total_tokens") or 0,
        "content_chars": len(content),
    }, content
|
|
|
|
|
def chat_once(base_url: str, alias: str, prompt: str, max_tokens: int, cache_prompt: bool) -> dict:
    """Run one single-turn chat request and return only its metrics dict.

    ``prompt`` is sent as the user message after a fixed system message; the
    reply text returned by ``chat_messages_once`` is discarded.
    """
    system_message = {
        "role": "system",
        "content": "You are a concise senior systems engineer. Answer directly.",
    }
    user_message = {"role": "user", "content": prompt}
    metrics, _reply_text = chat_messages_once(
        base_url,
        alias,
        [system_message, user_message],
        max_tokens,
        cache_prompt,
    )
    return metrics
|
|
|
|
|
def run_medium_turns(base_url: str, alias: str, args: argparse.Namespace) -> tuple[list[dict], float, dict]:
    """Run the growing multi-turn conversation used by the medium-turns suite.

    Each turn appends a generated user chunk of roughly
    ``args.medium_turn_tokens`` tokens, sends the whole conversation with
    ``cache_prompt`` enabled, and appends the assistant reply. The loop stops
    after ``args.medium_max_turns`` turns, or earlier once a request reports
    at least ``args.medium_target_tokens`` prompt tokens.

    The measured duration is the sum of the time spent in the chat requests.
    Building each chunk issues several tokenize requests, and that setup time
    is deliberately left out so it does not deflate the reported throughput.
    The other suites likewise build their prompts before timing starts.

    Returns:
        ``(requests, measured_s, extra_summary)``: the per-turn metric dicts,
        the accumulated request time in seconds, and the suite-specific
        summary fields.
    """
    messages = [{
        "role": "system",
        "content": "You are a concise senior systems engineer. Answer directly and remember the accumulated trace.",
    }]
    requests = []
    chunk_tokens = []
    measured_s = 0.0

    for turn in range(args.medium_max_turns):
        # Untimed setup: size this turn's chunk against the server tokenizer.
        chunk, n_tokens = build_medium_chunk(base_url, turn + 1, args.medium_turn_tokens)
        chunk_tokens.append(n_tokens)
        messages.append({"role": "user", "content": chunk})

        request_start = time.perf_counter()
        item, content = chat_messages_once(base_url, alias, messages, args.max_tokens, True)
        measured_s += time.perf_counter() - request_start

        item["prompt_index"] = turn
        item["turn"] = turn + 1
        item["chunk_tokenized_tokens"] = n_tokens
        requests.append(item)

        # Keep the reply in the history so the next turn extends this context.
        messages.append({"role": "assistant", "content": content})
        if item["prompt_tokens"] >= args.medium_target_tokens:
            break

    return requests, measured_s, {
        "medium_turn_tokens": args.medium_turn_tokens,
        "medium_target_tokens": args.medium_target_tokens,
        "medium_max_turns": args.medium_max_turns,
        "medium_actual_turns": len(requests),
        "medium_chunk_tokenized_min": min(chunk_tokens) if chunk_tokens else 0,
        "medium_chunk_tokenized_max": max(chunk_tokens) if chunk_tokens else 0,
    }
|
|
|
|
|
def terminate(proc: subprocess.Popen) -> None:
    """Stop the server process, escalating from SIGINT to terminate to kill.

    Does nothing if the process has already exited. Otherwise it sends SIGINT
    and waits up to 15 seconds, then calls ``terminate()`` and waits up to 10
    seconds, then calls ``kill()`` and waits for the process to be reaped.
    """
    if proc.poll() is not None:
        return

    try:
        proc.send_signal(signal.SIGINT)
    except ValueError:
        # Popen.send_signal rejects SIGINT on Windows ("Unsupported signal");
        # fall back to terminate() so shutdown still proceeds there.
        proc.terminate()

    try:
        proc.wait(timeout=15)
        return
    except subprocess.TimeoutExpired:
        proc.terminate()

    try:
        proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        proc.kill()
        # Reap the killed child so it is not left as a zombie and so that
        # ``returncode`` is populated for the caller.
        proc.wait()
|
|
|
|
|
def main() -> int:
    """Launch the server, run the selected benchmark suite, and write results.

    Steps:
      1. Parse the command line and reject invalid prompt sizes up front.
      2. Start the server binary with its output redirected to ``--log``.
      3. Wait for the health endpoint, then run either the medium-turns
         conversation or a warmup phase followed by the timed request loop.
      4. Always stop the server, then aggregate the per-request metrics.
      5. Write the full result as JSON to ``--out`` and print the summary
         between ``PERF_METRICS_START`` / ``PERF_METRICS_END`` marker lines.

    Returns:
        0 on success. Failures propagate as exceptions, or as ``SystemExit``
        for command-line errors.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--server-bin", required=True)
    parser.add_argument("--repo", required=True)
    parser.add_argument("--suite", choices=sorted([*PROMPTS, "long", "medium-turns"]), required=True)
    parser.add_argument("--mode", choices=["cpu-moe", "fit-auto"], default="cpu-moe")
    parser.add_argument("--port", type=int, required=True)
    parser.add_argument("--duration-s", type=float, default=60.0)
    parser.add_argument("--ctx-size", type=int, default=4096)
    parser.add_argument("--max-tokens", type=int, default=192)
    parser.add_argument("--warmup-s", type=float, default=10.0)
    parser.add_argument("--warmup-tokens", type=int, default=32)
    parser.add_argument("--ready-timeout-s", type=int, default=1200)
    parser.add_argument("--long-prompt-tokens", type=int, default=8192)
    parser.add_argument("--medium-turn-tokens", type=int, default=5000)
    parser.add_argument("--medium-target-tokens", type=int, default=60000)
    parser.add_argument("--medium-max-turns", type=int, default=20)
    parser.add_argument("--cache-prompt", action="store_true")
    parser.add_argument("--no-op-offload", action="store_true")
    parser.add_argument("--out", required=True)
    parser.add_argument("--log", required=True)
    args = parser.parse_args()

    # Fail fast: the prompt builders reject these values anyway, but only
    # after the server has been started and has become healthy.
    if args.suite == "long" and args.long_prompt_tokens <= 0:
        parser.error("--long-prompt-tokens must be positive for the long suite")
    if args.suite == "medium-turns" and args.medium_max_turns > 0 and args.medium_turn_tokens <= 0:
        parser.error("--medium-turn-tokens must be positive for the medium-turns suite")

    out_path = Path(args.out)
    log_path = Path(args.log)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    log_path.parent.mkdir(parents=True, exist_ok=True)

    alias = f"bench-{args.suite}-{args.mode}"
    # The medium-turns suite always sends cache_prompt=True, so the server
    # must keep prompt caching on for it regardless of --cache-prompt.
    prompt_cache_enabled = args.cache_prompt or args.suite == "medium-turns"
    # Server command line; the meaning of each flag is defined by the server
    # binary given in --server-bin.
    cmd = [
        args.server_bin,
        "-hf",
        args.repo,
        "--no-mmproj",
        "--host",
        "127.0.0.1",
        "--port",
        str(args.port),
        "-a",
        alias,
        "-c",
        str(args.ctx_size),
        "-ngl",
        "auto",
        "-np",
        "1",
        "--no-ui",
        "--no-warmup",
        "--metrics",
        "--reasoning",
        "off",
        "--log-verbosity",
        "2",
    ]
    if not prompt_cache_enabled:
        cmd.append("--no-cache-prompt")
    if args.mode == "cpu-moe":
        cmd.append("--cpu-moe")
    if args.no_op_offload:
        cmd.append("--no-op-offload")

    env = os.environ.copy()
    # Only sets the variable when the caller's environment does not define it.
    env.setdefault("LLAMA_ARG_FLASH_ATTN", "auto")

    with log_path.open("w", encoding="utf-8") as log:
        proc = subprocess.Popen(cmd, stdout=log, stderr=subprocess.STDOUT, env=env)
        base_url = f"http://127.0.0.1:{args.port}"
        try:
            wait_ready(base_url, proc, args.ready_timeout_s)
            warmups = []
            if args.suite == "medium-turns":
                # This suite has no warmup phase.
                warmup_s = 0.0
                requests, measured_s, extra_summary = run_medium_turns(base_url, alias, args)
            else:
                prompts, extra_summary = get_prompts(args.suite, base_url, args.long_prompt_tokens)

                # Warmup: cycle through the prompts until --warmup-s elapses.
                warmup_start = time.perf_counter()
                warmup_idx = 0
                while time.perf_counter() - warmup_start < args.warmup_s:
                    prompt = prompts[warmup_idx % len(prompts)]
                    item = chat_once(base_url, alias, prompt, args.warmup_tokens, prompt_cache_enabled)
                    item["prompt_index"] = warmup_idx % len(prompts)
                    warmups.append(item)
                    warmup_idx += 1
                warmup_s = time.perf_counter() - warmup_start

                # Measured phase: requests are issued one at a time. The loop
                # stops only when --duration-s has elapsed AND every prompt has
                # been sent at least once.
                requests = []
                measured_start = time.perf_counter()
                idx = 0
                while True:
                    elapsed = time.perf_counter() - measured_start
                    if elapsed >= args.duration_s and idx >= len(prompts):
                        break
                    prompt = prompts[idx % len(prompts)]
                    item = chat_once(base_url, alias, prompt, args.max_tokens, prompt_cache_enabled)
                    item["prompt_index"] = idx % len(prompts)
                    requests.append(item)
                    idx += 1
                measured_s = time.perf_counter() - measured_start
        finally:
            # Stop the server whether the run succeeded or raised.
            terminate(proc)

    prompt_tokens = sum(r["prompt_tokens"] for r in requests)
    cached_prompt_tokens = sum(r.get("cached_prompt_tokens", 0) for r in requests)
    processed_prompt_tokens = sum(r.get("processed_prompt_tokens", r["prompt_tokens"]) for r in requests)
    completion_tokens = sum(r["completion_tokens"] for r in requests)
    total_tokens = sum(r["total_tokens"] for r in requests)
    latencies = [r["latency_s"] for r in requests]
    # Rates divide by the measured duration and fall back to 0 when it is zero.
    summary = {
        "repo": args.repo,
        "suite": args.suite,
        "mode": args.mode,
        "ctx_size": args.ctx_size,
        "duration_s": measured_s,
        "requests": len(requests),
        "prompt_tokens": prompt_tokens,
        "cached_prompt_tokens": cached_prompt_tokens,
        "processed_prompt_tokens": processed_prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": total_tokens,
        "prompt_tokens_per_s": prompt_tokens / measured_s if measured_s else 0,
        "processed_prompt_tokens_per_s": processed_prompt_tokens / measured_s if measured_s else 0,
        "completion_tokens_per_s": completion_tokens / measured_s if measured_s else 0,
        "total_tokens_per_s": total_tokens / measured_s if measured_s else 0,
        "max_prompt_tokens": max((r["prompt_tokens"] for r in requests), default=0),
        "max_cached_prompt_tokens": max((r.get("cached_prompt_tokens", 0) for r in requests), default=0),
        "max_processed_prompt_tokens": max((r.get("processed_prompt_tokens", r["prompt_tokens"]) for r in requests), default=0),
        "prompt_cache_enabled": prompt_cache_enabled,
        "latency_avg_s": sum(latencies) / len(latencies) if latencies else 0,
        "latency_min_s": min(latencies) if latencies else 0,
        "latency_max_s": max(latencies) if latencies else 0,
        "warmup_s": warmup_s,
        "warmup_requests": len(warmups),
    }
    summary.update(extra_summary)
    result = {
        "server_cmd": cmd,
        "warmup": warmups,
        "summary": summary,
        "requests": requests,
    }
    out_path.write_text(json.dumps(result, indent=2) + "\n", encoding="utf-8")

    # Summary on stdout, bracketed by fixed marker lines.
    print("PERF_METRICS_START")
    print(json.dumps({"scenarios": {f"{args.repo}:{args.mode}:{args.suite}": summary}}, sort_keys=True))
    print("PERF_METRICS_END")
    return 0
|
|
|
|
|
if __name__ == "__main__": |
|
raise SystemExit(main()) |