Skip to content

Instantly share code, notes, and snippets.

@oranie
Created October 27, 2025 04:16
Show Gist options
  • Select an option

  • Save oranie/af342814e43496f4a03e88134425ee4e to your computer and use it in GitHub Desktop.

Select an option

Save oranie/af342814e43496f4a03e88134425ee4e to your computer and use it in GitHub Desktop.
~$ cat ./valkey_benchmark.py
"""Multi-threaded SET/GET throughput benchmark for a Valkey server."""
import valkey
import threading
import time
import os  # NOTE(review): appears unused in this file — confirm before removing
import random
import string
# --- Benchmark Parameters ---
HOST = 'localhost'  # Valkey server address
PORT = 6379  # Valkey server port
PIPELINE_DEPTH = 16  # commands batched per pipeline.execute()
TOTAL_CONCURRENCY = 128  # total worker threads (split roughly 50/50 SET/GET)
NUM_REQUESTS = 50000 # Total requests for SETs and GETs respectively
DATA_SIZE = 1024  # payload size in bytes for every SET
# The number of keys in the keyspace to operate on.
# This should be large enough to make GET operations meaningful.
KEY_SPACE_SIZE = 100000
# --- Global Counters ---
# Shared across worker threads; every read-modify-write is guarded by `lock`.
set_counter = 0  # SET requests reserved/issued so far
get_counter = 0  # GET requests reserved/issued so far
set_errors = 0  # SETs whose pipeline.execute() raised
get_errors = 0  # GETs whose pipeline.execute() raised
lock = threading.Lock()  # protects all five counters above
# --- Data Generation ---
# Generate the value once to save CPU cycles during the benchmark
VALUE = (''.join(random.choices(string.ascii_letters + string.digits, k=DATA_SIZE))).encode('utf-8')
def pre_populate(client):
    """Flush the DB and seed it with KEY_SPACE_SIZE keys for GET workers.

    Keys are named ``key:<i>`` (i in 0..KEY_SPACE_SIZE-1) and all share the
    pre-generated VALUE payload. Writes go through a non-transactional
    pipeline that is flushed after every 1000 queued SETs.
    """
    print(f"Flushing DB and pre-populating {KEY_SPACE_SIZE} keys...")
    client.flushdb()
    batch = client.pipeline(transaction=False)
    for count in range(1, KEY_SPACE_SIZE + 1):
        batch.set(f"key:{count - 1}", VALUE)
        if count % 1000 == 0:
            # Flush the full batch and start a fresh pipeline.
            batch.execute()
            batch = client.pipeline(transaction=False)
    # Flush whatever is left over (a no-op if the last batch was full).
    batch.execute()
    print("Pre-population complete.")
# --- Worker Functions ---
def set_worker(client):
    """Worker thread: issue pipelined SETs on random keys until quota is met.

    Work is reserved in chunks of up to PIPELINE_DEPTH under `lock`, so the
    global target of NUM_REQUESTS is shared exactly across all SET threads.
    Failed pipeline executions are tallied in `set_errors`.
    """
    global set_counter, set_errors
    while True:
        with lock:
            remaining = NUM_REQUESTS - set_counter
            if remaining <= 0:
                break
            # Reserve this thread's slice of the remaining work atomically.
            chunk = PIPELINE_DEPTH if PIPELINE_DEPTH < remaining else remaining
            set_counter += chunk
        pipe = client.pipeline(transaction=False)
        for _ in range(chunk):
            # Each SET targets a uniformly random key in the keyspace.
            pipe.set(f"key:{random.randint(0, KEY_SPACE_SIZE - 1)}", VALUE)
        try:
            pipe.execute()
        except Exception:
            with lock:
                set_errors += chunk
def get_worker(client):
    """Worker thread: issue pipelined GETs on random keys until quota is met.

    Mirrors set_worker: chunks of up to PIPELINE_DEPTH are reserved under
    `lock` against the shared NUM_REQUESTS target, and failed pipeline
    executions are tallied in `get_errors`.
    """
    global get_counter, get_errors
    while True:
        with lock:
            remaining = NUM_REQUESTS - get_counter
            if remaining <= 0:
                break
            # Reserve this thread's slice of the remaining work atomically.
            chunk = PIPELINE_DEPTH if PIPELINE_DEPTH < remaining else remaining
            get_counter += chunk
        pipe = client.pipeline(transaction=False)
        for _ in range(chunk):
            # Each GET targets a uniformly random key in the keyspace.
            pipe.get(f"key:{random.randint(0, KEY_SPACE_SIZE - 1)}")
        try:
            pipe.execute()
        except Exception:
            with lock:
                get_errors += chunk
# --- Main Execution ---
if __name__ == "__main__":
    print("--- Valkey Benchmark ---")
    print(f"Parameters: Concurrency={TOTAL_CONCURRENCY}, Pipeline={PIPELINE_DEPTH}, Data Size={DATA_SIZE}B")
    print(f"Target: {NUM_REQUESTS} SETs and {NUM_REQUESTS} GETs")
    print("-" * 30)
    # Establish a short-lived client for pre-population and a connectivity check.
    try:
        initial_client = valkey.Valkey(host=HOST, port=PORT, db=0)
        initial_client.ping()
    except valkey.exceptions.ConnectionError as e:
        print(f"Error: Could not connect to Valkey at {HOST}:{PORT}. Please ensure it is running.")
        print(e)
        # Fix: exit() is a site-module convenience and may not exist under
        # `python -S`; raising SystemExit is always available and equivalent.
        raise SystemExit(1)
    # Pre-populate data
    pre_populate(initial_client)
    initial_client.close()
    print("-" * 30)
    print("Starting benchmark with SET and GET workers...")
    threads = []
    clients = []  # track per-thread clients so their sockets can be closed
    start_time = time.time()
    # Split concurrency ~50/50; GET side absorbs the odd thread if any.
    set_concurrency = TOTAL_CONCURRENCY // 2
    get_concurrency = TOTAL_CONCURRENCY - set_concurrency
    # Create and start SET worker threads.
    for _ in range(set_concurrency):
        # Each thread needs its own client instance (connections are not
        # safe to share across pipelining threads).
        client = valkey.Valkey(host=HOST, port=PORT, db=0, decode_responses=False)
        clients.append(client)
        thread = threading.Thread(target=set_worker, args=(client,))
        threads.append(thread)
        thread.start()
    # Create and start GET worker threads.
    for _ in range(get_concurrency):
        client = valkey.Valkey(host=HOST, port=PORT, db=0, decode_responses=False)
        clients.append(client)
        thread = threading.Thread(target=get_worker, args=(client,))
        threads.append(thread)
        thread.start()
    # Wait for all threads to complete.
    for thread in threads:
        thread.join()
    end_time = time.time()
    # Fix: close every per-thread connection explicitly; the original leaked
    # all worker sockets until interpreter exit. Done after timing the run so
    # teardown cost is not counted in throughput.
    for c in clients:
        c.close()
    total_time = end_time - start_time
    # --- Print Results ---
    total_requests_done = set_counter + get_counter
    # Guard against division by zero on a pathologically fast run.
    requests_per_second = total_requests_done / total_time if total_time > 0 else 0
    print("-" * 30)
    print("--- Benchmark Results ---")
    print(f"Concurrency: {TOTAL_CONCURRENCY} ({set_concurrency} SET / {get_concurrency} GET)")
    print(f"Pipeline Depth: {PIPELINE_DEPTH}")
    print(f"Data Size: {DATA_SIZE} bytes")
    print("-" * 30)
    print(f"Total Time: {total_time:.2f} seconds")
    print(f"Total Requests: {total_requests_done} (SET: {set_counter}, GET: {get_counter})")
    print(f"Errors: {set_errors + get_errors} (SET: {set_errors}, GET: {get_errors})")
    print(f"Throughput (RPS): {requests_per_second:.2f} req/s")
    print("-" * 30)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment