""" | |
2025-05-27 run on Dell XPS15 2023 | |
Ray version: 2.46.0 | |
Python Multiprocessing available: fork | |
Benchmarking with 1,000 messages per iteration | |
Warmup: 1,000 messages before each test | |
Size (bytes) | Protocol | Msgs/sec | MB/sec | |
-------------------------------------------------------------- | |
1,024 | MP_QUEUE | 19,601.62 | 19.14 | |
1,024 | RAY_QUEUE | 702.01 | 0.69 | |
1,024 | RAY_QUEUE_NO_AWAIT | 6,104.63 | 5.96 | |
-------------------------------------------------------------- | |
10,240 | MP_QUEUE | 2,210.08 | 21.58 | |
10,240 | RAY_QUEUE | 542.75 | 5.30 | |
10,240 | RAY_QUEUE_NO_AWAIT | 1,935.86 | 18.90 | |
-------------------------------------------------------------- | |
102,400 | MP_QUEUE | 222.01 | 21.68 | |
102,400 | RAY_QUEUE | 149.84 | 14.63 | |
102,400 | RAY_QUEUE_NO_AWAIT | 211.62 | 20.67 | |
-------------------------------------------------------------- | |
""" | |

import asyncio
import multiprocessing as mp
import signal
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Tuple

import ray
import ray.util.queue


class Protocol(Enum):
    MP_QUEUE = "MP_QUEUE"
    RAY_QUEUE = "RAY_QUEUE"
    RAY_QUEUE_NO_AWAIT = "RAY_QUEUE_NO_AWAIT"


@dataclass
class BenchmarkConfig:
    message_size: int
    num_messages: int
    iterations: int = 3
    warmup_messages: int = 1000


def mp_queue_worker(queue: mp.Queue, ready_flag: mp.Event):
    """Worker process for multiprocessing Queue"""
    ready_flag.set()
    while True:
        message = queue.get()
        if message == b"STOP":
            break


def benchmark_mp_queue(config: BenchmarkConfig) -> Tuple[float, float]:
    """Run multiprocessing Queue benchmark"""
    queue = mp.Queue(maxsize=0)  # Unlimited queue size
    ready_flag = mp.Event()
    worker = mp.Process(target=mp_queue_worker, args=(queue, ready_flag))
    worker.start()
    if not ready_flag.wait(timeout=5.0):
        raise RuntimeError("Worker failed to start for multiprocessing Queue")
    try:
        # Warmup phase
        for _ in range(config.warmup_messages):
            data = bytes([x % 256 for x in range(config.message_size)])
            queue.put(data)
        time.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            data = bytes([x % 256 for x in range(config.message_size)])
            queue.put(data)
        elapsed = time.perf_counter() - start
        # Stop worker
        queue.put(b"STOP")
        worker.join(timeout=5)
        if worker.is_alive():
            worker.terminate()
            worker.join()
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in MP_QUEUE benchmark: {str(e)}")
        if worker.is_alive():
            worker.terminate()
        raise


@ray.remote
class RayQueueWorker:
    def __init__(self, queue: ray.util.queue.Queue):
        self.queue = queue

    async def run(self):
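        # Read by awaiting get() on the queue's underlying actor handle instead
        # of the blocking Queue.get() wrapper, which would stall this actor's
        # event loop.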
        while True:
            message = await self.queue.actor.get.remote()
            if message == b"STOP":
                break


async def benchmark_ray_queue(config: BenchmarkConfig, no_await: bool = False) -> Tuple[float, float]:
    """Run Ray Queue benchmark"""
    queue = ray.util.queue.Queue(maxsize=0)
    # Start worker
    worker = RayQueueWorker.remote(queue)
    worker_task = worker.run.remote()
    try:
        # Warmup phase
        for _ in range(config.warmup_messages):
            # Create new data for each message to avoid caching
            data = bytes([x % 256 for x in range(config.message_size)])
            if no_await:
                queue.actor.put_nowait.remote(data)
            else:
                await queue.actor.put_nowait.remote(data)
        await asyncio.sleep(0.1)
        # Benchmark phase
        start = time.perf_counter()
        for _ in range(config.num_messages):
            # Create new data for each message to avoid caching
            data = bytes([x % 256 for x in range(config.message_size)])
            if no_await:
                queue.actor.put_nowait.remote(data)
            else:
                await queue.actor.put_nowait.remote(data)
        elapsed = time.perf_counter() - start
        # Stop worker
        queue.actor.put_nowait.remote(b"STOP")
        await worker_task
        return elapsed, config.num_messages / elapsed
    except Exception as e:
        print(f"Error in RAY_QUEUE benchmark: {str(e)}")
        raise


async def run_single_benchmark(
    protocol: Protocol, config: BenchmarkConfig
) -> Tuple[float, float]:
    """Run benchmark for a single protocol with multiple iterations"""
    results = []
    for i in range(config.iterations):
        if protocol == Protocol.MP_QUEUE:
            elapsed, msgs_per_sec = benchmark_mp_queue(config)
        elif protocol == Protocol.RAY_QUEUE:
            elapsed, msgs_per_sec = await benchmark_ray_queue(config, False)
        elif protocol == Protocol.RAY_QUEUE_NO_AWAIT:
            elapsed, msgs_per_sec = await benchmark_ray_queue(config, True)
        else:
            raise ValueError(f"Unknown protocol: {protocol}")
        results.append(msgs_per_sec)
        # Short pause between iterations
        await asyncio.sleep(0.1)
    # Calculate average messages per second
    avg_msgs_per_sec = sum(results) / len(results)
    # Calculate MB/s
    mb_per_sec = (avg_msgs_per_sec * config.message_size) / (1024 * 1024)
    return avg_msgs_per_sec, mb_per_sec


async def run_benchmark(
    sizes: List[int], num_messages: int = 1000, iterations: int = 3
):
    """Run benchmarks for all protocols and message sizes"""
    config = BenchmarkConfig(
        message_size=0, num_messages=num_messages, iterations=iterations
    )
    print(f"\nBenchmarking with {num_messages:,} messages per iteration")
    print(f"Warmup: {config.warmup_messages:,} messages before each test\n")
    print(
        f"{'Size (bytes)':>12} | {'Protocol':<18} | {'Msgs/sec':>12} | {'MB/sec':>10}"
    )
    print("-" * 62)
    try:
        # Try to set CPU affinity
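        # (pinning the driver to a single core to reduce scheduling jitter;
        # silently skipped if psutil is missing or affinity is unsupported)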
        import psutil

        p = psutil.Process()
        p.cpu_affinity([0])
    except (ImportError, AttributeError):
        pass
    for size in sizes:
        config.message_size = size
        for protocol in Protocol:
            try:
                avg_msgs, mbps = await run_single_benchmark(protocol, config)
                print(
                    f"{size:>12,} | {protocol.value:<18} | {avg_msgs:>12,.2f} | {mbps:>10,.2f}"
                )
            except Exception as e:
                print(f"Error running {protocol.value} benchmark: {str(e)}")
        print("-" * 62)


if __name__ == "__main__":
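    # Restore default SIGINT handling so Ctrl-C terminates the script immediately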
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    # Initialize Ray
    ray.init()
    print(f"Ray version: {ray.__version__}")
    print(f"Python multiprocessing start method: {mp.get_start_method()}")
    sizes = [1024, 10240, 102400]  # 1 KB to 100 KB
    asyncio.run(run_benchmark(sizes, num_messages=1000, iterations=3))
    # Shutdown Ray
    ray.shutdown()
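
# To reproduce (assuming dependencies are installed, e.g. `pip install ray`):
# run this file directly with Python. psutil is optional and is only used to
# pin the driver process to a single CPU core.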