Last active
November 20, 2024 16:15
-
-
Save gpshead/85671322160fe3940088d97037237cd1 to your computer and use it in GitHub Desktop.
Microbenchmark of the Python multiprocessing spawn rate.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Microbenchmark of multiprocessing process spawn rate. | |
Optional command line parameters: | |
The first is the number of repeated runs in order to compute a StdDev | |
as an indicator of reliability on the Procs/sec result. | |
The second is the multiprocessing start method to use. | |
The third overrides the max active processes. | |
Authors: Claude 3.5 Sonnet (new), with edits and tweaks by Gregory P. Smith. | |
License: Apache 2.0 | |
""" | |
import multiprocessing as mp | |
import sys | |
import time | |
from statistics import mean, stdev | |
from multiprocessing.connection import wait | |
def run_benchmark(total_processes: int, max_active: int) -> tuple[float, float]: | |
""" | |
Benchmark creating and joining processes using wait() to handle processes as they complete | |
Args: | |
total_processes: Total number of processes to create | |
max_active: Number of processes to let run at a time. | |
Returns: | |
processes_per_second: Number of processes created and joined per second | |
total_time: Total time taken | |
""" | |
active_processes = {} # sentinel -> process mapping | |
processes_started = 0 | |
start_time = time.perf_counter() | |
# Initial batch up to max_active | |
while len(active_processes) < max_active and processes_started < total_processes: | |
p = mp.Process() | |
p.start() | |
active_processes[p.sentinel] = p | |
processes_started += 1 | |
# As processes finish, start new ones if needed | |
while processes_started < total_processes: | |
# Wait for any process to finish | |
ready_sentinels = wait(active_processes.keys()) | |
# Handle all finished processes | |
for sentinel in ready_sentinels: | |
finished_process = active_processes.pop(sentinel) | |
finished_process.join() # Clean up the finished process | |
if processes_started < total_processes: | |
# Start a new process to replace it | |
p = mp.Process() | |
p.start() | |
active_processes[p.sentinel] = p | |
processes_started += 1 | |
# Wait for remaining processes | |
while active_processes: | |
ready_sentinels = wait(active_processes.keys()) | |
for sentinel in ready_sentinels: | |
finished_process = active_processes.pop(sentinel) | |
finished_process.join() | |
total_time = time.perf_counter() - start_time | |
processes_per_second = total_processes / total_time | |
return processes_per_second, total_time | |
def main(): | |
PROCESS_COUNTS = [32, 128, 384, 1024, 2048, 5120] | |
num_runs = int(sys.argv[1]) if len(sys.argv) > 1 else 5 | |
max_active = (mp.cpu_count() // 2 - 1) if mp.cpu_count() > 4 else mp.cpu_count() - 1 | |
if len(sys.argv) > 2: | |
mp.set_start_method(sys.argv[2]) | |
if len(sys.argv) > 3: | |
max_active = int(sys.argv[3]) | |
print(f"Process Creation Microbenchmark (max {max_active} active processes) ({num_runs} iterations)") | |
print(f"multiprocessing start method: {mp.get_start_method()}") | |
print(f"{sys.version=}") | |
print("-" * 80) | |
print(f"{'Total':<6} {'Procs/sec':>11} {'Time (s)':>10} {'StdDev':>10}") | |
print("-" * 80) | |
for num_processes in PROCESS_COUNTS: | |
rate_results = [] | |
time_results = [] | |
for _ in range(num_runs): | |
rate, total_time = run_benchmark(num_processes, max_active=max_active) | |
rate_results.append(rate) | |
time_results.append(total_time) | |
avg_rate = mean(rate_results) | |
avg_time = mean(time_results) | |
rate_stddev = stdev(rate_results) if len(rate_results) > 1 else 0 | |
print(f"{num_processes:<6} {avg_rate:>11,.2f}" | |
f"{avg_time:>11.3f} {rate_stddev:>10.2f}") | |
if avg_rate < num_processes // 2: | |
break # The next count would take a long time; little value. | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment