Created
April 2, 2020 04:29
-
-
Save NF1198/e355f97ad7fc42dd69c96b05e035f09a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# process_handler.py | |
# Copyright 2020 Nicholas Folse <[email protected]> | |
# | |
# This module defines a subprocess handler capable of managing | |
# multiple simultaneous subprocess.Popen calls. Results of each subprocess | |
# are processed in parallel. Each subprocess is injected with a line handler | |
# which is responsible for handling input from each call. Line handlers | |
# are called as line_handler(proc, line). If multiple handlers are injected for | |
# a given subprocess, the output of a handler is passed as the input to the next | |
# handler in the chain (with proc as the first argument). Upon termination of the | |
# stream, the handler chain will be called with None as a line argument. | |
# | |
# Refer to the included example for a possibile application. | |
# | |
# This module utlizes the select.epoll mechanism which is only supported on Linux/Unix | |
# | |
# Running this module as __main__ should yield something like the following: | |
# ('uname', u'Linux x86_64 x86_64 x86_64 GNU/Linux\n') | |
# ('ls /', "[u'bin', u'boot', u'dev', u'etc', u'home', ..., u'tmp', u'usr', u'var']") | |
# [0, 0] | |
# | |
from __future__ import print_function | |
import subprocess | |
import select | |
def chain_handlers(handler, *handlers): | |
if len(handlers) == 0: | |
return handler | |
else: | |
def _(proc, line): | |
result = handler(proc, line) | |
for h in handlers: | |
result = h(proc, result) | |
return result | |
return _ | |
class ProcessHandler: | |
def __init__(self): | |
self._epoll = None | |
self._handled_processes = None | |
def __enter__(self): | |
self._epoll = select.epoll() | |
self._handled_processes = {} | |
return self | |
def __exit__(self, exc_type, exc_value, traceback): | |
self._epoll.close() | |
self._epoll = None | |
def inject(self, proc, handler, *handlers): | |
proc_id = proc.stdout.fileno() | |
handler_chain = chain_handlers(handler, *handlers) | |
self._handled_processes[proc_id] = (proc, handler_chain) | |
self._epoll.register(proc_id, select.EPOLLIN | select.EPOLLHUP) | |
def poll(self, timeout=1): | |
finished_procs = [] | |
if len(self._handled_processes) == 0: | |
return [] | |
for fd, evt in self._epoll.poll(timeout): | |
proc, handler = self._handled_processes[fd] | |
if evt & select.EPOLLIN: | |
handler(proc, proc.stdout.readline()) | |
if evt & select.EPOLLHUP: | |
for line in proc.stdout: | |
handler(proc, line) | |
handler(proc, None) | |
finished_procs.append(proc) | |
self._epoll.unregister(fd) | |
del self._handled_processes[fd] | |
return finished_procs | |
@property | |
def done(self): | |
return len(self._handled_processes) == 0 | |
if __name__ == "__main__": | |
uname_proc = lambda: subprocess.Popen(["uname", "-a"], stdout=subprocess.PIPE) | |
ls_proc = lambda dir: subprocess.Popen(["ls", dir], stdout=subprocess.PIPE) | |
def printer(tag): | |
def _(proc, arg): | |
if arg: | |
print((tag, arg)) | |
return arg | |
return _ | |
def uname_handler(): | |
uname = [] | |
def _(proc, line): | |
if line: | |
uname.append(line.decode()) | |
return None | |
else: | |
return uname[0] | |
return _ | |
def ls_handler(): | |
items = [] | |
def _(proc, line): | |
if line: | |
items.append(line.decode().strip()) | |
return None | |
else: | |
return str(items) | |
return _ | |
def linux_tester(process_handler): | |
def _(proc, uname): | |
if uname and uname.startswith("Linux"): | |
process_handler.inject(ls_proc("/"), ls_handler(), printer("ls /")) | |
return uname | |
return _ | |
with ProcessHandler() as h: | |
h.inject(uname_proc(), uname_handler(), linux_tester(h), printer("uname")) | |
done = [] | |
while not h.done: | |
for finished_proc in h.poll(): | |
done.append(finished_proc) | |
print([p.poll() for p in done]) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment