|
#!/usr/bin/python3 |
|
|
|
""" |
|
Example demonstrating how to simultaneously read from multiple subprocesses |
|
|
|
Author: Nicholas Folse |
|
Date: 2020.03.21 |
|
|
|
Problem: |
|
- How do I simultaneously process output from multiple subprocesses using Python? |
|
- Can I read from multiple subprocesses in parallel using Python? |
|
|
|
Possible Solutions: |
|
- Create a thread for each subprocess, then gather results. This solution |
|
would require complex synchronization of any shared data structures. |
|
|
|
- Use asyncio. This solution would require Python 3.6+ and introduces the

complexity of using asyncio, which seems like overkill for this problem.
|
|
|
- Use I/O multiplexing (select/selectors). The epoll interface (Linux only) lets

a single thread listen for events on multiple file descriptors with a timeout.

(On Windows, select() only works on sockets, not on subprocess pipes.)
|
|
|
Best Solution (Linux): epoll

This solution is based on epoll, used directly through select.epoll. The basic

idea is to wait for EPOLL events on multiple input pipes, read the input as

it is generated, then handle the lines appropriately.
|
|
|
To do this efficiently, create implementation(s) of HandledProcess, then call

handle_processes(processes, [timeout]), where processes is an iterable of

HandledProcess. The handle_processes function will set up epoll and

handle incoming data by calling the .handle_line() method of your implementation(s)

of HandledProcess.
|
|
|
The example below depends on a file "foo.txt" that contains the following

four lines (note: there is no newline after the last one):

line 1

line 2

line 3

line 4
|
|
|
Two implementations of HandledProcess are defined: SlowAWK and CAT. |
|
SlowAWK calls awk in a way that outputs lines with a delay. CAT simply |
|
outputs all lines as fast as possible. |
|
|
|
Three HandledProcess objects are created and passed to handle_processes.

In this example, each line is simply printed to the screen; a typical run

produces the output below (the exact interleaving depends on timing):
|
|
|
('SlowAWK', 'a', 'line 1\n') |
|
('SlowAWK', 'b', 'line 1\n') |
|
('cat', 'c', 'line 1\n') |
|
('cat', 'c', 'line 2\n') |
|
('cat', 'c', 'line 3\n') |
|
('cat', 'c', 'line 4') |
|
('SlowAWK', 'b', 'line 2\n') |
|
('SlowAWK', 'b', 'line 3\n') |
|
('SlowAWK', 'a', 'line 2\n') |
|
('SlowAWK', 'b', 'line 4\n') |
|
('SlowAWK', 'a', 'line 3\n') |
|
('SlowAWK', 'a', 'line 4\n') |
|
[0, 0, 0] |
|
|
|
This shows that output of multiple subprocesses can be read in an interleaved |
|
non-blocking manner using only a single thread. |
|
|
|
""" |
|
import os
import select
import subprocess
from abc import ABC, abstractmethod
|
|
|
class HandledProcess(ABC):
    """
    Interface for a subprocess whose output is consumed by handle_processes().

    Subclasses must provide:

        process: the subprocess.Popen object being watched; its stdout must be
                 a pipe (created with stdout=subprocess.PIPE)

        handle_line(line): callback invoked with each line of output (bytes)

    The returncode, stdout and stdin properties simply delegate to process.
    """

    @abstractmethod
    def handle_line(self, line):
        """Handle one line (bytes) of output read from the process."""
        # Raise rather than return the NotImplemented sentinel: that sentinel
        # is only meaningful for binary/rich-comparison dunder methods.
        raise NotImplementedError

    @property
    @abstractmethod
    def process(self):
        """The underlying subprocess.Popen object."""
        raise NotImplementedError

    @property
    def returncode(self):
        """Exit status as last recorded by poll()/wait(); None until then."""
        return self.process.returncode

    @property
    def stdout(self):
        """The process's stdout stream (a pipe when created with PIPE)."""
        return self.process.stdout

    @property
    def stdin(self):
        """The process's stdin stream (None unless created with stdin=PIPE)."""
        return self.process.stdin
|
|
|
class SlowAwkProcess(HandledProcess):
    """
    Run awk over a file so that its lines are emitted slowly.

    After printing each input line, awk runs `sleep <delay>` through system(),
    so the output trickles out with a pause between lines.

    key: label included in every tuple printed by handle_line()
    filename: file passed to awk
    delay: argument given to `sleep` after each line
    """

    def __init__(self, key, filename, delay):
        self._key = key
        awk_program = '{{print $0; system("sleep {}");}}'.format(delay)
        command = ["awk", awk_program, filename]
        # stderr is left at its default, i.e. inherited from this process.
        self._p = subprocess.Popen(command, stdout=subprocess.PIPE)

    @property
    def process(self):
        return self._p

    def handle_line(self, line):
        """Print a ("SlowAWK", key, decoded line) tuple."""
        record = ("SlowAWK", self._key, line.decode())
        print(record)
|
|
|
class CatProcess(HandledProcess):
    """
    Run `cat` over a file, producing its whole contents as fast as possible.

    key: label included in every tuple printed by handle_line()
    filename: file passed to cat
    """

    def __init__(self, key, filename):
        self._key = key
        # stderr is left at its default, i.e. inherited from this process.
        self._p = subprocess.Popen(["cat", filename], stdout=subprocess.PIPE)

    @property
    def process(self):
        return self._p

    def handle_line(self, line):
        """Print a ("cat", key, decoded line) tuple."""
        record = ("cat", self._key, line.decode())
        print(record)
|
|
|
def handle_processes(processes, timeout=1):
    """
    Accept output from multiple processes and handle it, line by line.

    Every process's stdout pipe is registered with one epoll object. Whenever
    a pipe is readable (or hung up), the available bytes are read straight
    from its file descriptor and split on newlines; each complete line
    (newline included) is passed to that process's .handle_line(). A final
    line without a trailing newline is delivered when the pipe reaches EOF.

    The loop runs until every pipe has reached EOF, so output is not lost
    when a process exits before everything it wrote has been read. Each
    process is then waited on, so its .returncode is populated on return.

    inputs:

        processes: iterable of HandledProcess (any object exposing .stdout,
                   .process and .handle_line() works)

        timeout: seconds to wait for events on the relevant file descriptors
                 in each epoll.poll() call

    returns:

        None
    """
    chunk_size = 65536
    # Key by the real OS file descriptor: that is exactly what epoll.poll()
    # reports back, so the lookup below cannot miss.
    fdmap = {p.stdout.fileno(): p for p in processes}
    # Bytes read after the last newline, per descriptor not yet at EOF.
    pending = dict.fromkeys(fdmap, b"")
    # The context manager closes the epoll object even if setup fails midway.
    with select.epoll() as ep:
        for fd in fdmap:
            ep.register(fd, select.EPOLLIN | select.EPOLLHUP)
        while pending:
            for fd, _events in ep.poll(timeout):
                proc = fdmap[fd]
                # Read the raw fd instead of proc.stdout.readline(): a buffered
                # readline can pull several lines into Python's own buffer,
                # where epoll can no longer report them as ready.
                chunk = os.read(fd, chunk_size)
                if chunk:
                    lines = (pending[fd] + chunk).split(b"\n")
                    pending[fd] = lines.pop()  # incomplete tail (may be b"")
                    for line in lines:
                        proc.handle_line(line + b"\n")
                else:
                    # EOF: flush an unterminated final line and stop watching.
                    ep.unregister(fd)
                    tail = pending.pop(fd)
                    if tail:
                        proc.handle_line(tail)
    for proc in fdmap.values():
        proc.process.wait()
|
|
|
def main():
    """Run the demo: two slow awk readers and one cat, all reading foo.txt."""
    source = "foo.txt"
    workers = [
        SlowAwkProcess("a", source, 0.02),
        SlowAwkProcess("b", source, 0.01),
        CatProcess("c", source),
    ]
    handle_processes(workers, timeout=0.05)
    exit_codes = [worker.returncode for worker in workers]
    print(exit_codes)
|
|
|
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    main()