Skip to content

Instantly share code, notes, and snippets.

@gpshead
Last active November 20, 2024 16:15
Show Gist options
  • Save gpshead/85671322160fe3940088d97037237cd1 to your computer and use it in GitHub Desktop.
Microbenchmark of the Python multiprocessing spawn rate.
"""Microbenchmark of multiprocessing process spawn rate.
Optional command line parameters:
The first is the number of repeated runs in order to compute a StdDev
as an indicator of reliability on the Procs/sec result.
The second is the multiprocessing start method to use.
The third overrides the max active processes.
Authors: Claude 3.5 Sonnet (new), with edits and tweaks by Gregory P. Smith.
License: Apache 2.0
"""
import multiprocessing as mp
import sys
import time
from statistics import mean, stdev
from multiprocessing.connection import wait
def run_benchmark(total_processes: int, max_active: int) -> tuple[float, float]:
    """Spawn and join ``total_processes`` workers, at most ``max_active`` at once.

    Uses ``multiprocessing.connection.wait()`` on process sentinels so that
    finished processes are reaped (and replaced) as soon as they exit.

    Args:
        total_processes: Total number of processes to create over the run.
        max_active: Maximum number of processes allowed to run concurrently.

    Returns:
        A ``(processes_per_second, total_time)`` tuple, where ``total_time``
        is the wall-clock duration of the whole run in seconds.
    """
    running = {}  # sentinel -> Process, for everything not yet reaped
    launched = 0
    t0 = time.perf_counter()

    def _spawn() -> None:
        # Start one no-op worker and track it by its sentinel.
        nonlocal launched
        proc = mp.Process()
        proc.start()
        running[proc.sentinel] = proc
        launched += 1

    # Fill the initial window of up to max_active workers.
    while launched < total_processes and len(running) < max_active:
        _spawn()

    # Replace workers as they finish until the launch budget is exhausted.
    while launched < total_processes:
        for done in wait(running.keys()):
            running.pop(done).join()  # reap the finished process
            if launched < total_processes:
                _spawn()

    # Drain whatever is still running.
    while running:
        for done in wait(running.keys()):
            running.pop(done).join()

    elapsed = time.perf_counter() - t0
    return total_processes / elapsed, elapsed
def main():
    """Run the spawn-rate benchmark over a ladder of process counts and print a table.

    Optional argv: [num_runs] [start_method] [max_active override].
    """
    counts = (32, 128, 384, 1024, 2048, 5120)
    argv = sys.argv
    num_runs = 5 if len(argv) < 2 else int(argv[1])
    cpus = mp.cpu_count()
    # Leave headroom for the parent process: half the cores minus one on
    # larger machines, all-but-one core on small ones.
    max_active = cpus // 2 - 1 if cpus > 4 else cpus - 1
    if len(argv) > 2:
        mp.set_start_method(argv[2])
    if len(argv) > 3:
        max_active = int(argv[3])

    print(f"Process Creation Microbenchmark (max {max_active} active processes) ({num_runs} iterations)")
    print(f"multiprocessing start method: {mp.get_start_method()}")
    print(f"{sys.version=}")
    print("-" * 80)
    print(f"{'Total':<6} {'Procs/sec':>11} {'Time (s)':>10} {'StdDev':>10}")
    print("-" * 80)

    for total in counts:
        rates = []
        times = []
        for _ in range(num_runs):
            rate, elapsed = run_benchmark(total, max_active=max_active)
            rates.append(rate)
            times.append(elapsed)
        avg_rate = mean(rates)
        avg_time = mean(times)
        spread = stdev(rates) if len(rates) > 1 else 0
        print(f"{total:<6} {avg_rate:>11,.2f}"
              f"{avg_time:>11.3f} {spread:>10.2f}")
        if avg_rate < total // 2:
            break  # The next count would take a long time; little value.
# Run the benchmark only when executed as a script; importing this module
# (e.g. by the "spawn" start method's child bootstrap) must not re-run it.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment