Last active
July 17, 2022 16:46
-
-
Save icedraco/83028ecc2f9cd03274b002cdf453b3f5 to your computer and use it in GitHub Desktop.
Multiprocessing example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import multiprocessing as mp | |
def main(num_parallel_procs=12, num_workloads=100):
    """
    Spawn worker processes, feed them workloads through a bounded queue,
    then wait for every worker to shut down.

    :param int num_parallel_procs: how many worker processes to spawn
        (also used as the maximum size of the work queue); must be >= 1
    :param int num_workloads: how many workloads to send to the workers
    :return: exit status for the process (always 0)
    :rtype: int
    :raises ValueError: if num_parallel_procs is smaller than 1
    """
    if num_parallel_procs < 1:
        # with no workers nothing would ever consume the queued workloads
        raise ValueError("num_parallel_procs must be at least 1")

    # queue that will pass work to each worker; it is bounded, so put()
    # blocks the main process whenever the workers fall behind
    q = mp.Queue(num_parallel_procs)

    # create/init process objects to spawn; provides our q to each worker
    print(f"Creating {num_parallel_procs} workers...")
    procs = [
        mp.Process(target=proc_do_stuff, name=f'worker-{i}', args=(q,), daemon=True)
        for i in range(num_parallel_procs)
    ]

    # each Process must be started - do it now
    for p in procs:
        p.start()

    # this will take a while; make sure we handle CTRL+C gracefully
    try:
        print("Workers started - sending workloads...")
        for i in range(num_workloads):
            # note: this call is blocking - it will pause the main process for
            #       as long as the queue remains full! KeyboardInterrupt will
            #       interrupt the paused state, see below.
            q.put(f"Workload #{i}")

        # send out one termination command per worker so that, once all the
        # assignments are consumed, each worker's next "assignment" is to
        # shut down.
        print(f"Sending termination commands to {num_parallel_procs} workers...")
        for _ in procs:
            q.put(None)

    except KeyboardInterrupt:
        # this interrupts the loop above - not all workloads will be sent
        print("Main process caught CTRL+C: telling workers to wrap up earlier!")
        # Deliberately DON'T send shutdown commands here: when CTRL+C is
        # pressed in a terminal the workers are expected to receive it too
        # and shut down on their own, before ever seeing a None. Sending
        # Nones now would also risk blocking again on a full queue that
        # nobody is going to drain.

    print("Waiting for all workers to shut down...")
    for p in procs:
        p.join()

    print("ALL DONE!")
    return 0
def proc_do_stuff(q_input):
    """
    Worker entry point: keep pulling workloads off the queue and processing
    them until a None workload (the shutdown command) arrives.

    This function is run by each process we spawn with mp.Process(...).start()

    :param mp.Queue q_input: input queue through which the MainProcess sends us work
    """
    worker = mp.current_process().name

    # capture CTRL+C from stdin
    try:
        print(f"{worker}: Started; processing work...")

        # fetch the first workload; get() blocks (pauses the process) for as
        # long as the queue is empty
        job = q_input.get()

        # a None workload means we were asked to shut down
        while job is not None:
            print(f"{worker}: Processing workload {job}")
            time.sleep(0.8)  # let's say we're calculating something here

            # done; the result could be dumped to a file, or to a q_output
            # if one was provided, letting a "sink" process handle outputs.
            print(f"{worker}: Done processing {job}!")

            # block until the next workload is available
            job = q_input.get()

    except KeyboardInterrupt:
        print(f"{worker}: CAUGHT CTRL+C!")

    finally:
        print(f"{worker}: Shutting down...")
if __name__ == "__main__":
    # run the example and hand main()'s return value to the interpreter
    # as the process exit status
    exit_status = main()
    raise SystemExit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment