Created
October 27, 2025 04:16
-
-
Save oranie/af342814e43496f4a03e88134425ee4e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ~$ cat ./valkey_benchmark.py | |
| import valkey | |
| import threading | |
| import time | |
| import os | |
| import random | |
| import string | |
# --- Benchmark Parameters ---
HOST = 'localhost'
PORT = 6379
PIPELINE_DEPTH = 16        # commands batched into each pipeline execute()
TOTAL_CONCURRENCY = 128    # total worker threads, split between SET and GET
NUM_REQUESTS = 50000       # Total requests for SETs and GETs respectively
DATA_SIZE = 1024           # payload size in bytes

# The number of keys in the keyspace to operate on.
# This should be large enough to make GET operations meaningful.
KEY_SPACE_SIZE = 100000

# --- Global Counters ---
# Shared across all worker threads; every read-modify-write is guarded by `lock`.
set_counter = 0
get_counter = 0
set_errors = 0
get_errors = 0
lock = threading.Lock()

# --- Data Generation ---
# Generate the value once to save CPU cycles during the benchmark.
VALUE = (''.join(random.choices(string.ascii_letters + string.digits, k=DATA_SIZE))).encode('utf-8')
def pre_populate(client, key_count=None, value=None, batch_size=1000):
    """Flush the DB and pre-populate it with data for GET workers to use.

    Keys are named ``key:0`` .. ``key:<key_count-1>`` and written through a
    non-transactional pipeline, flushed every *batch_size* commands so the
    pipeline never grows unboundedly.

    Args:
        client: a Valkey client exposing ``flushdb()`` and ``pipeline()``.
        key_count: number of keys to write; defaults to KEY_SPACE_SIZE.
        value: bytes payload for every key; defaults to the shared VALUE.
        batch_size: commands per pipeline flush (was a hard-coded 1000).
    """
    if key_count is None:
        key_count = KEY_SPACE_SIZE
    if value is None:
        value = VALUE
    print(f"Flushing DB and pre-populating {key_count} keys...")
    client.flushdb()
    pipe = client.pipeline(transaction=False)
    for i in range(key_count):
        pipe.set(f"key:{i}", value)
        if (i + 1) % batch_size == 0:
            pipe.execute()
            # Start a new pipeline for the next batch.
            pipe = client.pipeline(transaction=False)
    # Execute any remaining commands in the pipeline.
    pipe.execute()
    print("Pre-population complete.")
| # --- Worker Functions --- | |
def set_worker(client):
    """Worker thread for performing SET operations.

    Repeatedly reserves a chunk of up to PIPELINE_DEPTH requests from the
    shared ``set_counter`` (under ``lock``), then issues them as one
    non-transactional pipeline. Stops once NUM_REQUESTS SETs have been
    reserved in total across all SET workers. On a pipeline failure the
    whole chunk is counted as failed (best-effort accounting).
    """
    global set_counter, set_errors
    while True:
        # Atomically reserve a chunk of work.
        with lock:
            remaining = NUM_REQUESTS - set_counter
            if remaining <= 0:
                break
            requests_to_run = min(PIPELINE_DEPTH, remaining)
            set_counter += requests_to_run
        pipe = client.pipeline(transaction=False)
        for _ in range(requests_to_run):
            # Write to a random key within the key space.
            key = f"key:{random.randint(0, KEY_SPACE_SIZE - 1)}"
            pipe.set(key, VALUE)
        try:
            pipe.execute()
        except Exception:
            # Deliberate best-effort: benchmark keeps running, errors are tallied.
            with lock:
                set_errors += requests_to_run
def get_worker(client):
    """Worker thread for performing GET operations.

    Mirrors ``set_worker``: reserves up to PIPELINE_DEPTH requests from the
    shared ``get_counter`` (under ``lock``), issues them as one
    non-transactional pipeline of GETs on random keys, and stops once
    NUM_REQUESTS GETs have been reserved in total. A failed pipeline counts
    the whole chunk as errors (best-effort accounting).
    """
    global get_counter, get_errors
    while True:
        # Atomically reserve a chunk of work.
        with lock:
            remaining = NUM_REQUESTS - get_counter
            if remaining <= 0:
                break
            requests_to_run = min(PIPELINE_DEPTH, remaining)
            get_counter += requests_to_run
        pipe = client.pipeline(transaction=False)
        for _ in range(requests_to_run):
            # Read from a random key within the key space.
            key = f"key:{random.randint(0, KEY_SPACE_SIZE - 1)}"
            pipe.get(key)
        try:
            pipe.execute()
        except Exception:
            # Deliberate best-effort: benchmark keeps running, errors are tallied.
            with lock:
                get_errors += requests_to_run
# --- Main Execution ---
if __name__ == "__main__":
    print("--- Valkey Benchmark ---")
    print(f"Parameters: Concurrency={TOTAL_CONCURRENCY}, Pipeline={PIPELINE_DEPTH}, Data Size={DATA_SIZE}B")
    print(f"Target: {NUM_REQUESTS} SETs and {NUM_REQUESTS} GETs")
    print("-" * 30)

    # Establish a client for pre-population.
    try:
        initial_client = valkey.Valkey(host=HOST, port=PORT, db=0)
        initial_client.ping()
    except valkey.exceptions.ConnectionError as e:
        print(f"Error: Could not connect to Valkey at {HOST}:{PORT}. Please ensure it is running.")
        print(e)
        # Fix: `exit()` is a site-module convenience not guaranteed to exist;
        # SystemExit is the portable way to terminate with a status code.
        raise SystemExit(1)

    # Pre-populate data, then release the bootstrap connection.
    pre_populate(initial_client)
    initial_client.close()
    print("-" * 30)
    print("Starting benchmark with SET and GET workers...")

    threads = []
    # Fix: keep a reference to every worker client so the connections can be
    # closed after the run (the original leaked them).
    clients = []
    start_time = time.time()

    set_concurrency = TOTAL_CONCURRENCY // 2
    get_concurrency = TOTAL_CONCURRENCY - set_concurrency

    # Each thread needs its own client instance; spawn SET then GET workers.
    for worker, count in ((set_worker, set_concurrency), (get_worker, get_concurrency)):
        for _ in range(count):
            client = valkey.Valkey(host=HOST, port=PORT, db=0, decode_responses=False)
            clients.append(client)
            thread = threading.Thread(target=worker, args=(client,))
            threads.append(thread)
            thread.start()

    # Wait for all threads to complete.
    for thread in threads:
        thread.join()
    end_time = time.time()
    total_time = end_time - start_time

    # Release all worker connections now that the benchmark is over.
    for client in clients:
        client.close()

    # --- Print Results ---
    total_requests_done = set_counter + get_counter
    requests_per_second = total_requests_done / total_time if total_time > 0 else 0
    print("-" * 30)
    print("--- Benchmark Results ---")
    print(f"Concurrency: {TOTAL_CONCURRENCY} ({set_concurrency} SET / {get_concurrency} GET)")
    print(f"Pipeline Depth: {PIPELINE_DEPTH}")
    print(f"Data Size: {DATA_SIZE} bytes")
    print("-" * 30)
    print(f"Total Time: {total_time:.2f} seconds")
    print(f"Total Requests: {total_requests_done} (SET: {set_counter}, GET: {get_counter})")
    print(f"Errors: {set_errors + get_errors} (SET: {set_errors}, GET: {get_errors})")
    print(f"Throughput (RPS): {requests_per_second:.2f} req/s")
    print("-" * 30)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment