Skip to content

Instantly share code, notes, and snippets.

@btakita
Created May 29, 2025 23:56
Show Gist options
  • Save btakita/5df3c6308ad058a003eece54b25076c9 to your computer and use it in GitHub Desktop.
Save btakita/5df3c6308ad058a003eece54b25076c9 to your computer and use it in GitHub Desktop.
"""
2025-05-27 run on Dell XPS15 2023
Ray version: 2.46.0
Python Multiprocessing available: fork
Benchmarking with 1,000 messages per iteration
Warmup: 1,000 messages before each test
Size (bytes) | Protocol | Msgs/sec | MB/sec
--------------------------------------------------------------
1,024 | MP_QUEUE | 19,601.62 | 19.14
1,024 | RAY_QUEUE | 702.01 | 0.69
1,024 | RAY_QUEUE_NO_AWAIT | 6,104.63 | 5.96
--------------------------------------------------------------
10,240 | MP_QUEUE | 2,210.08 | 21.58
10,240 | RAY_QUEUE | 542.75 | 5.30
10,240 | RAY_QUEUE_NO_AWAIT | 1,935.86 | 18.90
--------------------------------------------------------------
102,400 | MP_QUEUE | 222.01 | 21.68
102,400 | RAY_QUEUE | 149.84 | 14.63
102,400 | RAY_QUEUE_NO_AWAIT | 211.62 | 20.67
--------------------------------------------------------------
"""
import asyncio
import multiprocessing as mp
import signal
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple
import ray.util.queue
class Protocol(Enum):
    """The inter-process transports compared by this benchmark."""

    # Standard library multiprocessing.Queue (pipe + pickle).
    MP_QUEUE = "MP_QUEUE"
    # ray.util.queue.Queue, awaiting every put.
    RAY_QUEUE = "RAY_QUEUE"
    # ray.util.queue.Queue, fire-and-forget puts (no await).
    RAY_QUEUE_NO_AWAIT = "RAY_QUEUE_NO_AWAIT"
@dataclass
class BenchmarkConfig:
    """Parameters for one benchmark run."""

    message_size: int        # payload size in bytes per message
    num_messages: int        # messages sent in the timed phase
    iterations: int = 3      # timed repetitions to average over
    warmup_messages: int = 1000  # untimed messages sent before each test
def mp_queue_worker(queue: mp.Queue, ready_flag: mp.Event):
    """Consumer loop: signal readiness, then drain the queue until b"STOP"."""
    ready_flag.set()  # let the parent know we are ready to receive
    while queue.get() != b"STOP":
        pass  # discard payloads; only throughput matters
def benchmark_mp_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run multiprocessing Queue benchmark.

    Spawns a consumer process, times how long it takes to enqueue
    ``config.num_messages`` payloads of ``config.message_size`` bytes
    (producer side only; delivery may lag), and returns
    ``(elapsed_seconds, messages_per_second)``.

    Raises:
        RuntimeError: if the worker does not signal readiness within 5s.
    """
    queue = mp.Queue(maxsize=0)  # Unlimited queue size
    ready_flag = mp.Event()
    worker = mp.Process(target=mp_queue_worker, args=(queue, ready_flag))
    worker.start()
    if not ready_flag.wait(timeout=5.0):
        # Fix: the original raised here without reaping the already-started
        # process, leaking it. Terminate and join before propagating.
        worker.terminate()
        worker.join()
        raise RuntimeError("Worker failed to start for multiprocessing Queue")
    try:
        # Warmup phase
        for _ in range(config.warmup_messages):
            data = bytes([x % 256 for x in range(config.message_size)])
            queue.put(data)
        time.sleep(0.1)
        # Benchmark phase. Payload construction stays inside the timed loop
        # on purpose, mirroring the Ray benchmark (fresh object per message).
        start = time.perf_counter()
        for _ in range(config.num_messages):
            data = bytes([x % 256 for x in range(config.message_size)])
            queue.put(data)
        elapsed = time.perf_counter() - start
        # Stop worker
        queue.put(b"STOP")
        worker.join(timeout=5)
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE benchmark: {str(e)}")
        raise
    finally:
        # Fix: cleanup was duplicated across the success and error paths,
        # and the error path terminated without joining. A single finally
        # guarantees the worker is reaped either way.
        if worker.is_alive():
            worker.terminate()
            worker.join()
@ray.remote
class RayQueueWorker:
    """Ray actor that continuously drains a queue until it sees b"STOP"."""

    def __init__(self, queue: ray.util.queue.Queue):
        self.queue = queue

    async def run(self):
        # Pull messages one at a time directly from the queue's actor;
        # exit as soon as the stop sentinel arrives.
        sentinel = b"STOP"
        while True:
            message = await self.queue.actor.get.remote()
            if message == sentinel:
                return
async def benchmark_ray_queue(config: BenchmarkConfig, no_await: bool = False) -> Tuple[float, float]:
    """Run Ray Queue benchmark"""
    q = ray.util.queue.Queue(maxsize=0)
    # Start worker
    worker = RayQueueWorker.remote(q)
    worker_task = worker.run.remote()

    async def push(count: int) -> None:
        # Send `count` payloads. A fresh bytes object is built per message
        # to avoid caching; in no_await mode the put is fire-and-forget.
        for _ in range(count):
            payload = bytes([i % 256 for i in range(config.message_size)])
            ref = q.actor.put_nowait.remote(payload)
            if not no_await:
                await ref

    try:
        # Warmup phase
        await push(config.warmup_messages)
        await asyncio.sleep(0.1)
        # Benchmark phase (producer-side timing only)
        start = time.perf_counter()
        await push(config.num_messages)
        elapsed = time.perf_counter() - start
        # Stop worker
        q.actor.put_nowait.remote(b"STOP")
        await worker_task
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in RAY_QUEUE benchmark: {str(e)}")
        raise
async def run_single_benchmark(
    protocol: Protocol, config: BenchmarkConfig
) -> Tuple[float, float]:
    """Run `config.iterations` runs of one protocol; return (avg msgs/sec, MB/sec)."""
    per_iteration = []
    for _ in range(config.iterations):
        if protocol is Protocol.MP_QUEUE:
            _, rate = benchmark_mp_queue(config)
        elif protocol is Protocol.RAY_QUEUE:
            _, rate = await benchmark_ray_queue(config, False)
        elif protocol is Protocol.RAY_QUEUE_NO_AWAIT:
            _, rate = await benchmark_ray_queue(config, True)
        else:
            raise ValueError(f"Unknown protocol: {protocol}")
        per_iteration.append(rate)
        # Short pause between iterations
        await asyncio.sleep(0.1)
    # Average msgs/sec across iterations, then convert to MB/sec.
    avg_msgs_per_sec = sum(per_iteration) / len(per_iteration)
    return avg_msgs_per_sec, (avg_msgs_per_sec * config.message_size) / (1024 * 1024)
async def run_benchmark(
    sizes: List[int], num_messages: int = 1000, iterations: int = 3
):
    """Benchmark every Protocol at each payload size and print a results table."""
    config = BenchmarkConfig(
        message_size=0, num_messages=num_messages, iterations=iterations
    )
    print(f"\nBenchmarking with {num_messages:,} messages per iteration")
    print(f"Warmup: {config.warmup_messages:,} messages before each test\n")
    header = f"{'Size (bytes)':>12} | {'Protocol':<18} | {'Msgs/sec':>12} | {'MB/sec':>10}"
    print(header)
    print("-" * 62)
    # Best effort: pin this process to one core for more stable numbers.
    try:
        import psutil

        psutil.Process().cpu_affinity([0])
    except (ImportError, AttributeError):
        pass  # psutil missing or platform without affinity support
    for size in sizes:
        config.message_size = size  # reuse one config across sizes
        for protocol in Protocol:
            try:
                avg_msgs, mbps = await run_single_benchmark(protocol, config)
                print(
                    f"{size:>12,} | {protocol.value:<18} | {avg_msgs:>12,.2f} | {mbps:>10,.2f}"
                )
            except Exception as e:
                print(f"Error running {protocol.value} benchmark: {str(e)}")
        print("-" * 62)
if __name__ == "__main__":
    # Restore default Ctrl-C handling so the benchmark can be interrupted.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    # Initialize Ray
    ray.init()
    print(f"Ray version: {ray.__version__}")
    print(f"Python Multiprocessing available: {mp.get_start_method()}")
    payload_sizes = [1024, 10240, 102400]  # 1KB to 100KB
    asyncio.run(run_benchmark(payload_sizes, num_messages=1000, iterations=3))
    # Shutdown Ray
    ray.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment