Created
December 5, 2018 20:24
-
-
Save tino/89891af0c336d8b27fa80a031815e285 to your computer and use it in GitHub Desktop.
Master with worker processes in Python asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.6 | |
# or higher | |
import asyncio | |
import random | |
import signal | |
import subprocess | |
import sys | |
# Number of worker subprocesses the master spawns.
NUM_WORKERS = 2

# Shutdown flag: set to True by the `stop` signal handler and polled by the
# master and worker loops.
_stop = False

# Subprocess handles of the started workers, in start order.
_workers = []

# Log-reader tasks, appended in lockstep with `_workers` so that index i in
# both lists refers to the same worker.
_worker_log_readers = []
async def start_workers():
    """Spawn ``NUM_WORKERS`` worker subprocesses and a log reader for each.

    Every worker is this same script re-invoked with the extra arguments
    ``worker <n>`` (n starting at 1). Its stderr is merged into stdout, and
    stdout is piped back to the master, where a ``stdout_reader`` task
    relays it line by line.

    The process handles are appended to ``_workers`` and the reader tasks to
    ``_worker_log_readers``, in the same order.
    """
    for i in range(1, NUM_WORKERS + 1):
        worker = await asyncio.create_subprocess_exec(
            # Use the interpreter that runs the master: a bare "python" on
            # PATH may be a different (or older) Python.
            sys.executable,
            "-u",  # unbuffered, so worker output reaches the pipe promptly
            # The worker entry point lives in this very file, so re-run it
            # regardless of what the file is called or the current directory.
            __file__,
            "worker",
            str(i),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        _workers.append(worker)
        # Schedule on the running loop instead of relying on a module-level
        # `loop` name that only exists when the file is run as a script.
        _worker_log_readers.append(
            asyncio.ensure_future(stdout_reader(worker.stdout, i))
        )
async def worker():
    """Run the worker loop: emit a heartbeat every two seconds until stopped.

    While the module-level ``_stop`` flag is unset the loop never ends. Once
    it is set, each heartbeat only ends the loop when ``random.random()``
    exceeds 0.3, so a worker may linger for a few more beats.
    """
    print("worker started")
    while True:
        await asyncio.sleep(2)
        print("heartbeat")
        if not _stop:
            continue
        # Perhaps it hangs!
        if random.random() > 0.3:
            break
    print("π")
async def master(loop):
    """Start the workers, then poll the stop flag and shut them down.

    The module-level ``_stop`` flag is checked after every 0.2 second sleep.
    When it is set, ``stop_workers(loop)`` is awaited and the coroutine
    finishes.

    Args:
        loop: the event loop, handed on to ``stop_workers``.
    """
    await start_workers()
    while True:
        await asyncio.sleep(0.2)
        if not _stop:
            continue
        await stop_workers(loop)
        break
    print("π")
async def stop_workers(loop):
    """Shut down every started worker concurrently, then stop *loop*.

    Each worker is sent a terminate request and given three seconds to exit.
    A worker that does not exit in time is killed and then awaited so the
    child process is reaped. The worker's log-reader task is cancelled in
    every case.

    Args:
        loop: the event loop to stop once all workers have been handled.
            It is stopped even if shutting down a worker raised.
    """

    async def stop_worker(index, proc, log_reader):
        # Terminate one worker, escalating to kill if it ignores the request.
        print("Shutting down worker... ")
        try:
            try:
                proc.terminate()
                await asyncio.wait_for(proc.wait(), timeout=3)
            except asyncio.TimeoutError:
                print(f"Worker {index + 1} didn't terminate, sending kill... π")
                proc.kill()
                # Wait for the killed process so it does not stay a zombie.
                await proc.wait()
        except ProcessLookupError:
            # The worker had already exited; there is nothing left to signal.
            pass
        finally:
            log_reader.cancel()

    try:
        # Pair each process with its reader from the lists themselves, so a
        # partially completed start-up cannot cause an IndexError here.
        await asyncio.gather(
            *[
                stop_worker(index, proc, log_reader)
                for index, (proc, log_reader) in enumerate(
                    zip(_workers, _worker_log_readers)
                )
            ]
        )
    finally:
        # Always stop the loop; otherwise a failure above would leave the
        # master running forever with no way to shut it down.
        loop.stop()
    print("done.")
async def stdout_reader(stdout, i):
    """Relay a worker's output to the master's stdout, line by line.

    Each line is printed with a ``[worker-<i>]: `` prefix. When the stream
    reaches end-of-file, ``worker <i> gone`` is printed and the coroutine
    returns.

    Args:
        stdout: stream to read from; must provide an awaitable ``readline()``
            returning bytes (empty bytes at end-of-file).
        i: worker number used in the printed prefix.
    """
    while True:
        line = await stdout.readline()
        if not line:
            print(f"worker {i} gone")
            break
        # Workers may emit bytes that are not valid UTF-8; substitute them
        # rather than letting UnicodeDecodeError kill this reader task and
        # lose all of the worker's later output.
        text = line.decode("utf-8", errors="replace")
        print(f"[worker-{i}]: {text}", end="")
def stop(signum, stack):
    """Signal handler: announce the signal and raise the shutdown flag.

    Args:
        signum: number of the received signal (included in the message).
        stack: current stack frame supplied by ``signal``; unused.
    """
    global _stop
    print(f"Recieved {signum}, shutting down")
    # The master and worker loops poll this flag.
    _stop = True
def noop(signum, stack):
    """Signal handler that only reports the signal and otherwise ignores it.

    Args:
        signum: number of the received signal (included in the message).
        stack: current stack frame supplied by ``signal``; unused.
    """
    print(f"Received {signum}, ignoring")
if __name__ == "__main__":
    # Run this script without arguments to start the master; Ctrl-C shuts it
    # down gracefully. Any extra argument starts it in worker mode instead.
    #
    # Create the loop explicitly: asyncio.get_event_loop() with no current
    # loop is deprecated and fails on the newest Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        if len(sys.argv) == 1:
            print("starting master")
            signal.signal(signal.SIGINT, stop)
            signal.signal(signal.SIGTERM, stop)
            # run_until_complete (rather than create_task + run_forever)
            # re-raises any exception from master(); otherwise a failed
            # start-up would leave the loop running forever with nothing
            # left that could stop it.
            loop.run_until_complete(master(loop))
        else:
            print("starting worker")
            # Ignoring SIGINT (Ctrl-C) is necessary, as the worker will
            # receive the signal too when the master is ctrl-c'ed. It would
            # then die before proper shutdown.
            signal.signal(signal.SIGINT, noop)
            signal.signal(signal.SIGTERM, stop)
            loop.run_until_complete(worker())
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment