Skip to content

Instantly share code, notes, and snippets.

@nat-n
Last active July 20, 2025 19:53
Show Gist options
  • Save nat-n/a92ab056872c6972c1c1cf50e02ddd89 to your computer and use it in GitHub Desktop.
Save nat-n/a92ab056872c6972c1c1cf50e02ddd89 to your computer and use it in GitHub Desktop.
A demo of managing subprocesses with asyncio and manipulating their output streams.
"""
This script demonstrates how to run multiple subprocesses in Python using asyncio,
capturing their outputs and processing them in real time. Pass an output file path
to have the subprocesses append their output to that file, or pass `-` to capture
the output and print it to the console line by line, prefixed with the task name.
The script spawns the subprocesses by calling itself with an extra task ID
argument; each subprocess then generates output asynchronously.
"""
import asyncio
import random
import sys
import time
class TaskOrchestrator:
    """
    Spawn copies of this script as subprocesses and collect their results.

    Each task re-runs this file with the output path and a task number as
    arguments. When ``output_path`` is ``"-"`` the subprocess stdout is piped
    back and echoed line by line with a task-name prefix; otherwise the
    subprocess writes straight into the file at ``output_path`` (opened in
    append mode).

    Attributes:
        output_path: The path given to the constructor (``"-"`` for piping).
        output_stream: ``asyncio.subprocess.PIPE`` or the open output file.
        subprocs: Maps a task name (``"Task N"``) to its subprocess.
        return_codes: Maps a task name to its exit code; ``-1`` until the
            task has run to completion.
    """

    def __init__(self, output_path: str):
        """Remember the output target and open the output file if one is used."""
        self.output_path = output_path
        # Opened eagerly so an unusable path fails at construction time.
        # run_tasks() closes the file when it finishes.
        self.output_stream = (
            asyncio.subprocess.PIPE if output_path == "-" else open(output_path, "a")
        )
        self.subprocs = {}
        self.return_codes = {}

    async def run_tasks(self, num_tasks: int = 5):
        """
        Run ``num_tasks`` subprocess tasks concurrently and wait for them all.

        If the wait is cancelled, the running subprocesses are killed and the
        cancellation is absorbed so the caller can still inspect
        ``return_codes``. If a task fails, the remaining tasks are stopped and
        the error is re-raised. The output file, if any, is closed on exit.
        """
        self._ensure_output_stream()
        tasks = [
            asyncio.create_task(self._run_task(task_number))
            for task_number in range(num_tasks)
        ]
        try:
            await asyncio.gather(*tasks)
        except asyncio.CancelledError:
            print("Cancelling tasks:")
            await self._stop_tasks(tasks)
        except Exception:
            # Do not leave sibling subprocesses running unattended.
            await self._stop_tasks(tasks)
            raise
        finally:
            self._close_output_stream()

    async def _run_task(self, task_number):
        """
        Run one subprocess task, echoing its piped output as it arrives.

        Records the subprocess in ``subprocs`` and its exit code in
        ``return_codes`` once it has finished.
        """
        task_name = f"Task {task_number}"
        subproc = await asyncio.create_subprocess_exec(
            # Run this script with a task number and -u flag to force
            # unbuffered output
            sys.executable, "-u", __file__, self.output_path, str(task_number),
            stdout=self.output_stream,
        )
        self.subprocs[task_name] = subproc
        self.return_codes[task_name] = -1
        # stdout is only a readable stream when the output was piped.
        if subproc.stdout:
            while line := await subproc.stdout.readline():
                # Undecodable bytes must not abort the whole task.
                text = line.decode(errors="replace").rstrip()
                print(f"{task_name} | {text}")
        await subproc.wait()
        self.return_codes[task_name] = subproc.returncode
        print(f"Task complete: {task_name}")

    def kill_tasks(self):
        """
        Kill all running tasks.
        """
        for task_name, subproc in self.subprocs.items():
            if subproc.returncode is None:
                print(f"Killing task {task_name}...")
                try:
                    subproc.kill()
                except ProcessLookupError:
                    # The process exited between the check and the kill.
                    pass

    async def _stop_tasks(self, tasks):
        """Cancel outstanding tasks, then kill and reap their subprocesses."""
        for task in tasks:
            task.cancel()
        # return_exceptions=True: only wait for the tasks to settle, their
        # individual outcomes are not of interest here.
        await asyncio.gather(*tasks, return_exceptions=True)
        self.kill_tasks()
        # Wait for every subprocess so none is left unreaped.
        await asyncio.gather(
            *(subproc.wait() for subproc in self.subprocs.values())
        )

    def _ensure_output_stream(self):
        """Reopen the output file if an earlier run already closed it."""
        if getattr(self.output_stream, "closed", False):
            self.output_stream = open(self.output_path, "a")

    def _close_output_stream(self):
        """Close the output file; a no-op when output is piped."""
        close = getattr(self.output_stream, "close", None)
        if close is not None:
            close()
def generate_output(task_id, num_lines=100, max_delay=1.0):
    """
    Print ``num_lines`` numbered lines to stdout, pausing before each one.

    Args:
        task_id: Identifier included in every line of output.
        num_lines: Number of lines to print.
        max_delay: Upper bound, in seconds, of the random pause before each
            line. Must be non-negative; ``0`` disables the pause.
    """
    for line_number in range(num_lines):
        # A random pause makes output from concurrent tasks interleave.
        time.sleep(random.random() * max_delay)
        print(f"Output {line_number} from task {task_id}")
async def main():
    """
    Entry point: act as the orchestrator or as a single output-generating task.

    Command line: ``<output_file> [task_id]``. With only ``output_file`` the
    script orchestrates the subprocess tasks and prints their return codes.
    With a ``task_id`` as well, it generates that task's output instead.
    Exits with status 1 after printing usage when ``output_file`` is missing.
    """
    # sys.argv[0] is the script itself, so fewer than two entries means the
    # required output_file argument was not given.
    if len(sys.argv) < 2:
        print("Usage: python io_test.py <output_file> [task_id]")
        print("Set output_file to '-' to capture and process outputs.")
        sys.exit(1)

    if len(sys.argv) > 2:
        # task_id was provided, make some task output!
        task_id = int(sys.argv[2])
        return generate_output(task_id)

    orchestrator = TaskOrchestrator(output_path=sys.argv[1])
    await orchestrator.run_tasks()
    print("Return codes:", orchestrator.return_codes)
if __name__ == "__main__":
    # Executed as a script (not imported): drive the async entry point on a
    # fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment