#!/usr/bin/env python3
"""
OpenAI API benchmark script that mirrors llama-bench behavior as closely as
possible over an HTTP API.
Uses random tokens for both prompt and generation, with greedy decoding
(temperature 0, no sampling).
Works with OpenAI-compatible endpoints such as vLLM.
"""
import time
import argparse
import sys

import numpy as np
import tiktoken
from openai import OpenAI
def get_random_tokens(n_tokens, vocab_size=32000):
    """Generate random token IDs."""
    return np.random.randint(0, vocab_size, n_tokens).tolist()


def tokens_to_text(token_ids, encoding):
    """Convert token IDs to text using tiktoken."""
    try:
        # tiktoken can decode arbitrary token IDs
        return encoding.decode(token_ids)
    except Exception:
        # Fallback: just create a string of numbers
        return ' '.join(str(t) for t in token_ids)
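# Illustrative round trip through the helpers above (values are random; this
# is only a sketch assuming a tiktoken encoding is available):
#
#   enc = tiktoken.get_encoding("cl100k_base")
#   ids = get_random_tokens(4, enc.n_vocab)   # e.g. [91532, 204, 77810, 3]
#   tokens_to_text(ids, enc)                  # -> an arbitrary text fragment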
async def test_prompt_async(client, model, n_prompt, n_batch=2048, encoding=None):
    """
    Test prompt processing with random tokens asynchronously.
    Expects an openai.AsyncOpenAI client so the request can be awaited.
    """
    # Get vocab size (using tiktoken's vocab size or default)
    vocab_size = encoding.n_vocab if encoding else 32000

    # Generate random token IDs
    token_ids = get_random_tokens(n_prompt, vocab_size)

    # Convert to text (vLLM needs text input)
    if encoding:
        prompt_text = tokens_to_text(token_ids, encoding)
    else:
        # Fallback: create pseudo-text from token IDs
        prompt_text = ' '.join(f"token_{t}" for t in token_ids)

    start = time.perf_counter()

    # Make a completion request capped at a single token.
    # This forces the model to process the full prompt with almost no generation.
    response = await client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=1,
        temperature=0,
        logprobs=1,                            # Request logprobs to ensure full processing
        echo=False,                            # Don't echo the prompt
        stop=["\n", " ", ".", ",", "!", "?"]   # Stop as early as possible
    )

    end = time.perf_counter()
    return end - start
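# Sketch (an assumption, not wired into main()): the async variant above could
# be driven with several concurrent requests to probe batched prompt throughput.
#
#   import asyncio
#   from openai import AsyncOpenAI
#
#   async def _concurrent_pp(base_url, model, n_prompt, concurrency=4):
#       client = AsyncOpenAI(base_url=base_url, api_key="EMPTY")
#       return await asyncio.gather(
#           *[test_prompt_async(client, model, n_prompt) for _ in range(concurrency)]
#       )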
def test_prompt(client, model, n_prompt, n_batch=2048, encoding=None):
    """
    Test prompt processing with random tokens, mirroring llama-bench's pp test.
    Note: the decoded text is re-tokenized by the server, so the number of
    prompt tokens it actually sees may differ slightly from n_prompt.
    """
    # For synchronous operation, we use the sync client
    vocab_size = encoding.n_vocab if encoding else 32000

    # Generate random token IDs
    token_ids = get_random_tokens(n_prompt, vocab_size)

    # Convert to text
    if encoding:
        prompt_text = tokens_to_text(token_ids, encoding)
    else:
        prompt_text = ' '.join(f"token_{t}" for t in token_ids)

    start = time.perf_counter()

    # Make a completion request capped at a single token
    response = client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=1,
        temperature=0,
        logprobs=1,
        echo=False
    )

    end = time.perf_counter()
    return end - start
def test_gen(client, model, n_gen, encoding=None):
    """
    Test generation with random tokens, mirroring llama-bench's tg test.
    Measures pure generation speed.
    Note: the server may hit an EOS token and stop before n_gen tokens; with
    vLLM you can pass extra_body={"ignore_eos": True} to prevent early stops.
    """
    vocab_size = encoding.n_vocab if encoding else 32000

    # Start with a minimal prompt (single random token)
    token_id = np.random.randint(0, vocab_size)
    if encoding:
        prompt_text = tokens_to_text([token_id], encoding)
    else:
        prompt_text = f"token_{token_id}"

    start = time.perf_counter()

    # Generate n_gen tokens
    response = client.completions.create(
        model=model,
        prompt=prompt_text,
        max_tokens=n_gen,
        temperature=0,   # Deterministic (greedy) generation
        top_p=1.0,
        stream=False
    )

    end = time.perf_counter()
    return end - start
def run_benchmark(base_url, model, n_prompt=512, n_gen=128, n_reps=5, warmup=True, api_key="EMPTY"):
    """
    Run a llama-bench style benchmark.

    Args:
        base_url: Base URL for the OpenAI-compatible API
        model: Model name/ID to use
        n_prompt: Number of prompt tokens (default: 512)
        n_gen: Number of generation tokens (default: 128)
        n_reps: Number of repetitions (default: 5)
        warmup: Whether to do warmup runs (default: True)
        api_key: API key (default: "EMPTY" for vLLM)
    """
    print("OpenAI API benchmark (llama-bench compatible mode)")
    print("=================================================")
    print(f"API endpoint: {base_url}")
    print(f"Model: {model}")

    # Initialize OpenAI client
    client = OpenAI(
        base_url=base_url,
        api_key=api_key
    )

    # Try to find a tiktoken encoding for turning random token IDs into text
    encoding = None
    for enc_name in ['cl100k_base', 'p50k_base', 'r50k_base']:
        try:
            encoding = tiktoken.get_encoding(enc_name)
            print(f"Using encoding: {enc_name}")
            break
        except Exception:
            continue
    if encoding is None:
        print("Warning: Could not load a tiktoken encoding, using fallback text")

    print("\nTest configuration:")
    print(f"  Prompt tokens: {n_prompt} (random)")
    print(f"  Generation tokens: {n_gen} (random)")
    print(f"  Repetitions: {n_reps}")
    print(f"  Warmup: {warmup}")
    print()

    # Warmup runs (like llama-bench does)
    if warmup:
        print("Running warmup...")
        try:
            if n_prompt > 0:
                print("  Warmup prompt processing...")
                _ = test_prompt(client, model, min(32, n_prompt), encoding=encoding)
            if n_gen > 0:
                print("  Warmup generation...")
                _ = test_gen(client, model, 1, encoding=encoding)
            print("Warmup complete.\n")
        except Exception as e:
            print(f"Warning: Warmup failed: {e}")
            print("Continuing with benchmark...\n")
    # Benchmark runs
    print("Running benchmark...")
    prompt_times = []
    gen_times = []
    total_times = []

    for i in range(n_reps):
        prompt_time = 0
        gen_time = 0

        # Test prompt processing
        if n_prompt > 0:
            try:
                prompt_time = test_prompt(client, model, n_prompt, encoding=encoding)
            except Exception as e:
                print(f"  Error in prompt test: {e}")
                continue

        # Test generation
        if n_gen > 0:
            try:
                gen_time = test_gen(client, model, n_gen, encoding=encoding)
            except Exception as e:
                print(f"  Error in generation test: {e}")
                continue

        total_time = prompt_time + gen_time
        prompt_times.append(prompt_time)
        gen_times.append(gen_time)
        total_times.append(total_time)

        # Calculate tokens per second
        total_tokens = n_prompt + n_gen
        tps = total_tokens / total_time if total_time > 0 else 0

        print(f"Run {i+1}/{n_reps}:")
        if n_prompt > 0:
            pp_tps = n_prompt / prompt_time if prompt_time > 0 else 0
            print(f"  Prompt: {prompt_time:.3f}s ({pp_tps:.2f} tokens/sec)")
        if n_gen > 0:
            tg_tps = n_gen / gen_time if gen_time > 0 else 0
            print(f"  Generation: {gen_time:.3f}s ({tg_tps:.2f} tokens/sec)")
        print(f"  Total: {total_time:.3f}s ({tps:.2f} tokens/sec)")
    if not total_times:
        print("Error: No successful benchmark runs completed!")
        return None

    # Calculate statistics
    total_tokens = n_prompt + n_gen

    # Overall statistics
    avg_total = np.mean(total_times)
    std_total = np.std(total_times)
    avg_tps = total_tokens / avg_total if avg_total > 0 else 0
    tps_values = [total_tokens / t for t in total_times if t > 0]
    std_tps = np.std(tps_values) if tps_values else 0

    print("\n" + "="*50)
    print("Benchmark Results (llama-bench compatible):")
    print("="*50)
    print(f"Model: {model}")
    print(f"API: {base_url}")

    # Test description (like llama-bench output)
    test_desc = []
    if n_prompt > 0:
        test_desc.append(f"pp{n_prompt}")
    if n_gen > 0:
        test_desc.append(f"tg{n_gen}")
    print(f"Test: {'+'.join(test_desc)}")

    if n_prompt > 0 and prompt_times:
        avg_pp = np.mean(prompt_times)
        std_pp = np.std(prompt_times)
        avg_pp_tps = n_prompt / avg_pp if avg_pp > 0 else 0
        print(f"\nPrompt processing ({n_prompt} tokens):")
        print(f"  Time: {avg_pp:.3f} ± {std_pp:.3f}s")
        print(f"  Speed: {avg_pp_tps:.2f} tokens/sec")

    if n_gen > 0 and gen_times:
        avg_tg = np.mean(gen_times)
        std_tg = np.std(gen_times)
        avg_tg_tps = n_gen / avg_tg if avg_tg > 0 else 0
        print(f"\nGeneration ({n_gen} tokens):")
        print(f"  Time: {avg_tg:.3f} ± {std_tg:.3f}s")
        print(f"  Speed: {avg_tg_tps:.2f} tokens/sec")

    print(f"\nOverall ({total_tokens} tokens):")
    print(f"  Time: {avg_total:.3f} ± {std_total:.3f}s")
    print(f"  Speed: {avg_tps:.2f} ± {std_tps:.2f} tokens/sec")

    return {
        'prompt_times': prompt_times,
        'gen_times': gen_times,
        'total_times': total_times,
        'avg_tps': avg_tps,
        'std_tps': std_tps
    }
def main():
    parser = argparse.ArgumentParser(
        description='OpenAI API benchmark modeled on llama-bench'
    )
    parser.add_argument('-u', '--base-url', type=str, default="http://localhost:8000/v1",
                        help='Base URL for OpenAI-compatible API (default: http://localhost:8000/v1)')
    parser.add_argument('-m', '--model', type=str, required=True,
                        help='Model name/ID to use')
    parser.add_argument('-k', '--api-key', type=str, default="EMPTY",
                        help='API key (default: EMPTY for vLLM)')
    parser.add_argument('-p', '--n-prompt', type=int, default=512,
                        help='Number of prompt tokens (default: 512)')
    parser.add_argument('-n', '--n-gen', type=int, default=128,
                        help='Number of generation tokens (default: 128)')
    parser.add_argument('-b', '--batch-size', type=int, default=2048,
                        help='Batch size for prompt processing (accepted for parity with llama-bench, currently unused)')
    parser.add_argument('-r', '--reps', type=int, default=5,
                        help='Number of repetitions (default: 5)')
    parser.add_argument('--no-warmup', action='store_true',
                        help='Skip warmup run')

    args = parser.parse_args()

    try:
        results = run_benchmark(
            base_url=args.base_url,
            model=args.model,
            n_prompt=args.n_prompt,
            n_gen=args.n_gen,
            n_reps=args.reps,
            warmup=not args.no_warmup,
            api_key=args.api_key
        )
        if results is None:
            return 1
    except KeyboardInterrupt:
        print("\nBenchmark interrupted by user")
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        import traceback
        traceback.print_exc()
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())
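The benchmark can also be driven programmatically. A minimal sketch, assuming the gist is saved as openai_bench.py and a vLLM server is running locally (the model name is a placeholder):

    from openai_bench import run_benchmark

    # Returns a dict with per-run timings plus avg_tps/std_tps, or None if all runs failed
    results = run_benchmark(
        base_url="http://localhost:8000/v1",
        model="meta-llama/Meta-Llama-3-8B-Instruct",
        n_prompt=512,
        n_gen=128,
        n_reps=5,
    )
    print(f"Average throughput: {results['avg_tps']:.2f} tokens/sec")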