Skip to content

Instantly share code, notes, and snippets.

@qoda
Last active November 2, 2023 17:29
Show Gist options
  • Save qoda/53a3bb50e9c7503892c3c092780ca2e8 to your computer and use it in GitHub Desktop.
Save qoda/53a3bb50e9c7503892c3c092780ca2e8 to your computer and use it in GitHub Desktop.
Managed Multiprocessing
import logging
import multiprocessing
import sys
import time
logger = logging.getLogger(__name__)
multiprocessing.set_start_method('fork')
class ManagedMultiProcess:
"""
Managed multiprocessing with support for respawning failed workers.
Example:
```
import random
class HelloWorld(ManagedMultiProcess):
def command(self, *args, **kwargs):
# Introduces a TypeError to test respawning
while True:
random_number = random.choice(list(range(0, 20)) + [None])
print(f'Hello World [{int(random_number)}]')
time.sleep(0.5)
hello_world = HelloWorld(respawn=True)
hello_world.start()
```
"""
processes = []
def __init__(self, workers=4, respawn=False, monitor_interval=0.1):
self.workers = workers
self.respawn = respawn
self.monitor_interval = monitor_interval
def command(self, *args, **kwargs):
"""
Override this function with your own command.
"""
self.respawn = False
raise NotImplementedError('Main command not implimented')
def _start_process(self, worker_id, *args, **kwargs):
"""
Starts a single process and appends to the process list.
"""
process = multiprocessing.Process(
target=self.command, args=args, kwargs=kwargs
)
process.start()
self.processes[worker_id] = process
def _monitor_processes(self, *args, **kwargs):
"""
Monitors workers and respawns a worker if respawn option is enabled.
"""
while True:
alive_processes = all(p.is_alive() for p in self.processes)
if not alive_processes and not self.respawn:
sys.exit(1)
for worker_id, process in enumerate(self.processes):
if process is not None and not process.is_alive():
process.terminate()
logger.info(f'Worker {worker_id}: Restarting')
if self.respawn:
self._start_process(worker_id, *args, **kwargs)
time.sleep(self.monitor_interval)
def _handle(self, *args, **kwargs):
"""
Starts up the inital workers and appends them to the processes list.
"""
for worker_id in range(self.workers):
self.processes.append(None)
self._start_process(worker_id, *args, **kwargs)
self._monitor_processes(*args, **kwargs)
def start(self, *args, **kwargs):
"""
Main entry point. Starts up the handler and catches a keyboard interrupt in
order to terminate all processes.
"""
try:
self._handle(*args, **kwargs)
except KeyboardInterrupt:
for worker_id, process in enumerate(self.processes):
logger.warning(f'Exiting child process with id: {worker_id}')
process.terminate()
sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment