Last active
April 30, 2021 06:52
python multiprocessing, clean up child processes upon shutdown
import time
import random
from signal import signal, SIGINT, SIGTERM
from multiprocessing import Process, Event

"""
There are two examples:
(1) with graceful shutdown
(2) without graceful shutdown

Use 2 only when the child process cannot lose data, when losing data is acceptable, or when the code is too difficult to change.
(In the last scenario, you can only shut down the application during off hours when no data is being processed.)
Otherwise use 1 (best practice).

For each project, decide whether to do 1 or 2. The code you need to add is marked with a comment "(ADD CODE)" as below.

                      | need to change child process code? | child process daemon=? |
----------------------+------------------------------------+------------------------+
w graceful shutdown   | yes                                | any value              |
w/o graceful shutdown | no                                 | must be True           |
"""

def child_task_w_graceful_shutdown(shutdown):
    # (ADD CODE) prevent KeyboardInterrupt exception
    signal(SIGINT, lambda signalnum, frame: None)

    while True:
        # simulate the running task
        time.sleep(10)
        dice = random.randint(1, 10)

        if dice == 1:
            print("child process crashed")
            raise ValueError
        elif dice == 2:
            print("child process completed")
            break

        # (ADD CODE) handle shutdown event at a time when there is no more unprocessed data
        if shutdown.is_set():
            print("shutting down...")
            break

def child_task():
    while True:
        # simulate the running task
        time.sleep(10)
        dice = random.randint(1, 10)

        if dice == 1:
            print("child process crashed")
            raise ValueError
        elif dice == 2:
            print("child process completed")
            break

if __name__ == "__main__":
    # (ADD CODE) handle signals for both cases
    shutdown = Event()

    for num in [SIGINT, SIGTERM]:
        signal(num, lambda signalnum, frame: shutdown.set())

    # Example 1: with graceful shutdown
    # (ADD CODE) pass shutdown event as argument into all child processes
    processes = [
        Process(target=child_task_w_graceful_shutdown, name="dice thrower", kwargs={"shutdown": shutdown})
        for _ in range(20)
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

    for p in processes:
        p.close()

    print("main bye")

    # Example 2: without graceful shutdown
    # (ADD CODE) pass daemon=True
    processes = [Process(target=child_task, name="dice thrower", daemon=True) for _ in range(20)]

    for p in processes:
        p.start()

    # (ADD CODE) replace join() and close() by a custom while loop
    while any(p.is_alive() for p in processes) and not shutdown.is_set():
        time.sleep(3)

    print("main bye")

"""
daemon    when main program exits normally    SIGINT                  SIGTERM
False     won't exit even without join()      kill child processes    won't kill child processes
True      kill child processes                kill child processes    won't kill child processes
"""
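To check how each child in Example 1 actually ended (completed, crashed, or killed by a signal), one option is to read `Process.exitcode` after `join()` and before `close()`, since a closed `Process` object raises `ValueError` on attribute access. The helper below is a minimal sketch and not part of the original gist; the name `describe_exitcode` is hypothetical. It relies on the documented `multiprocessing` convention that the exit code is 0 for a normal return, 1 for an uncaught exception, and `-N` when the child was terminated by signal `N`.

```python
from signal import Signals


def describe_exitcode(exitcode):
    """Translate a Process.exitcode value into a short status string (hypothetical helper)."""
    if exitcode is None:
        # the process has not terminated yet (or was never started)
        return "still running"
    if exitcode == 0:
        # the target function returned normally
        return "completed"
    if exitcode < 0:
        # a negative value -N means the child was terminated by signal N
        try:
            return f"killed by {Signals(-exitcode).name}"
        except ValueError:
            # signal number not known on this platform
            return f"killed by signal {-exitcode}"
    # 1 for an uncaught exception, or N from sys.exit(N)
    return f"crashed (exit code {exitcode})"
```

In Example 1 this could be called between the `join()` loop and the `close()` loop, for instance `print(p.name, describe_exitcode(p.exitcode))` for each `p` in `processes`.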