Created
May 29, 2025 23:53
-
-
Save btakita/b3f00e3621dc12781f0a67714d357898 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
2025-05-27 run on Dell XPS15 2023
ZMQ version: 4.3.5
pyzmq version: 26.4.0
Available IPC support: True
Benchmarking with 1,000 messages per iteration
Warmup: 1,000 messages before each test
Size (bytes) | Protocol | Msgs/sec | MB/sec
-----------------------------------------------------------
1,024 | Socket | 593,977.04 | 580.06
1,024 | ZMQ_IPC | 1,161,001.29 | 1,133.79
1,024 | ZMQ_TCP | 1,002,655.81 | 979.16
1,024 | ZMQ_INPROC | 416,405.82 | 406.65
1,024 | ZMQ_IPC_SHM | 1,170,570.56 | 1,143.14
1,024 | ZMQ_TCP_SHM | 1,210,964.14 | 1,182.58
1,024 | ZMQ_INPROC_SHM | 1,056,797.28 | 1,032.03
1,024 | STD_QUEUE | 782,225.72 | 763.89
1,024 | ASYNC_QUEUE | 2,837,591.37 | 2,771.09
1,024 | MP_QUEUE | 1,254,349.13 | 1,224.95
1,024 | MP_QUEUE_SHM | 1,200,719.08 | 1,172.58
-----------------------------------------------------------
10,240 | Socket | 221,918.30 | 2,167.17
10,240 | ZMQ_IPC | 218,463.23 | 2,133.43
10,240 | ZMQ_TCP | 161,026.23 | 1,572.52
10,240 | ZMQ_INPROC | 200,790.34 | 1,960.84
10,240 | ZMQ_IPC_SHM | 956,264.70 | 9,338.52
10,240 | ZMQ_TCP_SHM | 1,160,042.86 | 11,328.54
10,240 | ZMQ_INPROC_SHM | 951,193.90 | 9,289.00
10,240 | STD_QUEUE | 1,092,957.56 | 10,673.41
10,240 | ASYNC_QUEUE | 3,619,043.57 | 35,342.22
10,240 | MP_QUEUE | 243,694.75 | 2,379.83
10,240 | MP_QUEUE_SHM | 1,184,226.31 | 11,564.71
-----------------------------------------------------------
102,400 | Socket | 57,102.27 | 5,576.39
102,400 | ZMQ_IPC | 122,622.60 | 11,974.86
102,400 | ZMQ_TCP | 125,643.48 | 12,269.87
102,400 | ZMQ_INPROC | 140,751.56 | 13,745.27
102,400 | ZMQ_IPC_SHM | 1,355,311.39 | 132,354.63
102,400 | ZMQ_TCP_SHM | 947,396.02 | 92,519.14
102,400 | ZMQ_INPROC_SHM | 1,599,929.13 | 156,243.08
102,400 | STD_QUEUE | 1,109,620.64 | 108,361.39
102,400 | ASYNC_QUEUE | 4,047,317.49 | 395,245.85
102,400 | MP_QUEUE | 27,660.12 | 2,701.18
102,400 | MP_QUEUE_SHM | 1,191,087.80 | 116,317.17
-----------------------------------------------------------
1,024,000 | Socket | 2,085.87 | 2,036.98
1,024,000 | ZMQ_IPC | 125,143.93 | 122,210.87
1,024,000 | ZMQ_TCP | 120,410.12 | 117,588.01
1,024,000 | ZMQ_INPROC | 147,484.47 | 144,027.80
1,024,000 | ZMQ_IPC_SHM | 1,388,164.61 | 1,355,629.50
1,024,000 | ZMQ_TCP_SHM | 1,306,191.80 | 1,275,577.93
1,024,000 | ZMQ_INPROC_SHM | 1,025,621.10 | 1,001,583.10
1,024,000 | STD_QUEUE | 1,104,203.62 | 1,078,323.85
1,024,000 | ASYNC_QUEUE | 3,803,019.84 | 3,713,886.56
1,024,000 | MP_QUEUE | 2,275.53 | 2,222.20
1,024,000 | MP_QUEUE_SHM | 1,173,621.55 | 1,146,114.79
-----------------------------------------------------------
"""
import asyncio | |
import multiprocessing as mp | |
import os | |
import signal | |
import socket | |
import struct | |
import tempfile | |
import threading | |
import time | |
from dataclasses import dataclass | |
from enum import Enum | |
from multiprocessing import shared_memory | |
from queue import Empty, Queue | |
from typing import List, Tuple | |
import zmq | |
class Protocol(Enum):
    """Transports exercised by the benchmark suite.

    Each member's value is the label printed in the results table. The
    suite iterates the members in declaration order.
    """

    # Length-prefixed frames over a plain TCP socket.
    SOCKET = "Socket"

    # ZeroMQ PUSH/PULL with the payload sent through the socket.
    ZMQ_IPC = "ZMQ_IPC"
    ZMQ_TCP = "ZMQ_TCP"
    ZMQ_INPROC = "ZMQ_INPROC"

    # ZeroMQ PUSH/PULL where the payload sits in a shared-memory segment
    # and only a short token is sent per message.
    ZMQ_IPC_SHM = "ZMQ_IPC_SHM"
    ZMQ_TCP_SHM = "ZMQ_TCP_SHM"
    ZMQ_INPROC_SHM = "ZMQ_INPROC_SHM"

    # Standard-library queues: thread, asyncio, and multiprocessing.
    STD_QUEUE = "STD_QUEUE"
    ASYNC_QUEUE = "ASYNC_QUEUE"
    MP_QUEUE = "MP_QUEUE"

    # multiprocessing queue carrying tokens, payload in shared memory.
    MP_QUEUE_SHM = "MP_QUEUE_SHM"
@dataclass
class BenchmarkConfig:
    """Settings shared by every benchmark run."""

    # Payload size in bytes.
    message_size: int
    # Number of messages sent during the timed phase.
    num_messages: int
    # How many times each protocol is run; the rates are averaged.
    iterations: int = 3
    # Untimed messages sent before the timed phase starts.
    warmup_messages: int = 1000
    # Port used to build the ZeroMQ TCP endpoint address.
    tcp_port: int = 5555
    # Not referenced by the benchmark code visible in this file.
    raw_socket_port: int = 5556
def socket_worker(port: int, ready_flag: mp.Event):
    """Raw socket worker: connect to the benchmark server and drain frames.

    Each frame is a 4-byte big-endian unsigned length followed by that
    many payload bytes. The worker exits when it receives the payload
    b"STOP" or when the peer closes the connection.

    Args:
        port: TCP port on 127.0.0.1 to connect to.
        ready_flag: Event that is set once the connection is established.
    """

    def recv_exact(conn, count):
        """Read exactly `count` bytes; return None if the peer closes first."""
        buf = bytearray(count)
        view = memoryview(buf)
        filled = 0
        while filled < count:
            # recv/recv_into may deliver fewer bytes than requested, so keep
            # reading into the remaining part of the preallocated buffer.
            got = conn.recv_into(view[filled:])
            if got == 0:
                return None
            filled += got
        return buf

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024)  # 1MB buffer
        sock.connect(("127.0.0.1", port))
        ready_flag.set()
        while True:
            # The length header can itself arrive split across reads.
            header = recv_exact(sock, 4)
            if header is None:
                break
            (length,) = struct.unpack("!I", header)
            payload = recv_exact(sock, length)
            if payload is None or payload == b"STOP":
                break
    finally:
        sock.close()
def benchmark_socket(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the raw TCP socket benchmark.

    Listens on an ephemeral loopback port, starts `socket_worker` in a
    child process, and streams length-prefixed frames to it. Only the
    `sendall` calls of the timed phase are measured.

    Args:
        config: Supplies message_size, warmup_messages and num_messages.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client_socket = None
    worker = None
    try:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024 * 1024)
        server_socket.bind(("127.0.0.1", 0))
        server_socket.listen(1)
        port = server_socket.getsockname()[1]

        ready_flag = mp.Event()
        worker = mp.Process(target=socket_worker, args=(port, ready_flag))
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")
        client_socket, _ = server_socket.accept()

        # Build the frame once: 4-byte length header followed by the payload.
        data = bytes(x % 256 for x in range(config.message_size))
        message = struct.pack("!I", len(data)) + data

        # Warmup phase
        for _ in range(config.warmup_messages):
            client_socket.sendall(message)
        time.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            client_socket.sendall(message)
        elapsed = time.perf_counter() - start

        # Ask the worker to stop and give it time to drain.
        client_socket.sendall(struct.pack("!I", 4) + b"STOP")
        worker.join(timeout=5)
        return elapsed, config.num_messages / elapsed
    finally:
        # Runs on every path so neither sockets nor the child process leak.
        if client_socket is not None:
            client_socket.close()
        server_socket.close()
        if worker is not None and worker.is_alive():
            worker.terminate()
            worker.join()
def benchmark_zmq(protocol: Protocol, config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the ZeroMQ PUSH/PULL benchmark over TCP, IPC, or INPROC.

    Binds a PUSH socket, starts `zmq_worker` (a thread sharing the context
    for INPROC, a child process otherwise) and sends the payload
    repeatedly. Only the `send` calls of the timed phase are measured.

    Args:
        protocol: Protocol.ZMQ_TCP or Protocol.ZMQ_IPC; any other value is
            treated as INPROC.
        config: Supplies message_size, warmup_messages, num_messages and
            tcp_port.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
    """
    ready_flag = mp.Event()
    temp_dir = None
    ipc_path = None
    # Setup connection based on protocol
    if protocol == Protocol.ZMQ_TCP:
        address = f"tcp://127.0.0.1:{config.tcp_port}"
    elif protocol == Protocol.ZMQ_IPC:
        temp_dir = tempfile.mkdtemp()
        ipc_path = os.path.join(temp_dir, "benchmark.ipc")
        address = f"ipc://{ipc_path}"
    else:  # Protocol.ZMQ_INPROC
        address = "inproc://benchmark"

    # Pre-bound so cleanup() is safe no matter how far setup got.
    context = None
    push_sock = None
    worker = None

    def cleanup() -> None:
        # A child process still alive here would otherwise be orphaned.
        if isinstance(worker, mp.Process) and worker.is_alive():
            worker.terminate()
            worker.join()
        if push_sock is not None:
            push_sock.close()
        if context is not None:
            context.term()
        if temp_dir is not None:
            # Remove each path independently: the socket file may already
            # be gone, and that must not stop the directory being removed.
            for remove, path in ((os.unlink, ipc_path), (os.rmdir, temp_dir)):
                try:
                    remove(path)
                except OSError:
                    pass

    try:
        if protocol == Protocol.ZMQ_INPROC:
            context = zmq.Context(io_threads=1)  # Single IO thread for small messages
        else:
            context = zmq.Context()
        push_sock = context.socket(zmq.PUSH)
        push_sock.setsockopt(zmq.SNDHWM, 0)
        push_sock.setsockopt(zmq.LINGER, 0)
        # Optimize for message size
        if config.message_size >= 10240:  # 10KB threshold
            push_sock.setsockopt(zmq.SNDBUF, 8388608)  # 8MB buffer
        else:
            # For small messages, optimize for latency
            push_sock.setsockopt(zmq.IMMEDIATE, 1)
        if protocol == Protocol.ZMQ_INPROC:
            push_sock.setsockopt(zmq.SNDHWM, 1000)  # Smaller HWM for better latency
            push_sock.setsockopt(zmq.RCVHWM, 1000)
            push_sock.setsockopt(zmq.LINGER, 0)
            if config.message_size <= 1024:
                # Additional optimizations for small messages
                push_sock.setsockopt(zmq.IMMEDIATE, 1)  # Immediate delivery
                push_sock.setsockopt(zmq.CONFLATE, 0)  # No message conflation
        push_sock.bind(address)

        if protocol == Protocol.ZMQ_INPROC:
            # For inproc, we need to share the context
            worker = threading.Thread(
                target=zmq_worker, args=(protocol, address, ready_flag, context)
            )
        else:
            worker = mp.Process(target=zmq_worker, args=(protocol, address, ready_flag))
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError(f"Worker failed to start for {protocol.value}")

        # Generate the payload once and reuse it for every send.
        data = bytes(x % 256 for x in range(config.message_size))

        # Warmup phase
        for _ in range(config.warmup_messages):
            push_sock.send(data)
        time.sleep(0.1)

        # Payloads of 8KB and above are sent without copying. The choice is
        # fixed for the whole run, so it is made once outside the timed loop.
        copy_payload = config.message_size < 8192

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            push_sock.send(data, copy=copy_payload)
        elapsed = time.perf_counter() - start

        # Stop worker; a process that ignores this is terminated in cleanup().
        push_sock.send(b"STOP")
        worker.join(timeout=5)
        return elapsed, config.num_messages / elapsed
    finally:
        cleanup()
def zmq_worker(
    protocol: Protocol, address: str, ready_flag: mp.Event, shared_context=None
):
    """ZMQ worker process/thread for TCP, IPC, and INPROC.

    Connects a PULL socket to `address`, signals readiness, then discards
    messages until b"STOP" arrives or a ZMQ error occurs.

    Args:
        protocol: Not used by the worker itself.
        address: Endpoint the PULL socket connects to.
        ready_flag: Event that is set once the socket is connected.
        shared_context: Context to reuse. When None the worker creates its
            own context and terminates it on exit.
    """
    owns_context = shared_context is None
    context = zmq.Context() if owns_context else shared_context
    pull_sock = None
    try:
        pull_sock = context.socket(zmq.PULL)
        pull_sock.setsockopt(zmq.RCVHWM, 0)
        pull_sock.setsockopt(zmq.LINGER, 0)
        pull_sock.connect(address)
        ready_flag.set()
        try:
            while pull_sock.recv() != b"STOP":
                pass
        except zmq.ZMQError:
            # Any ZMQ error while receiving ends the worker.
            pass
    finally:
        # The socket may never have been created if setup failed early.
        if pull_sock is not None:
            pull_sock.close()
        if owns_context:  # Only terminate context if we created it
            context.term()
def benchmark_zmq_shm(
    protocol: Protocol, config: BenchmarkConfig
) -> Tuple[float, float]:
    """Run the ZeroMQ benchmark with the payload held in shared memory.

    The payload is written once into a shared-memory segment; each
    "message" sent over the PUSH socket is the 4-byte token b"NEXT".

    NOTE: the timed loop sends tokens only, so the reported rate reflects
    token throughput rather than payload bytes moved over the socket.

    Args:
        protocol: Protocol.ZMQ_TCP_SHM or Protocol.ZMQ_IPC_SHM; any other
            value is treated as INPROC.
        config: Supplies message_size, warmup_messages, num_messages and
            tcp_port.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
    """
    ready_flag = mp.Event()
    temp_dir = None
    ipc_path = None
    # Setup connection based on protocol
    if protocol == Protocol.ZMQ_TCP_SHM:
        address = f"tcp://127.0.0.1:{config.tcp_port}"
    elif protocol == Protocol.ZMQ_IPC_SHM:
        temp_dir = tempfile.mkdtemp()
        ipc_path = os.path.join(temp_dir, "benchmark.ipc")
        address = f"ipc://{ipc_path}"
    else:  # Protocol.ZMQ_INPROC_SHM
        address = "inproc://benchmark"

    # Pre-bound so cleanup() is safe no matter how far setup got.
    context = None
    push_sock = None
    worker = None
    shm = None

    def cleanup() -> None:
        # A child process still alive here would otherwise be orphaned.
        if isinstance(worker, mp.Process) and worker.is_alive():
            worker.terminate()
            worker.join()
        if push_sock is not None:
            push_sock.close()
        if context is not None:
            context.term()
        if temp_dir is not None:
            # Remove each path independently: the socket file may already
            # be gone, and that must not stop the directory being removed.
            for remove, path in ((os.unlink, ipc_path), (os.rmdir, temp_dir)):
                try:
                    remove(path)
                except OSError:
                    pass
        # Let the worker close its shared memory first
        time.sleep(0.1)
        if shm is not None:
            # Attempt unlink even if close fails, so the segment is not leaked.
            for release in (shm.close, shm.unlink):
                try:
                    release()
                except (BufferError, OSError):
                    pass

    try:
        context = zmq.Context()
        push_sock = context.socket(zmq.PUSH)
        push_sock.setsockopt(zmq.SNDHWM, 0)
        push_sock.setsockopt(zmq.LINGER, 0)
        if config.message_size >= 10240:  # 10KB+
            push_sock.setsockopt(zmq.SNDBUF, 8388608)  # 8MB buffer
        else:
            push_sock.setsockopt(zmq.IMMEDIATE, 1)
        push_sock.bind(address)

        # Let SharedMemory pick a unique name so a stale segment from an
        # earlier run cannot collide with this one.
        shm = shared_memory.SharedMemory(create=True, size=config.message_size)
        # Pre-generate data directly in shared memory
        shm.buf[: config.message_size] = bytes(
            x % 256 for x in range(config.message_size)
        )

        if protocol == Protocol.ZMQ_INPROC_SHM:
            worker = threading.Thread(
                target=zmq_shm_worker,
                args=(protocol, address, ready_flag, context, shm.name, config),
            )
        else:
            worker = mp.Process(
                target=zmq_shm_worker,
                args=(protocol, address, ready_flag, None, shm.name, config),
            )
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError(f"Worker failed to start for {protocol.value}")

        # Warmup phase
        for _ in range(config.warmup_messages):
            push_sock.send(b"NEXT")
        time.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            push_sock.send(b"NEXT")
        elapsed = time.perf_counter() - start

        # Stop worker; a process that ignores this is terminated in cleanup().
        push_sock.send(b"STOP")
        worker.join(timeout=5)
        return elapsed, config.num_messages / elapsed
    finally:
        cleanup()
def zmq_shm_worker(
    protocol: Protocol,
    address: str,
    ready_flag: mp.Event,
    shared_context=None,
    shm_name: str = None,
    config: BenchmarkConfig = None,
):
    """ZMQ worker process/thread with shared memory support.

    Connects a PULL socket, attaches to the named shared-memory segment,
    signals readiness, then discards messages until b"STOP" arrives or a
    ZMQ error occurs.

    Args:
        protocol: Not used by the worker itself.
        address: Endpoint the PULL socket connects to.
        ready_flag: Event set once the socket is connected and the segment
            (if any) is attached.
        shared_context: Context to reuse. When None the worker creates its
            own context and terminates it on exit.
        shm_name: Name of the segment to attach to; None skips the attach.
        config: Supplies message_size; None is treated as size 0.
    """
    owns_context = shared_context is None
    context = zmq.Context() if owns_context else shared_context
    pull_sock = None
    shm = None
    try:
        pull_sock = context.socket(zmq.PULL)
        pull_sock.setsockopt(zmq.RCVHWM, 0)
        pull_sock.setsockopt(zmq.LINGER, 0)
        message_size = config.message_size if config is not None else 0
        if message_size >= 10240:  # 10KB+
            pull_sock.setsockopt(zmq.RCVBUF, 8388608)  # 8MB buffer
        pull_sock.connect(address)

        if shm_name is not None:
            # Connect to shared memory
            shm = shared_memory.SharedMemory(name=shm_name)
            # The payload is copied once here; the receive loop below does
            # not read the segment again.
            _payload = bytes(shm.buf[:message_size])
        ready_flag.set()

        try:
            while pull_sock.recv() != b"STOP":
                pass
        except zmq.ZMQError:
            # Any ZMQ error while receiving ends the worker.
            pass
    finally:
        # The socket may never have been created if setup failed early.
        if pull_sock is not None:
            pull_sock.close()
        if owns_context:
            context.term()
        if shm is not None:
            try:
                shm.close()
            except (BufferError, OSError):
                pass
def std_queue_worker(q: Queue, ready_flag: threading.Event):
    """Consumer thread for the standard Queue benchmark.

    Sets `ready_flag`, then polls `q` without blocking. Every item other
    than b"STOP" is acknowledged with task_done(); b"STOP" ends the worker.
    Unexpected errors are printed and re-raised.
    """
    ready_flag.set()
    try:
        while True:
            try:
                item = q.get_nowait()
            except Empty:
                # Nothing queued yet: yield to other threads and poll again.
                time.sleep(0)
                continue
            if item == b"STOP":
                return
            q.task_done()
    except Exception as e:
        print(f"Worker error: {e}")
        raise
def benchmark_std_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the standard (thread) Queue benchmark.

    Starts `std_queue_worker` in a thread and enqueues the payload with
    non-blocking puts. Only the puts of the timed phase are measured.

    Args:
        config: Supplies message_size, warmup_messages and num_messages.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
    """
    # Room for every message of both phases plus the stop marker, so the
    # non-blocking puts below can never raise queue.Full.
    capacity = max(1000, config.warmup_messages + config.num_messages + 1)
    q = Queue(maxsize=capacity)
    ready_flag = threading.Event()
    # Daemon thread: a worker that fails to stop must not block interpreter exit.
    worker_thread = threading.Thread(
        target=std_queue_worker, args=(q, ready_flag), daemon=True
    )
    worker_thread.start()
    try:
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")

        # Pre-generate data
        data = bytes(x % 256 for x in range(config.message_size))

        # Warmup phase
        for _ in range(config.warmup_messages):
            q.put_nowait(data)
        time.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            q.put_nowait(data)
        elapsed = time.perf_counter() - start

        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in STD_QUEUE benchmark: {str(e)}")
        raise
    finally:
        # Stop the worker on success and on failure alike.
        q.put_nowait(b"STOP")
        worker_thread.join(timeout=5)
def benchmark_async_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Synchronous entry point for the asyncio Queue benchmark.

    Runs `async_queue_benchmark(config)` to completion via asyncio.run and
    returns its result.
    """
    outcome = asyncio.run(async_queue_benchmark(config))
    return outcome
async def async_queue_worker(queue: asyncio.Queue, ready_flag: asyncio.Event):
    """Consumer coroutine for the asyncio Queue benchmark.

    Sets `ready_flag`, then polls `queue` without blocking. Every item
    other than b"STOP" is acknowledged with task_done(); b"STOP" ends the
    coroutine. Unexpected errors are printed and re-raised.
    """
    ready_flag.set()
    try:
        while True:
            try:
                item = queue.get_nowait()
            except asyncio.QueueEmpty:
                # Nothing queued: hand control back to the event loop.
                await asyncio.sleep(0)
                continue
            if item == b"STOP":
                return
            queue.task_done()
    except Exception as e:
        print(f"Worker error: {e}")
        raise
async def async_queue_benchmark(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the asyncio Queue benchmark.

    Starts `async_queue_worker` as a task on the current loop and enqueues
    the payload with put_nowait. The fill loops never await, so the worker
    cannot run while they execute: the timed phase measures enqueueing
    only.

    Args:
        config: Supplies message_size, warmup_messages and num_messages.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.
    """
    # Nothing is consumed while a fill loop runs, so the queue must be able
    # to hold every message of both phases or put_nowait raises QueueFull.
    capacity = max(1000, config.warmup_messages + config.num_messages)
    queue = asyncio.Queue(maxsize=capacity)
    ready_flag = asyncio.Event()
    # Create worker task in same process
    worker_task = asyncio.create_task(async_queue_worker(queue, ready_flag))
    try:
        await asyncio.wait_for(ready_flag.wait(), timeout=5.0)

        # Pre-generate data
        data = bytes(x % 256 for x in range(config.message_size))

        # Warmup phase
        for _ in range(config.warmup_messages):
            queue.put_nowait(data)
        await asyncio.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            queue.put_nowait(data)
        elapsed = time.perf_counter() - start

        # Stop worker
        await queue.put(b"STOP")
        await worker_task
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in ASYNC_QUEUE benchmark: {str(e)}")
        raise
    finally:
        # Cancel the worker if it is still running (error paths).
        if not worker_task.done():
            worker_task.cancel()
            try:
                await worker_task
            except asyncio.CancelledError:
                pass
def async_queue_worker_process(queue: asyncio.Queue, ready_flag: mp.Event):
    """Run `async_queue_worker` to completion on a new event loop.

    Intended as a process target; it is not referenced elsewhere in the
    code visible in this file.
    """
    worker_coro = async_queue_worker(queue, ready_flag)
    asyncio.run(worker_coro)
def mp_queue_worker(queue: mp.Queue, ready_flag: mp.Event):
    """Consumer process for the multiprocessing Queue benchmark.

    Sets `ready_flag`, then blocks on `queue.get()` and discards items
    until b"STOP" is received or the queue raises EOFError. Other errors
    are printed and re-raised.
    """
    ready_flag.set()
    try:
        try:
            # Two-argument iter() keeps calling get() until it returns the
            # b"STOP" sentinel.
            for _discarded in iter(queue.get, b"STOP"):
                pass
        except EOFError:
            # The queue was closed underneath us; treat it as a stop request.
            pass
    except Exception as e:
        print(f"Worker error: {e}")
        raise
def benchmark_mp_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the multiprocessing Queue benchmark.

    Starts `mp_queue_worker` in a child process and sends the payload
    through an mp.Queue with blocking puts. Only the puts of the timed
    phase are measured.

    Args:
        config: Supplies message_size, warmup_messages and num_messages.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
    """
    # Queue depth targets roughly 8MB of payload with at least two slots;
    # max(1, ...) guards against a zero message size.
    depth = max(2, 1024 * 1024 * 8 // max(1, config.message_size))
    mp_queue = mp.Queue(maxsize=depth)
    ready_flag = mp.Event()
    worker = mp.Process(target=mp_queue_worker, args=(mp_queue, ready_flag))
    worker.start()
    # True only once the worker has exited on its own after STOP.
    drained = False
    try:
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")

        # Pre-generate data once
        data = bytes(x % 256 for x in range(config.message_size))

        # Warmup phase
        for _ in range(config.warmup_messages):
            mp_queue.put(data)  # Use blocking put() for large messages
        time.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            mp_queue.put(data)  # Use blocking put() for large messages
        elapsed = time.perf_counter() - start

        # Stop worker
        mp_queue.put(b"STOP")
        worker.join(timeout=10)  # Increased timeout for large messages
        drained = not worker.is_alive()
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE benchmark: {str(e)}")
        raise
    finally:
        if worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        if not drained:
            # Nobody is left to read the pipe, so waiting for the feeder
            # thread to flush buffered items could block forever.
            mp_queue.cancel_join_thread()
        mp_queue.close()
        if drained:
            mp_queue.join_thread()  # Wait for queue thread to finish
def mp_queue_shm_worker( | |
queue: mp.Queue, | |
ready_flag: mp.Event, | |
shm_name: str | None = None, | |
msg_size: int = 0, | |
): | |
"""Worker process for multiprocessing Queue with shared memory""" | |
ready_flag.set() | |
shm = None | |
try: | |
if shm_name: | |
shm = shared_memory.SharedMemory(name=shm_name) | |
while True: | |
try: | |
message = queue.get() | |
if message == b"STOP": | |
break | |
except EOFError: | |
break | |
except Exception as e: | |
print(f"Worker error: {e}") | |
raise | |
finally: | |
if shm is not None: | |
shm.close() | |
def benchmark_mp_queue_shm(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run the multiprocessing Queue benchmark with the payload in shared memory.

    The payload is written once into a shared-memory segment; each
    "message" put on the queue is the 4-byte token b"NEXT".

    NOTE: the timed loop enqueues tokens only, so the reported rate
    reflects token throughput rather than payload bytes moved through the
    queue.

    Args:
        config: Supplies message_size, warmup_messages and num_messages.

    Returns:
        (elapsed_seconds, messages_per_second) for the timed phase.

    Raises:
        RuntimeError: If the worker does not signal readiness within 5 s.
        queue.Full: If a put cannot complete within the put timeout.
    """
    # Upper bound on how long one put may wait for a free slot. Blocking
    # puts keep runs longer than the queue depth from failing, and the
    # timeout stops a dead worker from hanging the benchmark.
    put_timeout = 5.0

    mp_queue = mp.Queue(maxsize=1000)
    ready_flag = mp.Event()
    shm = None
    worker = None
    # True only once the worker has exited on its own after STOP.
    drained = False
    try:
        # Let SharedMemory pick a unique name so a stale segment from an
        # earlier run cannot collide with this one.
        shm = shared_memory.SharedMemory(create=True, size=config.message_size)
        # Pre-generate data directly in shared memory
        shm.buf[: config.message_size] = bytes(
            x % 256 for x in range(config.message_size)
        )

        worker = mp.Process(
            target=mp_queue_shm_worker,
            args=(mp_queue, ready_flag, shm.name, config.message_size),
        )
        worker.start()
        if not ready_flag.wait(timeout=5.0):
            raise RuntimeError("Worker failed to start")

        # Warmup phase
        for _ in range(config.warmup_messages):
            mp_queue.put(b"NEXT", timeout=put_timeout)
        time.sleep(0.1)

        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            mp_queue.put(b"NEXT", timeout=put_timeout)
        elapsed = time.perf_counter() - start

        # Stop worker
        mp_queue.put(b"STOP", timeout=put_timeout)
        worker.join(timeout=5)
        drained = not worker.is_alive()
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE_SHM benchmark: {str(e)}")
        raise
    finally:
        # `worker` stays None if the segment could not be created.
        if worker is not None and worker.is_alive():
            worker.terminate()
            worker.join(timeout=1)
        if not drained:
            # Nobody is left to read the pipe, so waiting for the feeder
            # thread to flush buffered items could block forever.
            mp_queue.cancel_join_thread()
        mp_queue.close()
        if drained:
            mp_queue.join_thread()
        if shm is not None:
            # Attempt unlink even if close fails, so the segment is not leaked.
            for release in (shm.close, shm.unlink):
                try:
                    release()
                except (BufferError, OSError):
                    pass
def run_single_benchmark( | |
protocol: Protocol, config: BenchmarkConfig | |
) -> Tuple[float, float]: | |
"""Run benchmark for a single protocol with multiple iterations""" | |
results = [] | |
for i in range(config.iterations): | |
if protocol == Protocol.SOCKET: | |
elapsed, msgs_per_sec = benchmark_socket(config) | |
elif protocol in [Protocol.ZMQ_TCP, Protocol.ZMQ_IPC, Protocol.ZMQ_INPROC]: | |
elapsed, msgs_per_sec = benchmark_zmq(protocol, config) | |
elif protocol in [ | |
Protocol.ZMQ_TCP_SHM, | |
Protocol.ZMQ_IPC_SHM, | |
Protocol.ZMQ_INPROC_SHM, | |
]: | |
elapsed, msgs_per_sec = benchmark_zmq_shm(protocol, config) | |
elif protocol == Protocol.STD_QUEUE: | |
elapsed, msgs_per_sec = benchmark_std_queue(config) | |
elif protocol == Protocol.ASYNC_QUEUE: | |
elapsed, msgs_per_sec = benchmark_async_queue(config) | |
elif protocol == Protocol.MP_QUEUE: | |
elapsed, msgs_per_sec = benchmark_mp_queue(config) | |
elif protocol == Protocol.MP_QUEUE_SHM: | |
elapsed, msgs_per_sec = benchmark_mp_queue_shm(config) | |
else: | |
raise ValueError(f"Unknown protocol: {protocol}") | |
results.append(msgs_per_sec) | |
# Short pause between iterations | |
time.sleep(0.1) | |
# Calculate average messages per second | |
avg_msgs_per_sec = sum(results) / len(results) | |
# Calculate MB/s | |
mb_per_sec = (avg_msgs_per_sec * config.message_size) / (1024 * 1024) | |
return avg_msgs_per_sec, mb_per_sec | |
def run_benchmark(sizes: List[int], num_messages: int = 1000, iterations: int = 3):
    """Benchmark every protocol at every message size and print a table.

    A single BenchmarkConfig is reused; its message_size is updated for
    each entry of `sizes`. A failure in one protocol is printed and does
    not stop the remaining runs.

    Args:
        sizes: Message sizes in bytes, benchmarked in the given order.
        num_messages: Messages sent per timed phase.
        iterations: Runs averaged per protocol.
    """
    config = BenchmarkConfig(
        message_size=0, num_messages=num_messages, iterations=iterations
    )
    separator = "-" * 59

    print(f"\nBenchmarking with {num_messages:,} messages per iteration")
    print(f"Warmup: {config.warmup_messages:,} messages before each test\n")
    # Header with aligned columns
    print(
        f"{'Size (bytes)':>12} | {'Protocol':<14} | {'Msgs/sec':>12} | {'MB/sec':>12}"
    )
    print(separator)

    # Best effort: pin this process to CPU 0 when psutil is installed and
    # offers cpu_affinity; otherwise carry on unpinned.
    try:
        import psutil

        psutil.Process().cpu_affinity([0])
    except (ImportError, AttributeError):
        pass

    for size in sizes:
        config.message_size = size
        for protocol in Protocol:
            label = protocol.value
            try:
                rate, mbps = run_single_benchmark(protocol, config)
                print(f"{size:>12,} | {label:<14} | {rate:>12,.2f} | {mbps:>12,.2f}")
            except Exception as e:
                print(f"Error running {label} benchmark: {str(e)}")
        print(separator)
if __name__ == "__main__":
    # Install the default SIGINT action in place of Python's handler.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Print version info
    print("ZMQ version:", zmq.zmq_version())
    print("pyzmq version:", zmq.__version__)
    print(f"Available IPC support: {zmq.has('ipc')}")

    # 1KB to 1MB: 1024, 10240, 102400, 1024000
    message_sizes = [1024 * 10**exponent for exponent in range(4)]
    run_benchmark(message_sizes, num_messages=1000, iterations=3)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment