@ehartford
Created August 25, 2025 02:21
#!/usr/bin/env python3
"""
OpenAI API benchmark script that replicates llama-bench behavior exactly.
Uses random tokens for both prompt and generation, no sampling.
Works with OpenAI-compatible endpoints like vLLM.
"""
import time
import numpy as np
import argparse
import sys
from openai import OpenAI
import tiktoken

def get_random_tokens(n_tokens, vocab_size=32000):
    """Generate random token IDs."""
    return np.random.randint(0, vocab_size, n_tokens).tolist()


def tokens_to_text(token_ids, encoding):
    """Convert token IDs to text using tiktoken."""
    try:
        # tiktoken can decode arbitrary token IDs
        return encoding.decode(token_ids)
    except Exception:
        # Fallback: just create a string of numbers
        return ' '.join(str(t) for t in token_ids)

async def test_prompt_async(client, model, n_prompt, n_batch=2048, encoding=None):
    """
    Test prompt processing with random tokens.
    Note: the underlying client call is synchronous; this coroutine only
    wraps it so it can be scheduled alongside other async code.
    """
    # Get vocab size (using tiktoken's vocab size or default)
    vocab_size = encoding.n_vocab if encoding else 32000
    # Generate random token IDs
    token_ids = get_random_tokens(n_prompt, vocab_size)
    # Convert to text (vLLM needs text input)
    if encoding:
        prompt_text = tokens_to_text(token_ids, encoding)
    else:
        # Fallback: create pseudo-text from token IDs
        prompt_text = ' '.join(f"token_{t}" for t in token_ids)
    start = time.perf_counter()
    # Make a completion request with an immediate stop: the model must
    # process the whole prompt while generating at most one token.
    response = client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=1,
        temperature=0,
        logprobs=1,  # Request logprobs to ensure full processing
        echo=False,  # Don't echo the prompt
        stop=["\n", " ", ".", ","]  # At most 4 sequences (OpenAI API limit); "" is not a valid stop
    )
    end = time.perf_counter()
    return end - start

def test_prompt(client, model, n_prompt, n_batch=2048, encoding=None):
    """
    Test prompt processing with random tokens, as llama-bench does.
    """
    # For synchronous operation, we'll use the sync client
    vocab_size = encoding.n_vocab if encoding else 32000
    # Generate random token IDs
    token_ids = get_random_tokens(n_prompt, vocab_size)
    # Convert to text
    if encoding:
        prompt_text = tokens_to_text(token_ids, encoding)
    else:
        prompt_text = ' '.join(f"token_{t}" for t in token_ids)
    start = time.perf_counter()
    # Make completion request
    response = client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=1,
        temperature=0,
        logprobs=1,
        echo=False
    )
    end = time.perf_counter()
    return end - start

def test_gen(client, model, n_gen, encoding=None):
    """
    Test generation with random tokens, as llama-bench does.
    Measures pure generation speed.
    """
    vocab_size = encoding.n_vocab if encoding else 32000
    # Start with a minimal prompt (single random token)
    token_id = np.random.randint(0, vocab_size)
    if encoding:
        prompt_text = tokens_to_text([token_id], encoding)
    else:
        prompt_text = f"token_{token_id}"
    start = time.perf_counter()
    # Generate n_gen tokens. Note: the server may still stop early at EOS;
    # vLLM accepts extra_body={"ignore_eos": True} to force exactly n_gen tokens.
    response = client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=n_gen,
        temperature=0,  # Deterministic generation
        top_p=1.0,
        stream=False
    )
    end = time.perf_counter()
    return end - start

def run_benchmark(base_url, model, n_prompt=512, n_gen=128, n_reps=5, warmup=True, api_key="EMPTY"):
    """
    Run a benchmark in the style of llama-bench.
    Args:
        base_url: Base URL for the OpenAI-compatible API
        model: Model name/ID to use
        n_prompt: Number of prompt tokens (default: 512)
        n_gen: Number of generation tokens (default: 128)
        n_reps: Number of repetitions (default: 5)
        warmup: Whether to do warmup runs (default: True)
        api_key: API key (default: "EMPTY" for vLLM)
    """
    print("OpenAI API benchmark (llama-bench compatible mode)")
    print("=================================================")
    print(f"API endpoint: {base_url}")
    print(f"Model: {model}")
    # Initialize OpenAI client
    client = OpenAI(
        base_url=base_url,
        api_key=api_key
    )
    # Try common tiktoken encodings so random token IDs can be decoded to text
    encoding = None
    for enc_name in ['cl100k_base', 'p50k_base', 'r50k_base']:
        try:
            encoding = tiktoken.get_encoding(enc_name)
            print(f"Using encoding: {enc_name}")
            break
        except Exception:
            continue
    if encoding is None:
        print("Warning: Could not load a tiktoken encoding, using fallback")
    print("\nTest configuration:")
    print(f"  Prompt tokens: {n_prompt} (random)")
    print(f"  Generation tokens: {n_gen} (random)")
    print(f"  Repetitions: {n_reps}")
    print(f"  Warmup: {warmup}")
    print()
    # Warmup runs (like llama-bench does)
    if warmup:
        print("Running warmup...")
        try:
            if n_prompt > 0:
                print("  Warmup prompt processing...")
                _ = test_prompt(client, model, min(32, n_prompt), encoding=encoding)
            if n_gen > 0:
                print("  Warmup generation...")
                _ = test_gen(client, model, 1, encoding=encoding)
            print("Warmup complete.\n")
        except Exception as e:
            print(f"Warning: Warmup failed: {e}")
            print("Continuing with benchmark...\n")
    # Benchmark runs
    print("Running benchmark...")
    prompt_times = []
    gen_times = []
    total_times = []
    for i in range(n_reps):
        prompt_time = 0
        gen_time = 0
        # Test prompt processing
        if n_prompt > 0:
            try:
                prompt_time = test_prompt(client, model, n_prompt, encoding=encoding)
            except Exception as e:
                print(f"  Error in prompt test: {e}")
                continue
        # Test generation
        if n_gen > 0:
            try:
                gen_time = test_gen(client, model, n_gen, encoding=encoding)
            except Exception as e:
                print(f"  Error in generation test: {e}")
                continue
        total_time = prompt_time + gen_time
        prompt_times.append(prompt_time)
        gen_times.append(gen_time)
        total_times.append(total_time)
        # Calculate tokens per second
        total_tokens = n_prompt + n_gen
        tps = total_tokens / total_time if total_time > 0 else 0
        print(f"Run {i+1}/{n_reps}:")
        if n_prompt > 0:
            pp_tps = n_prompt / prompt_time if prompt_time > 0 else 0
            print(f"  Prompt:     {prompt_time:.3f}s ({pp_tps:.2f} tokens/sec)")
        if n_gen > 0:
            tg_tps = n_gen / gen_time if gen_time > 0 else 0
            print(f"  Generation: {gen_time:.3f}s ({tg_tps:.2f} tokens/sec)")
        print(f"  Total:      {total_time:.3f}s ({tps:.2f} tokens/sec)")
    if not total_times:
        print("Error: No successful benchmark runs completed!")
        return None
    # Calculate statistics
    total_tokens = n_prompt + n_gen
    # Overall statistics
    avg_total = np.mean(total_times)
    std_total = np.std(total_times)
    avg_tps = total_tokens / avg_total if avg_total > 0 else 0
    tps_values = [total_tokens / t for t in total_times if t > 0]
    std_tps = np.std(tps_values) if tps_values else 0
    print("\n" + "=" * 50)
    print("Benchmark Results (llama-bench compatible):")
    print("=" * 50)
    print(f"Model: {model}")
    print(f"API: {base_url}")
    # Test description (like llama-bench output, e.g. "pp512+tg128")
    test_desc = []
    if n_prompt > 0:
        test_desc.append(f"pp{n_prompt}")
    if n_gen > 0:
        test_desc.append(f"tg{n_gen}")
    print(f"Test: {'+'.join(test_desc)}")
    if n_prompt > 0 and prompt_times:
        avg_pp = np.mean(prompt_times)
        std_pp = np.std(prompt_times)
        avg_pp_tps = n_prompt / avg_pp if avg_pp > 0 else 0
        print(f"\nPrompt processing ({n_prompt} tokens):")
        print(f"  Time:  {avg_pp:.3f} ± {std_pp:.3f}s")
        print(f"  Speed: {avg_pp_tps:.2f} tokens/sec")
    if n_gen > 0 and gen_times:
        avg_tg = np.mean(gen_times)
        std_tg = np.std(gen_times)
        avg_tg_tps = n_gen / avg_tg if avg_tg > 0 else 0
        print(f"\nGeneration ({n_gen} tokens):")
        print(f"  Time:  {avg_tg:.3f} ± {std_tg:.3f}s")
        print(f"  Speed: {avg_tg_tps:.2f} tokens/sec")
    print(f"\nOverall ({total_tokens} tokens):")
    print(f"  Time:  {avg_total:.3f} ± {std_total:.3f}s")
    print(f"  Speed: {avg_tps:.2f} ± {std_tps:.2f} tokens/sec")
    return {
        'prompt_times': prompt_times,
        'gen_times': gen_times,
        'total_times': total_times,
        'avg_tps': avg_tps,
        'std_tps': std_tps
    }

def main():
    parser = argparse.ArgumentParser(
        description='OpenAI API benchmark that replicates llama-bench behavior'
    )
    parser.add_argument('-u', '--base-url', type=str, default="http://localhost:8000/v1",
                        help='Base URL for OpenAI-compatible API (default: http://localhost:8000/v1)')
    parser.add_argument('-m', '--model', type=str, required=True,
                        help='Model name/ID to use')
    parser.add_argument('-k', '--api-key', type=str, default="EMPTY",
                        help='API key (default: EMPTY for vLLM)')
    parser.add_argument('-p', '--n-prompt', type=int, default=512,
                        help='Number of prompt tokens (default: 512)')
    parser.add_argument('-n', '--n-gen', type=int, default=128,
                        help='Number of generation tokens (default: 128)')
    parser.add_argument('-b', '--batch-size', type=int, default=2048,
                        help='Batch size for prompt processing (default: 2048; accepted for '
                             'llama-bench parity but not currently used)')
    parser.add_argument('-r', '--reps', type=int, default=5,
                        help='Number of repetitions (default: 5)')
    parser.add_argument('--no-warmup', action='store_true',
                        help='Skip warmup run')
    args = parser.parse_args()
    try:
        results = run_benchmark(
            base_url=args.base_url,
            model=args.model,
            n_prompt=args.n_prompt,
            n_gen=args.n_gen,
            n_reps=args.reps,
            warmup=not args.no_warmup,
            api_key=args.api_key
        )
        if results is None:
            return 1
    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user")
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())
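For programmatic use, run_benchmark can also be imported and called directly. Below is a minimal sketch, assuming the script above is saved as openai_bench.py next to the caller and an OpenAI-compatible server (e.g. vLLM) is listening locally; the filename and model name are placeholders:

    # Hypothetical driver around the gist above (filename and model are assumptions).
    from openai_bench import run_benchmark

    results = run_benchmark(
        base_url="http://localhost:8000/v1",
        model="my-model",  # placeholder; use the name your server actually serves
        n_prompt=512,
        n_gen=128,
        n_reps=5,
    )
    if results is not None:
        print(f"Mean throughput: {results['avg_tps']:.2f} tokens/sec")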