Created
September 22, 2018 15:31
-
-
Save haydenflinner/97eec5d3a0f84312bc221367acd3bfa2 to your computer and use it in GitHub Desktop.
A pyinvoke tasks.py that implements make-like file dependencies.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from invoke import task, Collection | |
import invoke | |
import functools, itertools | |
import structlog | |
import os | |
log = structlog.get_logger() | |
def create_timestamp_differ(file_outputs_query, file_inputs_query, precursor=None):
    """
    Creates an `invoke.task` that decides whether a `make_task` can be skipped.

    The returned check returns True (skip) only when every file listed at
    ctx[file_outputs_query] exists and is strictly newer than every file
    listed at ctx[file_inputs_query]. In every other case it returns False.

    Both queries are dotted paths into ctx (e.g. 'buildprogram.outputfiles').
    They are resolved when the check is called, not when it is created, so
    that programs can modify ctx at runtime without strange behavior.

    Takes `precursor` so that the resolved input filenames can be stored on it
    as `precursor.outputs` / `precursor.output`; that can't happen earlier
    because paths can't be resolved until we have ctx.
    """
    from pathlib import Path

    def resolve_for_filenames(ctx, ctx_query):
        """
        Follow the dotted path `ctx_query` through ctx and return the
        filenames stored there as a new list.

        Example: if ctx.buildprogram.outputfiles == ['outputfile1', 'outputfile2']
        then the query 'buildprogram.outputfiles' gives
        ['outputfile1', 'outputfile2'].

        An empty or None query gives []. A value that is a single string is
        treated as one filename instead of being split into characters.
        Raises ValueError if any key along the path is not configured.
        """
        if not ctx_query:
            return []
        # TODO try defaulting to task-name.outputs
        node = ctx
        for key in ctx_query.split('.'):
            if key not in node:
                raise ValueError("ctx query resulted in None. Did you forget to configure it at ctx.{}?".format(ctx_query))
            node = node[key]
        if isinstance(node, str):
            return [node]
        return list(node)

    def modified_at(path):
        """Return the modification time of `path`."""
        return Path(path).stat().st_mtime

    @task
    def timestamp_differ(ctx):
        """
        Will be executed as a check before running a `make_task`.
        Returns True if task can be skipped.
        """
        input_filenames = resolve_for_filenames(ctx, file_inputs_query)
        # Leave the filenames resolved for our user
        if precursor:
            precursor.outputs = input_filenames
            precursor.output = input_filenames[0] if input_filenames else None

        output_filenames = resolve_for_filenames(ctx, file_outputs_query)
        # Always run things that don't produce a file. Also run when there
        # are no inputs to compare against: freshness can't be shown, and
        # taking max() of an empty list below would raise.
        if not output_filenames or not input_filenames:
            log.debug(event="timestamp_differ.returning.have_to_run", skipping=False)
            return False

        # If any files are missing (whether inputs or outputs),
        # run the task. We run when missing inputs because hopefully
        # the task will error out and notify the user, rather than silently
        # ignore that it was supposed to do something.
        if any(not Path(p).exists()
               for p in itertools.chain(input_filenames, output_filenames)):
            log.debug(event="timestamp_differ.returning.filemissing",
                      input_filenames=input_filenames)
            return False

        # All exist; skip only if the most recently modified input is still
        # older than the least recently modified output.
        oldest_output = min(modified_at(p) for p in output_filenames)
        youngest_input = max(modified_at(p) for p in input_filenames)
        skipping = youngest_input < oldest_output
        log.debug(event="timestamp_differ.returning.haveoutput",
                  youngest_input=youngest_input, oldest_output=oldest_output,
                  skipping=skipping)
        return skipping

    return timestamp_differ
class MakeTask(invoke.tasks.Task):
    """
    An `invoke.task` replacement that supports make-like file dependencies.

    make_task works just like GNU-make: by checking the timestamps on the last
    update of each file that you depend on against the timestamp of the files
    you create, we can decide whether or not you need to run.

    @param file_inputs: A single dotted path into your `ctx` (a string such as
    'build.sources') naming the list of files that this task reads. Optional.

    @param outputs: A single dotted path into your `ctx` (a string, not a list
    of strings) naming the list of files that this task writes. Example:
    ```
    @make_task(pre=[my_earlier_task], outputs='build.outputfilenames')
    def build(ctx, myparam1):
        pass
    ns.configure({"build" : {"outputfilenames": ["outfile1"]}})
    ```
    The timestamp check never reports a task without `outputs` as skippable.

    @param pre: List of tasks that this task depends on. If they are
    `make_tasks`, they will only run if needed. Note that you can access the
    output of a pretask at pretaskname.output regardless of how it configures
    its output in ctx or with make_task. That is:
    ```
    @make_task(pre=[my_earlier_task], outputs='build.outputfilenames')
    def build(ctx):
        open(my_earlier_task.output)
        return 'Found it!'
    ```
    will work as long as my_earlier_task is a `make_task` and configured
    at least one output file. The filenames are stored on the pretask when
    this task's timestamp check runs. Note that `output` is just a provided
    shortcut for `outputs[0]`.

    @param checks: Optional list of extra check tasks. The list is copied, and
    the timestamp checks built here are appended to the copy.

    `pre` was chosen to overload the `invoke` pre because the transition should
    be seamless; If you specify regular pre's that aren't make_tasks, they will
    run as they always did. If they are make_tasks, they will skip if they
    aren't required.
    """

    def __init__(self, task,
                 file_inputs=None, pre=None, checks=None, outputs=None,
                 *args, **kwargs):
        self.outputs = outputs
        pre = pre or []
        # Copy so the appends below never leak into a list owned by the caller.
        checks = list(checks or [])

        for precursor in pre:
            if hasattr(precursor, 'outputs'):
                # One of US!
                # Rely on invoke to do our dirty work with `checks`.
                # Alternatively, we might be able to reimplement something
                # like they have by basically wrapping the task in another task,
                # NOTE(review): assumes the installed invoke Task accepts a
                # `checks` keyword -- confirm against the invoke version in use.
                checks.append(create_timestamp_differ(
                    self.outputs,
                    file_inputs_query=precursor.outputs,
                    precursor=precursor))

        if file_inputs:
            checks.append(create_timestamp_differ(
                self.outputs,
                file_inputs_query=file_inputs))

        super(MakeTask, self).__init__(task, pre=pre, checks=checks, *args, **kwargs)


# Decorator that works like invoke's `task` but builds MakeTask objects.
make_task = functools.partial(task, klass=MakeTask)
@make_task(file_inputs='step1.inputs', outputs='step1.outputfiles')
def step1(ctx):
    """
    First stage of the demo pipeline: check that the first configured input
    exists, then touch the first configured output file.
    """
    source = ctx.step1.inputs[0]  # Could be a .c file for example
    assert os.path.exists(source)
    target = ctx.step1.outputfiles[0]
    command = 'touch {}'.format(target)
    ctx.run(command)
@make_task(pre=[step1], outputs='step2.out')
def step2(ctx):
    """
    Second stage of the demo pipeline: touch this step's first output file,
    provided step1's output exists.

    step1 keeps its filenames under ctx.step1.outputfiles, yet we read them
    here through step1.output. This is a feature of make_task: once it has
    resolved a task's output filenames from the ctx query, it stores them on
    that_task.outputs and the first one on that_task.output.
    """
    upstream = step1.output
    log.info(event='step2.checking', path=upstream)
    if not os.path.exists(upstream):
        raise RuntimeError("Huh?")
    target = ctx.step2.out[0]
    ctx.run('touch {}'.format(target))
@make_task(pre=[step2], file_inputs='step2.out')
def step3(ctx):
    """
    Third stage of the demo pipeline: touch this step's first output file,
    provided step2's output exists.

    Both a file input (read from step2's spot in the config, which is what
    you get for digging around in another task's private data) and a pre task
    that creates it are given here. That is only for testing; normally
    pre=[step2] alone is what you want.
    """
    upstream = step2.output
    log.info(event='step3.checking', path=upstream)
    if not os.path.exists(upstream):
        raise RuntimeError("Huh?")
    target = ctx.step3.outputs[0]
    ctx.run('touch {}'.format(target))
@make_task(pre=[step3])
def build(ctx):
    """
    Final target of the demo pipeline: depends on step3, logs success and
    prints a separator line.
    """
    log.info(event="build.succeeded")
    separator = '=============================================='
    print(separator)
@task
def test(ctx):
    """
    Self-test for the pipeline: touch one file at a time, re-run
    `invoke build`, and check which `touch` commands show up in its stdout.
    """
    touch_x, touch_y, touch_z = 'touch x', 'touch y', 'touch z'

    def rebuild():
        """Run the build in a subprocess and return what it printed."""
        return ctx.run('invoke build').stdout

    def touch_and_rebuild(path):
        """Update the timestamp of `path`, then rebuild."""
        ctx.run('touch {}'.format(path))
        return rebuild()

    # Whole pipeline should run when source.c changes.
    log.info(event="test.wholepipeline")
    out = touch_and_rebuild(ctx.step1.inputs[0])
    assert touch_x in out and touch_y in out and touch_z in out

    # Test 2, Only last step should run if next to last step's output changed.
    log.info(event="test.laststeponly")
    out = touch_and_rebuild(ctx.step2.out[0])
    assert touch_x not in out and touch_y not in out
    assert touch_z in out

    # Test 3, For good measure, try kicking the middle step.
    # I hope this constant switch between ctx.step.outputfiles or ctx.step.out
    # or ctx.step.outputs will encourage you to use task.outputs, which will
    # always be there regardless of how upstream tasks are configured :)
    log.info(event="test.middledown")
    out = touch_and_rebuild(ctx.step1.outputfiles[0])
    assert touch_x not in out
    assert touch_y in out and touch_z in out

    # Test 4, Make sure make_tasks with just a pre and nothing else works.
    log.info(event="test.justsuccess")
    out = rebuild()
    # touch z has to run because I forgot to put a file_outputs tag on it.
    # That's fine, won't hurt anything :P
    assert touch_x not in out and touch_y not in out
    assert "succeeded" in out
# Register the pipeline steps, the final target and the self-test in one
# collection, then attach the configuration that the ctx queries read.
ns = Collection(step1, step2, step3, build, test)
ns.configure(
    {   # Using the name scheme { task_name: inputs/outputs } is only convention,
        # you could config each task from anywhere you'd like and name
        # inputs/outputs whatever, too.
        'step1': {
            'inputs' : ['source.c'],
            'outputfiles': ['x'],
        },
        'step2': {
            'out' : ['y']
        },
        'step3': {
            'outputs' : ['z']
        },
        # Options for ctx.run. NOTE(review): the self-test looks for the
        # `touch` commands in captured stdout, so it presumably depends on
        # 'echo' being on -- confirm.
        'run': { 'echo': True, 'pty': True}
    })
""" | |
It would be better to follow a convention for these things, and maybe not nest
them in the global ctx, i.e.:

ns.configure(
    { "buildinfo": {
        'step1': {
            'ins' : ['source.c'],
            'outs': ['x'],
        },
        'step2': {
            # TODO Add the ability to give a callable here, like this:
            'ins' : [step1, 'regular_file'],
            # To allow configuring pres from the config, if that's what you're into.
            'outs' : ['y']
        },
        'step3': {
            'outs' : ['z']
        }},
      'run': { 'echo': True, 'pty': True}
    })

Now that we have this convention, we can iterate over all of the steps in a
pipeline and apply some transformation, or generate extra steps based on
the type of files present as inputs. Anything is possible with Python as your
config language :D
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment