| 
          #!/usr/bin/python3 | 
        
        
           | 
          
 | 
        
        
           | 
          """ | 
        
        
           | 
          Example demonstrating how to simultaneously read from multiple subprocesses | 
        
        
           | 
          
 | 
        
        
           | 
          Author: Nicholas Folse | 
        
        
           | 
          Date:   2020.03.21 | 
        
        
           | 
          
 | 
        
        
           | 
          Problem: | 
        
        
           | 
          - How do I simultaneously process output from multiple subprocesses using Python? | 
        
        
           | 
          - Can I read from multiple subprocesses in parallel using Python? | 
        
        
           | 
          
 | 
        
        
           | 
          Possible Solutions: | 
        
        
           | 
          - Create a thread for each subprocess, then gather results. This solution | 
        
        
           | 
            would require complex synchronization of any shared data structures. | 
        
        
           | 
          
 | 
        
        
           | 
          - Use asyncio. This solution would require Python 3.6+ and introduces the  | 
        
        
           | 
            complexity of using asyncio, which seems like overkill for this problem | 
        
        
           | 
          
 | 
        
        
           | 
          - Use epoll (select.epoll, Linux only), which waits for | 
        
        
           | 
            events on multiple file descriptors with a timeout.  | 
        
        
           | 
          
 | 
        
        
           | 
          Best Solution (Linux): epoll | 
        
        
           | 
          This solution is based on epoll (Python's select.epoll). The basic | 
        
        
           | 
          idea is to wait for EPOLL events on multiple input pipes, read the input as | 
        
        
           | 
          it is generated, then handle the lines appropriately. | 
        
        
           | 
          
 | 
        
        
           | 
          To do this efficiently, create implementation(s) of HandledProcess, then call | 
        
        
           | 
          handle_processes(processes, [timeout]), where processes is an iterable of | 
        
        
           | 
          HandledProcess. The handle_processes function will set up epoll and | 
        
        
           | 
          handle incoming data by calling the .handle_line() method of your implementation(s) | 
        
        
           | 
          of HandledProcess. | 
        
        
           | 
          
 | 
        
        
           | 
          The example below depends on a file "foo.txt" (with no trailing newline after the last line) that contains: | 
        
        
           | 
          line 1 | 
        
        
           | 
          line 2 | 
        
        
           | 
          line 3 | 
        
        
           | 
          line 4 | 
        
        
           | 
          
 | 
        
        
           | 
          Two implementations of HandledProcess are defined: SlowAwkProcess and CatProcess. | 
        
        
           | 
          SlowAwkProcess runs awk so that it sleeps after printing each line. CatProcess simply | 
        
        
           | 
          outputs all lines as fast as possible. | 
        
        
           | 
          
 | 
        
        
           | 
          Three HandledProcess objects are created and passed to handle_processes. | 
        
        
           | 
          In this example, each line is simply printed to the screen, followed by the processes' return codes (the exact interleaving depends on timing): | 
        
        
           | 
          
 | 
        
        
           | 
          ('SlowAWK', 'a', 'line 1\n') | 
        
        
           | 
          ('SlowAWK', 'b', 'line 1\n') | 
        
        
           | 
          ('cat', 'c', 'line 1\n') | 
        
        
           | 
          ('cat', 'c', 'line 2\n') | 
        
        
           | 
          ('cat', 'c', 'line 3\n') | 
        
        
           | 
          ('cat', 'c', 'line 4') | 
        
        
           | 
          ('SlowAWK', 'b', 'line 2\n') | 
        
        
           | 
          ('SlowAWK', 'b', 'line 3\n') | 
        
        
           | 
          ('SlowAWK', 'a', 'line 2\n') | 
        
        
           | 
          ('SlowAWK', 'b', 'line 4\n') | 
        
        
           | 
          ('SlowAWK', 'a', 'line 3\n') | 
        
        
           | 
          ('SlowAWK', 'a', 'line 4\n') | 
        
        
           | 
          [0, 0, 0] | 
        
        
           | 
          
 | 
        
        
           | 
          This shows that output of multiple subprocesses can be read in an interleaved | 
        
        
           | 
          non-blocking manner using only a single thread. | 
        
        
           | 
          
 | 
        
        
           | 
          """ | 
        
        
           | 
          import subprocess | 
        
        
           | 
          import select | 
        
        
           | 
          from abc import ABC, abstractmethod | 
        
        
           | 
          
 | 
        
        
           | 
          class HandledProcess(ABC):
              """
              Abstract wrapper around a subprocess whose stdout is handled line by line.

              Subclasses provide the wrapped subprocess through the `process` property
              and decide what to do with each line in handle_line(). The concrete
              properties below simply forward to the wrapped process object.
              """

              @abstractmethod
              def handle_line(self, line):
                  """Handle one line of the process's output (subclass hook)."""
                  return NotImplemented

              @property
              @abstractmethod
              def process(self):
                  """The wrapped subprocess object (subclass hook)."""
                  return NotImplemented

              @property
              def returncode(self):
                  """Forwarded `returncode` of the wrapped process."""
                  wrapped = self.process
                  return wrapped.returncode

              @property
              def stdout(self):
                  """Forwarded `stdout` stream of the wrapped process."""
                  wrapped = self.process
                  return wrapped.stdout

              @property
              def stdin(self):
                  """Forwarded `stdin` stream of the wrapped process."""
                  wrapped = self.process
                  return wrapped.stdin
        
        
           | 
          
 | 
        
        
           | 
          class SlowAwkProcess(HandledProcess):
              """Feed `filename` through awk, which sleeps `delay` seconds after each line."""

              def __init__(self, key, filename, delay):
                  # `key` is included in every printed record to identify this process.
                  self._key = key
                  # e.g. delay=0.02 yields the awk program: {print $0; system("sleep 0.02");}
                  # NOTE(review): `delay` is pasted into the awk program unescaped;
                  # only pass trusted numeric values.
                  awk_program = '{print $0; system("sleep ' + format(delay) + '");}'
                  command = ["awk", awk_program, filename]
                  self._p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None)

              @property
              def process(self):
                  """The subprocess.Popen running awk."""
                  return self._p

              def handle_line(self, line):
                  """Print the tuple ("SlowAWK", key, line decoded from bytes)."""
                  record = ("SlowAWK", self._key, line.decode())
                  print(record)
        
        
           | 
          
 | 
        
        
           | 
          class CatProcess(HandledProcess):
              """Feed `filename` through `cat`, which emits every line without delay."""

              def __init__(self, key, filename):
                  # `key` is included in every printed record to identify this process.
                  self._key = key
                  command = ["cat", filename]
                  self._p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None)

              @property
              def process(self):
                  """The subprocess.Popen running cat."""
                  return self._p

              def handle_line(self, line):
                  """Print the tuple ("cat", key, line decoded from bytes)."""
                  record = ("cat", self._key, line.decode())
                  print(record)
        
        
           | 
          
 | 
        
        
           | 
          def handle_processes(processes, timeout=1):
              """
              Accepts input from multiple processes and handles the output, line-by-line.

              Every process's stdout pipe is watched with epoll. Each complete line
              (bytes, including its trailing newline) is passed to that process's
              .handle_line(). A final line without a trailing newline is passed once
              the pipe reaches end-of-file. Empty "lines" are never delivered.

              inputs:
              processes: iterable of HandledProcess (anything exposing .process,
                         .stdout and .handle_line()); stdout should be a binary pipe,
                         or None, in which case the process is only waited for
              timeout:   how long (seconds) to wait for events on the pipes before
                         checking whether all processes have exited

              Reading continues until every pipe reaches end-of-file. If all processes
              have exited but a pipe stays open and silent for `timeout` seconds
              (e.g. a background grandchild inherited it), reading stops early. Any
              buffered partial line is still delivered.

              returns:
              None. Every process has exited on return, so .returncode is set.
              """
              import os  # local import: raw, unbuffered reads on the pipe fds

              procs = list(processes)
              # epoll reports events by integer fd, so key the map by fileno().
              fdmap = {p.stdout.fileno(): p for p in procs if p.stdout is not None}
              pending = dict.fromkeys(fdmap, b"")  # unterminated tail of each stream
              open_fds = set(fdmap)

              # The context manager closes epoll even on error. Unlike the old
              # try/finally, it never touches an unbound name if epoll() itself fails.
              with select.epoll() as ep:
                  for fd in fdmap:
                      ep.register(fd, select.EPOLLIN | select.EPOLLHUP)

                  # Loop on open pipes, not on running processes. Output written just
                  # before a child exits must still be read.
                  while open_fds:
                      events = ep.poll(timeout)
                      if not events:
                          # Quiet period: give up only once every process has exited.
                          if all(p.process.poll() is not None for p in procs):
                              break
                          continue
                      for fd, _ in events:
                          proc = fdmap[fd]
                          # A single os.read() never blocks after a readiness event.
                          # It also leaves no data hidden in a userspace buffer that
                          # epoll cannot see; readline() on a buffered file can do both.
                          chunk = os.read(fd, 65536)
                          if not chunk:
                              # EOF: the write end is closed; flush any partial line.
                              ep.unregister(fd)
                              open_fds.discard(fd)
                              tail, pending[fd] = pending[fd], b""
                              if tail:
                                  proc.handle_line(tail)
                              continue
                          parts = (pending[fd] + chunk).split(b"\n")
                          pending[fd] = parts.pop()  # incomplete last piece (may be b"")
                          for part in parts:
                              proc.handle_line(part + b"\n")

              # Only non-empty if we stopped early while a pipe was still open.
              for fd, tail in pending.items():
                  if tail:
                      fdmap[fd].handle_line(tail)

              # Reap every process so .returncode is populated, as callers expect.
              for p in procs:
                  p.process.wait()
        
        
           | 
                           | 
        
        
           | 
          def main():
              """Demo: two delayed awk readers and one cat reader over foo.txt, read concurrently."""
              # Each constructor starts its subprocess immediately, in this order.
              process_list = [
                  SlowAwkProcess("a", "foo.txt", 0.02),
                  SlowAwkProcess("b", "foo.txt", 0.01),
                  CatProcess("c", "foo.txt"),
              ]
              handle_processes(process_list, timeout=0.05)
              # handle_processes returns only once every process has exited,
              # so each returncode is populated here.
              return_codes = [proc.returncode for proc in process_list]
              print(return_codes)


          if __name__ == "__main__":
              main()