@btakita · Created May 29, 2025 23:53
"""
2025-05-27 run on a Dell XPS 15 (2023)
ZMQ version: 4.3.5
pyzmq version: 26.4.0
Available IPC support: True
Benchmarking with 1,000 messages per iteration
Warmup: 1,000 messages before each test
Size (bytes) | Protocol | Msgs/sec | MB/sec
-----------------------------------------------------------
1,024 | Socket | 593,977.04 | 580.06
1,024 | ZMQ_IPC | 1,161,001.29 | 1,133.79
1,024 | ZMQ_TCP | 1,002,655.81 | 979.16
1,024 | ZMQ_INPROC | 416,405.82 | 406.65
1,024 | ZMQ_IPC_SHM | 1,170,570.56 | 1,143.14
1,024 | ZMQ_TCP_SHM | 1,210,964.14 | 1,182.58
1,024 | ZMQ_INPROC_SHM | 1,056,797.28 | 1,032.03
1,024 | STD_QUEUE | 782,225.72 | 763.89
1,024 | ASYNC_QUEUE | 2,837,591.37 | 2,771.09
1,024 | MP_QUEUE | 1,254,349.13 | 1,224.95
1,024 | MP_QUEUE_SHM | 1,200,719.08 | 1,172.58
-----------------------------------------------------------
10,240 | Socket | 221,918.30 | 2,167.17
10,240 | ZMQ_IPC | 218,463.23 | 2,133.43
10,240 | ZMQ_TCP | 161,026.23 | 1,572.52
10,240 | ZMQ_INPROC | 200,790.34 | 1,960.84
10,240 | ZMQ_IPC_SHM | 956,264.70 | 9,338.52
10,240 | ZMQ_TCP_SHM | 1,160,042.86 | 11,328.54
10,240 | ZMQ_INPROC_SHM | 951,193.90 | 9,289.00
10,240 | STD_QUEUE | 1,092,957.56 | 10,673.41
10,240 | ASYNC_QUEUE | 3,619,043.57 | 35,342.22
10,240 | MP_QUEUE | 243,694.75 | 2,379.83
10,240 | MP_QUEUE_SHM | 1,184,226.31 | 11,564.71
-----------------------------------------------------------
102,400 | Socket | 57,102.27 | 5,576.39
102,400 | ZMQ_IPC | 122,622.60 | 11,974.86
102,400 | ZMQ_TCP | 125,643.48 | 12,269.87
102,400 | ZMQ_INPROC | 140,751.56 | 13,745.27
102,400 | ZMQ_IPC_SHM | 1,355,311.39 | 132,354.63
102,400 | ZMQ_TCP_SHM | 947,396.02 | 92,519.14
102,400 | ZMQ_INPROC_SHM | 1,599,929.13 | 156,243.08
102,400 | STD_QUEUE | 1,109,620.64 | 108,361.39
102,400 | ASYNC_QUEUE | 4,047,317.49 | 395,245.85
102,400 | MP_QUEUE | 27,660.12 | 2,701.18
102,400 | MP_QUEUE_SHM | 1,191,087.80 | 116,317.17
-----------------------------------------------------------
1,024,000 | Socket | 2,085.87 | 2,036.98
1,024,000 | ZMQ_IPC | 125,143.93 | 122,210.87
1,024,000 | ZMQ_TCP | 120,410.12 | 117,588.01
1,024,000 | ZMQ_INPROC | 147,484.47 | 144,027.80
1,024,000 | ZMQ_IPC_SHM | 1,388,164.61 | 1,355,629.50
1,024,000 | ZMQ_TCP_SHM | 1,306,191.80 | 1,275,577.93
1,024,000 | ZMQ_INPROC_SHM | 1,025,621.10 | 1,001,583.10
1,024,000 | STD_QUEUE | 1,104,203.62 | 1,078,323.85
1,024,000 | ASYNC_QUEUE | 3,803,019.84 | 3,713,886.56
1,024,000 | MP_QUEUE | 2,275.53 | 2,222.20
1,024,000 | MP_QUEUE_SHM | 1,173,621.55 | 1,146,114.79
-----------------------------------------------------------
"""
import asyncio
import multiprocessing as mp
import os
import signal
import socket
import struct
import tempfile
import threading
import time
from dataclasses import dataclass
from enum import Enum
from multiprocessing import shared_memory
from queue import Empty, Queue
from typing import List, Tuple
import zmq
class Protocol(Enum):
    SOCKET = "Socket"
    ZMQ_IPC = "ZMQ_IPC"
    ZMQ_TCP = "ZMQ_TCP"
    ZMQ_INPROC = "ZMQ_INPROC"
    ZMQ_IPC_SHM = "ZMQ_IPC_SHM"
    ZMQ_TCP_SHM = "ZMQ_TCP_SHM"
    ZMQ_INPROC_SHM = "ZMQ_INPROC_SHM"
    STD_QUEUE = "STD_QUEUE"
    ASYNC_QUEUE = "ASYNC_QUEUE"
    MP_QUEUE = "MP_QUEUE"
    MP_QUEUE_SHM = "MP_QUEUE_SHM"
@dataclass
class BenchmarkConfig:
    message_size: int
    num_messages: int
    iterations: int = 3
    warmup_messages: int = 1000
    tcp_port: int = 5555
    raw_socket_port: int = 5556
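# Illustrative usage (not executed here): benchmark one protocol at one payload
# size. run_single_benchmark() below averages `iterations` runs and returns
# (messages/sec, MB/sec).
#
#   cfg = BenchmarkConfig(message_size=1024, num_messages=1000)
#   msgs_per_sec, mb_per_sec = run_single_benchmark(Protocol.ZMQ_IPC, cfg)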
def socket_worker(port: int, ready_flag: mp.Event):
    """Raw socket worker process"""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024)  # 1MB buffer
    try:
        sock.connect(("127.0.0.1", port))
        ready_flag.set()
        while True:
            # Receive message length first
            length_data = sock.recv(4)
            if not length_data:
                break
            length = struct.unpack("!I", length_data)[0]
            # Receive the actual message
            message = b""
            while len(message) < length:
                chunk = sock.recv(length - len(message))
                if not chunk:
                    return
                message += chunk
            if message == b"STOP":
                break
    finally:
        sock.close()
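# Framing used on the raw socket path: each message is a 4-byte big-endian
# length prefix (struct "!I") followed by the payload; a 4-byte b"STOP" payload
# tells the worker to exit.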
def benchmark_socket(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run raw socket benchmark"""
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024)
    server_socket.bind(("127.0.0.1", 0))
    server_socket.listen(1)
    port = server_socket.getsockname()[1]
    ready_flag = mp.Event()
    worker = mp.Process(target=socket_worker, args=(port, ready_flag))
    worker.start()
    if not ready_flag.wait(timeout=5.0):
        raise RuntimeError("Worker failed to start")
    client_socket, _ = server_socket.accept()
    try:
        # Pre-generate data and message
        data = bytes([x % 256 for x in range(config.message_size)])
        length_header = struct.pack("!I", len(data))
        message = length_header + data  # Combine header and data
        # Warmup phase
        for _ in range(config.warmup_messages):
            client_socket.sendall(message)
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            client_socket.sendall(message)
        elapsed = time.perf_counter() - start
        # Stop worker
        stop_msg = struct.pack("!I", 4) + b"STOP"
        client_socket.sendall(stop_msg)
        worker.join(timeout=5)
        if worker.is_alive():
            worker.terminate()
        return elapsed, config.num_messages / elapsed
    finally:
        client_socket.close()
        server_socket.close()
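# Note: the timed loop above measures only the sender's sendall() calls; the
# clock stops once the data has been handed to the kernel send buffer, not when
# the worker has finished reading it. The same producer-side timing applies to
# the other push-style benchmarks below.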
def benchmark_zmq(protocol: Protocol, config: BenchmarkConfig) -> Tuple[float, float]:
    """Run ZMQ benchmark for TCP, IPC, or INPROC"""
    ready_flag = mp.Event()
    # Setup connection based on protocol
    if protocol == Protocol.ZMQ_TCP:
        address = f"tcp://127.0.0.1:{config.tcp_port}"
    elif protocol == Protocol.ZMQ_IPC:
        temp_dir = tempfile.mkdtemp()
        ipc_path = os.path.join(temp_dir, "benchmark.ipc")
        address = f"ipc://{ipc_path}"
    else:  # Protocol.ZMQ_INPROC
        address = "inproc://benchmark"
    def cleanup() -> None:
        socket.close()
        context.term()
        if protocol == Protocol.ZMQ_IPC:
            try:
                os.unlink(ipc_path)
                os.rmdir(temp_dir)
            except OSError:
                pass
    try:
        if protocol == Protocol.ZMQ_INPROC:
            context = zmq.Context(io_threads=1)  # Single IO thread for small messages
        else:
            context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 0)
        socket.setsockopt(zmq.LINGER, 0)
        # Optimize for message size
        if config.message_size >= 10240:  # 10KB threshold
            socket.setsockopt(zmq.SNDBUF, 8388608)  # 8MB buffer
        else:
            # For small messages, optimize for latency
            socket.setsockopt(zmq.IMMEDIATE, 1)
        if protocol == Protocol.ZMQ_INPROC:
            socket.setsockopt(zmq.SNDHWM, 1000)  # Smaller HWM for better latency
            socket.setsockopt(zmq.RCVHWM, 1000)
            socket.setsockopt(zmq.LINGER, 0)
            if config.message_size <= 1024:
                # Additional optimizations for small messages
                socket.setsockopt(zmq.IMMEDIATE, 1)  # Immediate delivery
                socket.setsockopt(zmq.CONFLATE, 0)  # No message conflation
        socket.bind(address)
        if protocol == Protocol.ZMQ_INPROC:
            # For inproc, we need to share the context
            worker = threading.Thread(
                target=zmq_worker, args=(protocol, address, ready_flag, context)
            )
        else:
            worker = mp.Process(target=zmq_worker, args=(protocol, address, ready_flag))
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError(f"Worker failed to start for {protocol.value}")
        # Generate data once before the loop - ZMQ's zero-copy optimization will handle it
        data = bytes([x % 256 for x in range(config.message_size)])
        # Warmup phase
        for _ in range(config.warmup_messages):
            socket.send(data)
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            if config.message_size >= 8192:  # 8KB threshold
                # Use zero-copy for large messages
                socket.send(data, copy=False)
            else:
                # Direct copy for small messages
                socket.send(data, copy=True)
        elapsed = time.perf_counter() - start
        # Stop worker
        socket.send(b"STOP")
        worker.join(timeout=5)
        if isinstance(worker, mp.Process) and worker.is_alive():
            worker.terminate()
            worker.join()
        return elapsed, config.num_messages / elapsed
    finally:
        cleanup()
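# zmq_worker serves both transport flavors: for inproc it runs as a thread and
# reuses the parent's Context (the inproc transport requires both endpoints to
# share one Context), while tcp/ipc workers run in a separate process with
# their own Context.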
def zmq_worker(
    protocol: Protocol, address: str, ready_flag: mp.Event, shared_context=None
):
    """ZMQ worker process/thread for TCP, IPC, and INPROC"""
    try:
        context = shared_context if shared_context else zmq.Context()
        socket = context.socket(zmq.PULL)
        socket.setsockopt(zmq.RCVHWM, 0)
        socket.setsockopt(zmq.LINGER, 0)
        socket.connect(address)
        ready_flag.set()
        while True:
            try:
                message = socket.recv()
                if message == b"STOP":
                    break
            except zmq.ZMQError:
                break
    finally:
        socket.close()
        if not shared_context:  # Only terminate context if we created it
            context.term()
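# How the *_SHM variants work: the full payload is staged once in a
# multiprocessing.shared_memory block by the sender, and only a small b"NEXT"
# token travels over the ZMQ transport per message. The worker maps the shared
# block a single time at startup, so these numbers reflect per-message
# signaling cost rather than per-message data transfer.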
def benchmark_zmq_shm(
    protocol: Protocol, config: BenchmarkConfig
) -> Tuple[float, float]:
    """Run ZMQ benchmark with shared memory optimization for large messages"""
    SHARED_MEM_THRESHOLD = 100_000  # Intended cutoff for shared memory (currently unused; the payload is always staged in shared memory)
    ready_flag = mp.Event()
    shm = None
    # Setup connection based on protocol
    if protocol == Protocol.ZMQ_TCP_SHM:
        address = f"tcp://127.0.0.1:{config.tcp_port}"
    elif protocol == Protocol.ZMQ_IPC_SHM:
        temp_dir = tempfile.mkdtemp()
        ipc_path = os.path.join(temp_dir, "benchmark.ipc")
        address = f"ipc://{ipc_path}"
    else:  # Protocol.ZMQ_INPROC_SHM
        address = "inproc://benchmark"
    def cleanup() -> None:
        socket.close()
        context.term()
        if protocol == Protocol.ZMQ_IPC_SHM:
            try:
                os.unlink(ipc_path)
                os.rmdir(temp_dir)
            except OSError:
                pass
        # Let the worker close its shared memory first
        time.sleep(0.1)
        if shm is not None:
            try:
                shm.close()
                shm.unlink()
            except Exception:
                pass
    try:
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.setsockopt(zmq.SNDHWM, 0)
        socket.setsockopt(zmq.LINGER, 0)
        if config.message_size >= 10240:  # 10KB+
            socket.setsockopt(zmq.SNDBUF, 8388608)  # 8MB buffer
        else:
            socket.setsockopt(zmq.IMMEDIATE, 1)
        socket.bind(address)
        # Create shared memory for large messages
        shm_name = f"zmq_bench_{os.getpid()}"
        shm = shared_memory.SharedMemory(
            name=shm_name, create=True, size=config.message_size
        )
        # Pre-generate data directly in shared memory
        data = bytes([x % 256 for x in range(config.message_size)])
        shm.buf[: config.message_size] = data
        if protocol == Protocol.ZMQ_INPROC_SHM:
            worker = threading.Thread(
                target=zmq_shm_worker,
                args=(protocol, address, ready_flag, context, shm_name, config),
            )
        else:
            worker = mp.Process(
                target=zmq_shm_worker,
                args=(protocol, address, ready_flag, None, shm_name, config),
            )
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError(f"Worker failed to start for {protocol.value}")
        # Warmup phase
        for _ in range(config.warmup_messages):
            socket.send(b"NEXT")
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            socket.send(b"NEXT")
        elapsed = time.perf_counter() - start
        # Stop worker
        socket.send(b"STOP")
        worker.join(timeout=5)
        if isinstance(worker, mp.Process) and worker.is_alive():
            worker.terminate()
            worker.join()
        return elapsed, config.num_messages / elapsed
    finally:
        cleanup()
def zmq_shm_worker(
    protocol: Protocol,
    address: str,
    ready_flag: mp.Event,
    shared_context=None,
    shm_name: str = None,
    config: BenchmarkConfig = None,
):
    """ZMQ worker process/thread with shared memory support"""
    shm = None
    try:
        context = shared_context if shared_context else zmq.Context()
        socket = context.socket(zmq.PULL)
        socket.setsockopt(zmq.RCVHWM, 0)
        socket.setsockopt(zmq.LINGER, 0)
        if config.message_size >= 10240:  # 10KB+
            socket.setsockopt(zmq.RCVBUF, 8388608)  # 8MB buffer
        socket.connect(address)
        # Connect to shared memory
        shm = shared_memory.SharedMemory(name=shm_name)
        data = bytes(shm.buf[: config.message_size])  # Read the payload once, at startup
        ready_flag.set()
        while True:
            try:
                message = socket.recv()
                if message == b"STOP":
                    break
            except zmq.ZMQError:
                break
    finally:
        socket.close()
        if not shared_context:
            context.term()
        if shm is not None:
            try:
                shm.close()
            except Exception:
                pass
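# STD_QUEUE and ASYNC_QUEUE keep producer and consumer in the same process (a
# thread and a coroutine respectively), so no serialization or kernel IPC is
# involved. In the asyncio case the timed loop only calls put_nowait(); the
# consumer coroutine does not run until the producer next awaits, so the figure
# is essentially the cost of enqueueing object references.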
def std_queue_worker(q: Queue, ready_flag: threading.Event):
    """Worker thread for standard Queue"""
    ready_flag.set()
    try:
        while True:
            try:
                message = q.get_nowait()
                if message == b"STOP":
                    break
                q.task_done()  # Call task_done() only after successful get
            except Empty:  # Use Empty directly since we imported it from queue
                time.sleep(0)  # Yield to other threads
    except Exception as e:
        print(f"Worker error: {e}")
        raise
def benchmark_std_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run standard Queue benchmark"""
    q = Queue(maxsize=1000)
    ready_flag = threading.Event()
    worker_thread = threading.Thread(target=std_queue_worker, args=(q, ready_flag))
    worker_thread.start()
    if not ready_flag.wait(timeout=5.0):
        raise RuntimeError("Worker failed to start")
    try:
        # Pre-generate data
        data = bytes([x % 256 for x in range(config.message_size)])
        # Warmup phase
        for _ in range(config.warmup_messages):
            q.put_nowait(data)
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            q.put_nowait(data)
        elapsed = time.perf_counter() - start
        # Stop worker
        q.put(b"STOP")
        worker_thread.join(timeout=5)
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in STD_QUEUE benchmark: {str(e)}")
        raise
def benchmark_async_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Wrapper to run async benchmark in event loop"""
    return asyncio.run(async_queue_benchmark(config))
async def async_queue_worker(queue: asyncio.Queue, ready_flag: asyncio.Event):
    """Worker coroutine for asyncio Queue"""
    ready_flag.set()
    try:
        while True:
            try:
                message = queue.get_nowait()
                if message == b"STOP":
                    break
                queue.task_done()
            except asyncio.QueueEmpty:
                await asyncio.sleep(0)  # Yield to event loop
    except Exception as e:
        print(f"Worker error: {e}")
        raise
async def async_queue_benchmark(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run asyncio Queue benchmark"""
    queue = asyncio.Queue(maxsize=1000)
    ready_flag = asyncio.Event()
    # Create worker task in same process
    worker_task = asyncio.create_task(async_queue_worker(queue, ready_flag))
    async def cleanup() -> None:
        if not worker_task.done():
            worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                pass
    try:
        await asyncio.wait_for(ready_flag.wait(), timeout=5.0)
        # Pre-generate data
        data = bytes([x % 256 for x in range(config.message_size)])
        # Warmup phase
        for _ in range(config.warmup_messages):
            queue.put_nowait(data)
        await asyncio.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            queue.put_nowait(data)
        elapsed = time.perf_counter() - start
        # Stop worker
        await queue.put(b"STOP")
        await worker_task
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in ASYNC_QUEUE benchmark: {str(e)}")
        raise
    finally:
        await cleanup()
def async_queue_worker_process(queue: asyncio.Queue, ready_flag: mp.Event):
    """Worker process entry point for the asyncio queue (currently unused; an asyncio.Queue cannot be shared across processes)"""
    asyncio.run(async_queue_worker(queue, ready_flag))
def mp_queue_worker(queue: mp.Queue, ready_flag: mp.Event):
    """Worker process for multiprocessing Queue"""
    ready_flag.set()
    try:
        while True:
            try:
                message = queue.get()  # Use blocking get() instead of get_nowait()
                if message == b"STOP":
                    break
            except EOFError:  # Handle queue closing
                break
    except Exception as e:
        print(f"Worker error: {e}")
        raise
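# multiprocessing.Queue pickles each payload and pushes it through an OS pipe
# via a feeder thread, so MP_QUEUE pays a per-message copy/serialization cost
# that grows with message size; MP_QUEUE_SHM below avoids this by sending only
# b"NEXT" tokens while the payload sits in shared memory.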
def benchmark_mp_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run multiprocessing Queue benchmark"""
    # Cap the queue depth so at most ~8 MB of messages are buffered at once
    mp_queue = mp.Queue(maxsize=max(2, 1024 * 1024 * 8 // config.message_size))
    ready_flag = mp.Event()
    worker = mp.Process(target=mp_queue_worker, args=(mp_queue, ready_flag))
    worker.start()
    def cleanup() -> None:
        mp_queue.close()
        try:
            mp_queue.join_thread()  # Wait for queue thread to finish
        except Exception:
            pass
    try:
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")
        # Pre-generate data once
        data = bytes([x % 256 for x in range(config.message_size)])
        # Warmup phase
        for _ in range(config.warmup_messages):
            mp_queue.put(data)  # Use blocking put() for large messages
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            mp_queue.put(data)  # Use blocking put() for large messages
        elapsed = time.perf_counter() - start
        # Stop worker
        mp_queue.put(b"STOP")
        worker.join(timeout=10)  # Increased timeout for large messages
        if worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE benchmark: {str(e)}")
        if worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        raise
    finally:
        cleanup()
def mp_queue_shm_worker(
    queue: mp.Queue,
    ready_flag: mp.Event,
    shm_name: str | None = None,
    msg_size: int = 0,
):
    """Worker process for multiprocessing Queue with shared memory"""
    ready_flag.set()
    shm = None
    try:
        if shm_name:
            shm = shared_memory.SharedMemory(name=shm_name)
        while True:
            try:
                message = queue.get()
                if message == b"STOP":
                    break
            except EOFError:
                break
    except Exception as e:
        print(f"Worker error: {e}")
        raise
    finally:
        if shm is not None:
            shm.close()
def benchmark_mp_queue_shm(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run multiprocessing Queue benchmark with shared memory for large messages"""
    SHARED_MEM_THRESHOLD = 100_000  # Intended cutoff for shared memory (currently unused; the payload is always staged in shared memory)
    mp_queue = mp.Queue(maxsize=1000)
    ready_flag = mp.Event()
    shm = None
    def cleanup() -> None:
        mp_queue.close()
        try:
            mp_queue.join_thread()
        except Exception:
            pass
        if shm is not None:
            try:
                shm.close()
                shm.unlink()
            except Exception:
                pass
    try:
        # Create unique name for shared memory
        shm_name = f"mp_queue_bench_{os.getpid()}"
        # Create shared memory for large messages
        shm = shared_memory.SharedMemory(
            name=shm_name, create=True, size=config.message_size
        )
        # Pre-generate data directly in shared memory
        data = bytes([x % 256 for x in range(config.message_size)])
        shm.buf[: config.message_size] = data
        worker = mp.Process(
            target=mp_queue_shm_worker,
            args=(mp_queue, ready_flag, shm_name, config.message_size),
        )
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")
        # Warmup phase
        for _ in range(config.warmup_messages):
            mp_queue.put_nowait(b"NEXT")
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            mp_queue.put_nowait(b"NEXT")
        elapsed = time.perf_counter() - start
        # Stop worker
        mp_queue.put(b"STOP")
        worker.join(timeout=5)
        if worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE_SHM benchmark: {str(e)}")
        if worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        raise
    finally:
        cleanup()
def run_single_benchmark(
    protocol: Protocol, config: BenchmarkConfig
) -> Tuple[float, float]:
    """Run benchmark for a single protocol with multiple iterations"""
    results = []
    for i in range(config.iterations):
        if protocol == Protocol.SOCKET:
            elapsed, msgs_per_sec = benchmark_socket(config)
        elif protocol in [Protocol.ZMQ_TCP, Protocol.ZMQ_IPC, Protocol.ZMQ_INPROC]:
            elapsed, msgs_per_sec = benchmark_zmq(protocol, config)
        elif protocol in [
            Protocol.ZMQ_TCP_SHM,
            Protocol.ZMQ_IPC_SHM,
            Protocol.ZMQ_INPROC_SHM,
        ]:
            elapsed, msgs_per_sec = benchmark_zmq_shm(protocol, config)
        elif protocol == Protocol.STD_QUEUE:
            elapsed, msgs_per_sec = benchmark_std_queue(config)
        elif protocol == Protocol.ASYNC_QUEUE:
            elapsed, msgs_per_sec = benchmark_async_queue(config)
        elif protocol == Protocol.MP_QUEUE:
            elapsed, msgs_per_sec = benchmark_mp_queue(config)
        elif protocol == Protocol.MP_QUEUE_SHM:
            elapsed, msgs_per_sec = benchmark_mp_queue_shm(config)
        else:
            raise ValueError(f"Unknown protocol: {protocol}")
        results.append(msgs_per_sec)
        # Short pause between iterations
        time.sleep(0.1)
    # Calculate average messages per second
    avg_msgs_per_sec = sum(results) / len(results)
    # Calculate MB/s
    mb_per_sec = (avg_msgs_per_sec * config.message_size) / (1024 * 1024)
    return avg_msgs_per_sec, mb_per_sec
def run_benchmark(sizes: List[int], num_messages: int = 1000, iterations: int = 3):
    """Run benchmarks for all protocols and message sizes"""
    config = BenchmarkConfig(
        message_size=0, num_messages=num_messages, iterations=iterations
    )
    print(f"\nBenchmarking with {num_messages:,} messages per iteration")
    print(f"Warmup: {config.warmup_messages:,} messages before each test\n")
    # Header with aligned columns
    print(
        f"{'Size (bytes)':>12} | {'Protocol':<14} | {'Msgs/sec':>12} | {'MB/sec':>12}"
    )
    print("-" * 59)
    try:
        # Try to set CPU affinity
        import psutil
        p = psutil.Process()
        p.cpu_affinity([0])
    except (ImportError, AttributeError):
        pass
    for size in sizes:
        config.message_size = size
        for protocol in Protocol:
            try:
                avg_msgs, mbps = run_single_benchmark(protocol, config)
                print(
                    f"{size:>12,} | {protocol.value:<14} | {avg_msgs:>12,.2f} | {mbps:>12,.2f}"
                )
            except Exception as e:
                print(f"Error running {protocol.value} benchmark: {str(e)}")
        print("-" * 59)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
# Print version info
print(f"ZMQ version: {zmq.zmq_version()}")
print(f"pyzmq version: {zmq.__version__}")
print("Available IPC support:", zmq.has("ipc"))
sizes = [1024, 10240, 102400, 1024000] # 1KB to 1MB
run_benchmark(sizes, num_messages=1000, iterations=3)