Created
February 18, 2018 09:04
-
-
Save taskie/c14829eabc2be4ef07ef2fa652c6abcd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io | |
import subprocess | |
import functools | |
import copy | |
import os | |
import sys | |
from collections import ChainMap | |
# Version string of this module.
__version__ = '0.0.1'

# Debug switch: when true, Context._debug_log prints trace lines to stderr.
_debug = False
def get_last_run_result(run_result):
    """Return the deepest, right-most result reachable from ``run_result``.

    Starting at ``run_result``, keep stepping into the last entry of
    ``children`` for as long as ``children`` is a non-empty ``list``.
    The node at which that descent stops is returned; it may be
    ``run_result`` itself.
    """
    node = run_result
    while isinstance(node.children, list) and len(node.children):
        node = node.children[-1]
    return node
class RunResult(object):
    """Mutable record describing one started proc node.

    Holds the proc description, the ``popen`` object passed in (callers in
    this file pass ``None`` for composite nodes), the results of child
    nodes, and the completion flag / captured output that are filled in
    after construction.
    """

    def __init__(self, proc, popen, children=None):
        # Use a fresh list per instance when the caller supplies none.
        if children is None:
            children = []
        self.proc = proc
        self.popen = popen
        self.children = children
        # These start unset and are assigned by whoever waits on the result.
        self.completed = False
        self.stdout_data = None
        self.stderr_data = None

    def __repr__(self):
        template = "RunResult(proc={}, popen={}, len(children)={}, completed={})"
        return template.format(self.proc, self.popen, len(self.children), self.completed)
class Runner(object):
    """Starts a proc description inside a given context.

    ``run`` dispatches on the proc's class name to a ``_run_<ClassName>``
    method, so every proc class handled here needs a matching method.
    Each method returns a ``RunResult``.
    """

    def __init__(self, ctx):
        self.ctx = ctx

    def run(self, proc):
        """Start ``proc`` via the ``_run_<ClassName>`` method and return its result.

        Raises ``AttributeError`` when no method exists for the proc's class.
        """
        return getattr(self, "_run_" + proc.__class__.__name__)(proc)

    def _run_Identity(self, identity):
        """Run the wrapped proc and record it as the only child."""
        child = self.ctx.run(identity.proc)
        return RunResult(identity, None, [child])

    def _run_System(self, system):
        """Spawn ``system.args`` with the context's env and Popen keywords."""
        # NOTE(review): ctx.env is handed to Popen as the ``env`` mapping; a
        # context created without ``env`` holds an empty ChainMap -- confirm
        # that starting the child without inherited variables is intended.
        popen = subprocess.Popen(system.args, env=self.ctx.env, **self.ctx.popen_keywords)
        return RunResult(system, popen)

    def _run_Serial(self, serial):
        """Run the procs one after another.

        Every proc except the last runs in a child context with
        ``should_wait=True`` exported into it; the last proc runs in this
        runner's own context.
        """
        last_index = len(serial.procs) - 1
        children = []
        for i, proc in enumerate(serial.procs):
            ctx = self.ctx
            if i != last_index:
                ctx = ctx.child().export(Context(should_wait=True))
            children.append(ctx.run(proc))
        return RunResult(serial, None, children)

    def _run_Pipeline(self, pipeline):
        """Run the procs with each stage's stdout feeding the next stage's stdin.

        Every stage except the last is started with ``should_wait=False``
        and ``stdout=subprocess.PIPE``. The ``stdout`` of the right-most
        process of that stage becomes the next stage's ``stdin``. An empty
        pipeline yields a result with no children.
        """
        last_index = len(pipeline.procs) - 1
        children = []
        next_stdin = None
        for i, proc in enumerate(pipeline.procs):
            ctx = self.ctx
            if next_stdin is not None:
                ctx = ctx.child().export(Context(stdin=next_stdin))
            if i != last_index:
                ctx = ctx.child().export(Context(should_wait=False, stdout=subprocess.PIPE))
            sub_run_result = ctx.run(proc)
            children.append(sub_run_result)
            # A stage that started no process (its right-most result has
            # ``popen`` set to None, e.g. a composite with no procs) has no
            # pipe to hand on; the next stage then keeps the stdin of its
            # own context instead of failing on ``None.stdout``.
            popen = get_last_run_result(sub_run_result).popen
            next_stdin = popen.stdout if popen is not None else None
        return RunResult(pipeline, None, children)
class Waiter(object):
    """Completes a ``RunResult`` by waiting on its process or its children.

    ``wait`` dispatches on the class name of ``run_result.proc`` to a
    ``_wait_<ClassName>`` method. Each method fills in ``completed``,
    ``stdout_data`` and ``stderr_data`` on the result and returns it.
    """

    def __init__(self, ctx):
        self.ctx = ctx

    def wait(self, run_result):
        """Wait on ``run_result`` via the ``_wait_<ClassName>`` method.

        Raises ``AttributeError`` when no method exists for the proc's class.
        """
        return getattr(self, "_wait_" + run_result.proc.__class__.__name__)(run_result)

    def _wait_Identity(self, run_result):
        """Wait on the single child and copy its outcome onto ``run_result``."""
        child_result = self.ctx.wait_if_needed(run_result.children[0])
        run_result.completed = child_result.completed
        run_result.stdout_data = child_result.stdout_data
        run_result.stderr_data = child_result.stderr_data
        return run_result

    def _wait_System(self, run_result):
        """Call ``communicate`` on the process and store what it returns."""
        data = self.ctx.input
        if isinstance(data, OnceInput):
            data = data.read()
        # NOTE(review): communicate() also reads the process's stdout when it
        # was started with a pipe; for a non-final pipeline stage, confirm this
        # does not compete with the downstream stage reading the same pipe.
        stdout_data, stderr_data = run_result.popen.communicate(input=data)
        run_result.completed = True
        run_result.stdout_data = stdout_data
        run_result.stderr_data = stderr_data
        return run_result

    def _wait_Serial(self, run_result):
        """Wait on every child in order and aggregate their output."""
        return self._wait_children(run_result)

    def _wait_Pipeline(self, run_result):
        """Wait on every stage in order and aggregate their output."""
        return self._wait_children(run_result)

    def _wait_children(self, run_result):
        """Shared implementation for composite results.

        Waits on each child through the context. ``completed`` is copied
        from the last child, or set to ``True`` when there are no children,
        since there is nothing left to wait for. Each child's non-``None``
        stdout and stderr data is appended, in child order, to the data
        already on ``run_result``, with ``None`` treated as ``b''``.
        """
        child_results = [self.ctx.wait_if_needed(child) for child in run_result.children]
        # Guard the empty case: indexing [-1] on an empty list would raise.
        run_result.completed = child_results[-1].completed if child_results else True
        if run_result.stdout_data is None:
            run_result.stdout_data = b''
        if run_result.stderr_data is None:
            run_result.stderr_data = b''
        for child_result in child_results:
            if child_result.stdout_data is not None:
                run_result.stdout_data += child_result.stdout_data
            if child_result.stderr_data is not None:
                run_result.stderr_data += child_result.stderr_data
        return run_result
class Context(object):
    """Settings under which procs are started and waited on.

    A context carries a wait flag, optional input, an environment mapping
    and keyword arguments for ``subprocess.Popen``. Contexts form a chain
    through ``parent`` / ``depth`` and share one ``resources_by_depth``
    dict, which maps a depth to the resources registered at that depth.
    """

    def __init__(self, parent=None, depth=None, resources_by_depth=None, should_wait=None, input=None, env=None, **kwargs):
        self.parent = parent
        self.depth = depth if depth is not None else 0
        self.resources_by_depth = resources_by_depth if resources_by_depth is not None else {}
        self.should_wait = should_wait
        self.input = input
        # Always put a fresh top layer on the environment so that writes made
        # through this context do not land in the mapping that was passed in.
        if env is None:
            self.env = ChainMap({})
        elif isinstance(env, ChainMap):
            self.env = env.new_child()
        else:
            self.env = ChainMap({}, env)
        # Remaining keywords are kept as Popen keyword arguments.
        self.popen_keywords = ChainMap({}, kwargs)
        self.runner = Runner(self)
        self.waiter = Waiter(self)

    def child(self):
        """Return a new context one level deeper that starts from this one's settings."""
        return Context(
            parent=self,
            depth=self.depth + 1,
            resources_by_depth=self.resources_by_depth,
            should_wait=self.should_wait,
            input=self.input,
            env=self.env,
            **self.popen_keywords
        )

    def export(self, ctx):
        """Copy the settings that ``ctx`` defines onto this context; return ``self``.

        ``should_wait`` and ``input`` are copied only when not ``None``;
        every env entry and Popen keyword of ``ctx`` is written here.
        """
        if ctx.should_wait is not None:
            self.should_wait = ctx.should_wait
        self.env.update(ctx.env)
        self.popen_keywords.update(ctx.popen_keywords)
        if ctx.input is not None:
            self.input = ctx.input
        return self

    def _debug_log(self, name, *args, **kwargs):
        """Print a trace line to stderr when the module-level ``_debug`` is true."""
        if not _debug:
            return
        print("{: 4d} {: <10s}".format(self.depth, name), *args, file=sys.stderr, **kwargs)

    def run(self, proc):
        """Start ``proc`` in a child context and wait on it if that context says so.

        The child context receives the proc's own settings. Any ``Opener``
        given for stdin/stdout/stderr is opened if necessary (and then
        registered as a resource) and replaced by its file object.
        """
        ctx = self.child().export(proc.ctx)
        for stream_name in ("stdin", "stdout", "stderr"):
            target = ctx.popen_keywords.get(stream_name)
            if not isinstance(target, Opener):
                continue
            if not target.opened:
                target.open()
                ctx.add_resource(target)
            ctx.popen_keywords[stream_name] = target.file
        # FIXME: input is buggy
        if ctx.input is not None:
            ctx.popen_keywords["stdin"] = subprocess.PIPE
        self._debug_log("RUN", ctx.should_wait, proc)
        started = ctx.runner.run(proc)
        return ctx.wait_if_needed(started)

    def wait(self, run_result):
        """Wait on ``run_result``, then close resources registered at this depth or deeper.

        Depths are visited deepest first; within one depth the resources are
        closed in reverse registration order and the depth entry is removed.
        """
        wait_result = self.waiter.wait(run_result)
        self._debug_log("WAIT", wait_result)
        for depth in sorted(self.resources_by_depth, reverse=True):
            if depth < self.depth:
                break
            for resource in reversed(self.resources_by_depth[depth]):
                self._debug_log("CLOSE_RES", depth, resource)
                resource.close()
            del self.resources_by_depth[depth]
        return wait_result

    def wait_if_needed(self, run_result):
        """Wait only when this context waits and the result is not yet completed."""
        if not self.should_wait or run_result.completed:
            return run_result
        return self.wait(run_result)

    def waiting_ancestor(self):
        """Return the nearest context, starting with ``self``, whose ``should_wait`` is true.

        Returns ``None`` when the parent chain ends without finding one.
        """
        ctx = self
        while ctx is not None and not ctx.should_wait:
            ctx = ctx.parent
        return ctx

    def add_resource(self, resource):
        """Register ``resource`` under this context's depth."""
        self._debug_log("ADD_RES", resource)
        self.resources_by_depth.setdefault(self.depth, []).append(resource)

    def __str__(self):
        return "{}({})".format(self.__class__.__name__, self.__dict__)
class Proc(object):
    """Base class of runnable descriptions.

    Keyword arguments given at construction are stored as a ``Context`` on
    ``self.ctx``. The operators build new procs without running anything:
    ``|`` builds a ``Pipeline``; ``<``, ``>``, ``>>``, ``<<`` and indexing
    wrap this proc in an ``Identity`` carrying the redirect or input.
    """

    def __init__(self, **kwargs):
        self.ctx = Context(**kwargs)

    def run(self, ctx):
        """Run this proc in a child of ``ctx`` that has ``should_wait=True``."""
        waiting_ctx = ctx.child().export(Context(should_wait=True))
        return waiting_ctx.run(self)

    def __or__(self, other):
        """``self | other``: a two-stage pipeline."""
        return Pipeline([self, other])

    def __lt__(self, other):
        """``self < other``: stdin from ``other``; a ``str`` is opened in mode "r"."""
        source = Opener(other, "r") if isinstance(other, str) else other
        return Identity(self, stdin=source)

    def __gt__(self, other):
        """``self > other``: stdout to ``other``; a ``str`` is opened in mode "w"."""
        sink = Opener(other, "w") if isinstance(other, str) else other
        return Identity(self, stdout=sink)

    def __rshift__(self, other):
        """``self >> other``: stdout to ``other``; a ``str`` is opened in mode "a"."""
        sink = Opener(other, "a") if isinstance(other, str) else other
        return Identity(self, stdout=sink)

    def __lshift__(self, other):
        """``self << other``: feed ``other`` as input, wrapped in ``OnceInput`` if needed."""
        payload = other if isinstance(other, OnceInput) else OnceInput(other)
        return Identity(self, input=payload)

    def __getitem__(self, attr):
        """``self[attr]``: apply a ``Redirector`` or an ``OnceInput``.

        A ``Redirector`` with fileno 0, 1 or 2 sets stdin, stdout or stderr
        respectively. Raises ``Exception`` for any other fileno and for any
        other kind of ``attr``.
        """
        if isinstance(attr, Redirector):
            for number, keyword in ((0, "stdin"), (1, "stdout"), (2, "stderr")):
                if attr.fileno == number:
                    return Identity(self, **{keyword: attr.file})
            raise Exception("Redirect: invalid fileno: " + str(attr.fileno))
        if isinstance(attr, OnceInput):
            return Identity(self, input=attr)
        raise Exception("invalid attr: " + str(attr))
class OnceInput(object):
    """Input payload that can be consumed a single time.

    ``read`` hands back the stored value as follows and clears it, so a
    second call returns ``None``.
    """

    def __init__(self, value):
        self.value = value

    def read(self):
        """Return the payload and forget it.

        ``None`` stays ``None``; ``bytes`` is returned unchanged; ``str`` is
        encoded with ``sys.getdefaultencoding()``; for anything else the
        result of its ``read()`` method is returned.
        """
        value, self.value = self.value, None
        if value is None:
            return None
        if isinstance(value, str):
            return bytes(value, encoding=sys.getdefaultencoding())
        if isinstance(value, bytes):
            return value
        return value.read()
class Opener(object):
    """Deferred ``open()`` call that can be opened, closed and reopened.

    The positional and keyword arguments are stored and passed to the
    builtin ``open`` when ``open()`` is called. ``file`` holds the open
    file object while ``opened`` is true and is ``None`` otherwise.
    """

    def __init__(self, *args, **kwargs):
        self.args = args
        self.keywords = kwargs
        self.file = None
        self.opened = False

    def open(self):
        """Open the file if it is not already open; return ``self``."""
        if not self.opened:
            self.file = open(*self.args, **self.keywords)
            self.opened = True
        return self

    def close(self):
        """Close the file if it is open and reset the opener.

        Resetting ``opened`` and ``file`` matters for reuse: otherwise a
        later ``open()`` would do nothing and ``file`` would still refer to
        the closed file object. Calling ``close`` repeatedly is harmless.
        """
        if self.opened:
            try:
                self.file.close()
            finally:
                self.file = None
                self.opened = False

    def __enter__(self):
        return self.open()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Return a false value so an exception raised inside the ``with``
        # body propagates instead of being silently suppressed.
        return False

    def __repr__(self):
        return "Opener(args={}, keywords={})".format(self.args, self.keywords)
class Identity(Proc):
    """Proc that wraps exactly one other proc.

    The keyword arguments become this wrapper's own context settings (see
    ``Proc.__init__``); the wrapped proc is kept on ``self.proc``.
    """

    def __init__(self, proc, **kwargs):
        super().__init__(**kwargs)
        self.proc = proc

    def __repr__(self):
        # Readable form for debug output, in the same style as System.__repr__.
        return "Identity(proc={})".format(self.proc)
class System(Proc):
    """Proc describing one external command.

    ``args`` is stored unchanged on ``self.args``; the remaining keyword
    arguments become this proc's context settings (see ``Proc.__init__``).
    """

    def __init__(self, args, **kwargs):
        super().__init__(**kwargs)
        self.args = args

    def __repr__(self):
        template = "System(args={})"
        return template.format(self.args)
class Serial(Proc):
    """Proc holding a sequence of procs on ``self.procs``.

    The keyword arguments become this proc's context settings (see
    ``Proc.__init__``).
    """

    def __init__(self, procs, **kwargs):
        super().__init__(**kwargs)
        self.procs = procs

    def __repr__(self):
        # Readable form for debug output, in the same style as System.__repr__.
        return "Serial(procs={})".format(self.procs)
class Pipeline(Proc):
    """Proc holding the stages of a pipeline on ``self.procs``.

    The keyword arguments become this proc's context settings (see
    ``Proc.__init__``).
    """

    def __init__(self, procs, **kwargs):
        super().__init__(**kwargs)
        self.procs = procs

    def __repr__(self):
        # Readable form for debug output, in the same style as System.__repr__.
        return "Pipeline(procs={})".format(self.procs)
class FD(object):
    """File-descriptor placeholder whose operators build ``Redirector`` objects.

    ``FD(n) < x``, ``FD(n) > x`` and ``FD(n) >> x`` redirect descriptor
    ``n``. When no fileno was given, ``<`` uses 0 and ``>`` / ``>>`` use 1.
    A ``str`` target is wrapped in an ``Opener`` with mode "r", "w" or "a"
    respectively; any other target is passed through unchanged.
    """

    def __init__(self, fileno=None):
        self.fileno = fileno

    def _redirect(self, default_fileno, mode, other):
        """Build the Redirector shared by the three operators."""
        fileno = default_fileno if self.fileno is None else self.fileno
        if isinstance(other, str):
            other = Opener(other, mode)
        return Redirector(fileno, other)

    def __lt__(self, other):
        return self._redirect(0, "r", other)

    def __gt__(self, other):
        return self._redirect(1, "w", other)

    def __rshift__(self, other):
        return self._redirect(1, "a", other)
class Redirector(object):
    """Plain pair of a descriptor number and the target it is redirected to."""

    def __init__(self, fileno, file):
        # Stored as given; no validation happens here.
        self.fileno, self.file = fileno, file
def run(*args, **kwargs):
    """Run the given procs as one ``Serial`` in a new top-level ``Context``.

    The keyword arguments are passed to ``Context``; the value returned by
    ``Serial.run`` is returned.
    """
    job = Serial(args)
    return job.run(Context(**kwargs))
def out(*args, **kwargs):
    """Run the given procs and return their captured stdout as ``str``.

    Keyword arguments are forwarded to ``run`` (they were previously
    accepted but dropped). ``stdout`` is always set to ``subprocess.PIPE``
    because the output has to be captured; a caller-supplied ``stdout`` is
    overridden. The captured bytes are decoded with
    ``sys.getdefaultencoding()``.
    """
    # ``kwargs`` is a fresh dict for this call, so updating it is safe.
    kwargs["stdout"] = subprocess.PIPE
    result = run(*args, **kwargs)
    return str(result.stdout_data, encoding=sys.getdefaultencoding())
def serial(*args, **kwargs):
    """Build a ``Serial`` from the positional procs; keywords go to its constructor."""
    procs = args
    return Serial(procs, **kwargs)
def system(*args, **kwargs):
    """Build a ``System`` whose ``args`` is the tuple of positional arguments."""
    command = args
    return System(command, **kwargs)
def pipeline(*args, **kwargs):
    """Build a ``Pipeline`` from the positional procs; keywords go to its constructor."""
    stages = args
    return Pipeline(stages, **kwargs)
def _print_run_result(run_result, depth=0):
    """Print ``run_result`` and all of its descendants, one line per node.

    Nodes are printed parent first, children in order. Each line starts
    with ``depth`` spaces for that node, followed by the proc's class name,
    the completed flag and the repr of the stdout and stderr data.
    """
    # Explicit stack instead of recursion; children are pushed in reverse so
    # that they are popped, and therefore printed, in their original order.
    pending = [(run_result, depth)]
    while pending:
        node, level = pending.pop()
        print(" " * level, node.proc.__class__.__name__, node.completed, repr(node.stdout_data), repr(node.stderr_data))
        pending.extend((child, level + 1) for child in reversed(list(node.children)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment