Last active
November 2, 2023 17:29
-
-
Save qoda/53a3bb50e9c7503892c3c092780ca2e8 to your computer and use it in GitHub Desktop.
Managed Multiprocessing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import multiprocessing | |
import sys | |
import time | |
# Module-level logger used by ManagedMultiProcess for worker lifecycle messages.
logger = logging.getLogger(__name__)

# Workers are launched with the 'fork' start method so that the bound
# ``command`` method (and the instance it belongs to) is inherited by the child
# without having to be pickled.
#
# ``set_start_method`` may only be called once per interpreter; a second call
# raises RuntimeError("context has already been set"). That happens when the
# host application configured multiprocessing first, or when this module is
# reloaded. Tolerate it instead of making the module un-importable, and only
# warn when the active method is not the one we wanted.
try:
    multiprocessing.set_start_method('fork')
except RuntimeError:
    if multiprocessing.get_start_method() != 'fork':
        logger.warning(
            "multiprocessing start method already set to %r; "
            "workers are expected to run under 'fork'",
            multiprocessing.get_start_method(),
        )


class ManagedMultiProcess:
    """
    Managed multiprocessing with support for respawning failed workers.

    Subclass this and override ``command``; ``start`` then launches
    ``workers`` child processes that each run ``command`` and blocks while
    monitoring them.

    Example:

    ```
    import random

    class HelloWorld(ManagedMultiProcess):

        def command(self, *args, **kwargs):
            # Introduces a TypeError to test respawning
            while True:
                random_number = random.choice(list(range(0, 20)) + [None])
                print(f'Hello World [{int(random_number)}]')
                time.sleep(0.5)

    hello_world = HelloWorld(respawn=True)
    hello_world.start()
    ```
    """

    # Class-level fallback only (e.g. for subclasses whose __init__ does not
    # call super().__init__()). It is never mutated in place: __init__ and
    # _handle bind a fresh per-instance list, so instances do not share
    # worker slots.
    processes = []

    def __init__(self, workers=4, respawn=False, monitor_interval=0.1):
        """
        Configure the manager.

        :param workers: number of child processes to start.
        :param respawn: when true, a worker that is no longer alive is
            started again by the monitor loop.
        :param monitor_interval: seconds to sleep between monitor passes.
        """
        self.workers = workers
        self.respawn = respawn
        self.monitor_interval = monitor_interval
        # Per-instance worker slots; index == worker id.
        self.processes = []

    def command(self, *args, **kwargs):
        """
        Override this function with your own command.

        It is executed inside each worker process with the positional and
        keyword arguments that were passed to ``start``.

        :raises NotImplementedError: always, in this base implementation.
        """
        # NOTE: this runs in the child process, so the assignment only changes
        # the child's copy of the instance. The parent disables respawning for
        # an un-overridden command itself (see ``_handle``).
        self.respawn = False
        raise NotImplementedError('Main command not implimented')

    def _start_process(self, worker_id, *args, **kwargs):
        """
        Starts a single process and stores it in the process list.

        :param worker_id: index of the slot in ``self.processes`` to fill;
            the slot must already exist.
        """
        process = multiprocessing.Process(
            target=self.command, args=args, kwargs=kwargs
        )
        process.start()
        self.processes[worker_id] = process

    def _monitor_processes(self, *args, **kwargs):
        """
        Monitors workers and respawns a worker if respawn option is enabled.

        Loops forever. When a worker is found dead and respawning is disabled,
        the manager exits with status 1 via ``sys.exit``. Remaining workers
        are not terminated here; they are non-daemonic, so interpreter
        shutdown waits for them to finish.

        :raises SystemExit: when a worker has died and ``respawn`` is false.
        """
        while True:
            for worker_id, process in enumerate(self.processes):
                # Skip slots that have not been filled yet and healthy workers.
                if process is None or process.is_alive():
                    continue
                if not self.respawn:
                    sys.exit(1)
                # The worker is already dead: join() just reaps it before the
                # slot is reused (terminate() on a dead process does nothing).
                process.join()
                logger.info(f'Worker {worker_id}: Restarting')
                self._start_process(worker_id, *args, **kwargs)
            time.sleep(self.monitor_interval)

    def _handle(self, *args, **kwargs):
        """
        Starts up the initial workers, stores them in the processes list and
        then enters the monitor loop (which does not return normally).
        """
        # If ``command`` was not overridden every worker fails immediately with
        # NotImplementedError. Respawning would then loop forever, so turn it
        # off here in the parent, where the monitor actually reads the flag.
        if getattr(self.command, '__func__', None) is ManagedMultiProcess.command:
            self.respawn = False

        # Fresh list per run: avoids touching the class-level list and keeps
        # worker ids aligned with slots if start() is called more than once.
        self.processes = [None] * self.workers
        for worker_id in range(self.workers):
            self._start_process(worker_id, *args, **kwargs)
        self._monitor_processes(*args, **kwargs)

    def start(self, *args, **kwargs):
        """
        Main entry point. Starts up the handler and catches a keyboard
        interrupt in order to terminate all processes.

        ``args`` and ``kwargs`` are forwarded to ``command`` in every worker.

        :raises SystemExit: with status 1 after a keyboard interrupt, or when
            a worker dies while respawning is disabled.
        """
        try:
            self._handle(*args, **kwargs)
        except KeyboardInterrupt:
            for worker_id, process in enumerate(self.processes):
                # A slot is still None if the interrupt arrived before that
                # worker was started; there is nothing to terminate.
                if process is None:
                    continue
                logger.warning(f'Exiting child process with id: {worker_id}')
                process.terminate()
            sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment