Last active
July 20, 2025 19:53
-
-
Save nat-n/a92ab056872c6972c1c1cf50e02ddd89 to your computer and use it in GitHub Desktop.
A demo of managing subprocesses with asyncio and manipulating their output streams.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
This script demonstrates how to run multiple subprocesses in Python using asyncio,
capture their outputs, and process them in real time. Run it with an output file
path to append the subprocess output to that file, or with `-` to capture the
output and print it to the console. The script then creates subprocesses that
generate output asynchronously by calling itself with task IDs.
"""
import asyncio | |
import random | |
import sys | |
import time | |
class TaskOrchestrator:
    """
    Run several copies of this script as subprocesses and track their results.

    When ``output_path`` is ``"-"`` each subprocess's stdout is captured through
    a pipe and echoed line by line, prefixed with the task name. Otherwise the
    subprocesses write straight into ``output_path``, which is opened in append
    mode.

    Attributes:
        output_path: The path given by the caller, or ``"-"`` for capture mode.
        output_stream: ``asyncio.subprocess.PIPE`` in capture mode, else the
            open file object handed to each subprocess as its stdout.
        subprocs: Maps task name to the started subprocess.
        return_codes: Maps task name to its exit code; ``-1`` until the
            subprocess has finished.
    """

    def __init__(self, output_path):
        """
        Store the output target and open the output file if one is needed.

        Args:
            output_path: ``"-"`` to capture subprocess output, or the path of a
                file to append subprocess output to.
        """
        self.output_path = output_path
        self.output_stream = (
            asyncio.subprocess.PIPE if output_path == "-" else open(output_path, "a")
        )
        self.subprocs = {}
        self.return_codes = {}

    def close(self):
        """
        Close the output file, if this orchestrator opened one.

        Safe to call more than once; does nothing in capture mode.
        """
        if self.output_path != "-" and not self.output_stream.closed:
            self.output_stream.close()

    def _ensure_output_stream(self):
        """Reopen the output file if a previous run already closed it."""
        if self.output_path != "-" and self.output_stream.closed:
            self.output_stream = open(self.output_path, "a")

    async def run_tasks(self, num_tasks=5):
        """
        Start ``num_tasks`` subprocesses concurrently and wait for all of them.

        If this coroutine is cancelled while waiting, every subprocess that is
        still running is killed and the cancellation is absorbed, so the caller
        can still inspect ``return_codes`` afterwards.

        Args:
            num_tasks: How many subprocesses to launch.
        """
        self._ensure_output_stream()
        try:
            # gather() wraps each coroutine in a task itself.
            await asyncio.gather(
                *(self._run_task(task_number) for task_number in range(num_tasks))
            )
        except asyncio.CancelledError:
            print("Cancelling tasks:")
            self.kill_tasks()
        finally:
            # The children hold their own copy of the descriptor, so the
            # parent's handle can be released as soon as the run is over.
            self.close()

    async def _run_task(self, task_number):
        """
        Run one subprocess, stream its output if captured, and record its exit code.

        Args:
            task_number: Number passed to the child as its task ID and used to
                build the task name.
        """
        task_name = f"Task {task_number}"
        subproc = await asyncio.create_subprocess_exec(
            # Run this script with a task number and -u flag to force unbuffered
            # output
            sys.executable, "-u", __file__, self.output_path, str(task_number),
            stdout=self.output_stream,
        )
        self.subprocs[task_name] = subproc
        self.return_codes[task_name] = -1  # placeholder until the process exits

        # subproc.stdout is only set when stdout was requested as a pipe.
        if subproc.stdout:
            while line := await subproc.stdout.readline():
                # Replace undecodable bytes rather than aborting the whole task.
                text = line.decode(errors="replace").rstrip()
                print(f"{task_name} | {text}")

        await subproc.wait()
        self.return_codes[task_name] = subproc.returncode
        print(f"Task complete: {task_name}")

    def kill_tasks(self):
        """
        Kill all running tasks.

        Subprocesses that already have a return code are left alone.
        """
        for task_name, subproc in self.subprocs.items():
            if subproc.returncode is None:
                print(f"Killing task {task_name}...")
                try:
                    subproc.kill()
                except ProcessLookupError:
                    # The process exited between the check and the kill.
                    pass
def generate_output(task_id, num_lines=100):
    """
    Print ``num_lines`` numbered lines to stdout, pausing randomly before each.

    Args:
        task_id: Identifier included in every printed line.
        num_lines: Number of lines to print.
    """
    for line_index in range(num_lines):
        # Sleep for a random fraction of a second so tasks interleave.
        pause = random.random()
        time.sleep(pause)
        message = f"Output {line_index} from task {task_id}"
        print(message, file=sys.stdout)
async def main():
    """
    Entry point: act as a worker when given a task ID, otherwise orchestrate.

    Command line: ``<output_file> [task_id]``. With a task ID the process just
    generates output for that task. Without one it starts the subprocesses via
    ``TaskOrchestrator`` and prints their return codes.

    Raises:
        SystemExit: With status 1 when no output file argument is given.
    """
    # sys.argv[0] is the script itself, so at least two entries are required
    # before sys.argv[1] can be read.
    if len(sys.argv) < 2:
        print("Usage: python io_test.py <output_file> [task_id]")
        print("Set output_file to '-' to capture and process outputs.")
        sys.exit(1)

    if len(sys.argv) > 2:
        # task_id was provided, make some task output!
        task_id = int(sys.argv[2])
        return generate_output(task_id)

    orchestrator = TaskOrchestrator(output_path=sys.argv[1])
    await orchestrator.run_tasks()
    print("Return codes:", orchestrator.return_codes)
# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment