As an introduction to Luigi, I am following this tutorial with some modifications, e.g. installation using conda.
The problems and solutions described in the examples below have led to the development of sciluigi, a wrapper for Luigi.
Following the example, we use Python 2.7.6 (but Luigi is not limited to Python 2).
conda create -n luigi luigi certify python=2.7.6
(The certify module is required to run the Luigi daemon.)
Finally, activate the environment:
source activate luigi
Luigi workflows consist, more or less, of tasks and targets.

Broadly speaking, the `Target` class corresponds to a file on disk, a file on S3, or some kind of checkpoint, like an entry in a database. You will mostly use the `LocalTarget`, `S3Target`, or `FTPTarget` classes that are available out of the box. These map directly to a file on the local drive, on S3, or on an FTP server, respectively.

In this tutorial we will only work with `luigi.LocalTarget()`'s, which are normal local files.
Tasks are defined as Python classes that subclass the `luigi.Task` superclass. Each task has methods that the workflow designer is supposed to implement:
- `requires()`: should return one or more instantiated tasks that the current task depends on. You can have as many upstream dependencies as you like, or you can indicate zero dependencies by returning an empty list (`return []`), which you'll often do for the task that kicks off a pipeline. Returning `None` also works.
- `output()`: should return one or more target objects, typically representing files, that the current task will produce when run. When you run a pipeline, Luigi first checks whether a task's output targets already exist, and if not, schedules the task to run. Otherwise, Luigi assumes that the task is complete and does not rerun it.
- `run()`: here goes all the code that the task should run as its job. In practice, you'll often use pre-baked `Task` subclasses (like `MortarProjectPigscriptTask`), where the run method is already implemented for you.

In the `run()` method, this is how we use the inputs and outputs of the task:

- For the inputs, we use the special `input()` method to get the results of the `output()` function of our parent task (the task we specified in `requires()`).
- The outputs we get (as `luigi.LocalTarget` objects) by simply calling `output()`.
Save this example to a file called `luigitutorial.py`:
```python
import luigi

class HelloWorld(luigi.Task):
    # No upstream requirements at all
    def requires(self):
        return None

    # Creates a local file as output
    def output(self):
        return luigi.LocalTarget('helloworld.txt')

    # The actual job to perform
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

if __name__ == '__main__':
    luigi.run()
```
To run the script, there are only two things you have to specify:
- A scheduler host to use, and
- the name of the task to run.
For now, let's just use the `--local-scheduler` option, so that we don't need to start a new scheduler, and of course, we specify the `HelloWorld` task that we defined above:
python luigitutorial.py --local-scheduler HelloWorld
A variation of this example is to use a mock file, so that the output is printed to the console instead of written to a real file - useful for development and testing.
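A minimal sketch of that variation, assuming the `luigi.mock.MockTarget` class (called `MockFile` in older Luigi versions) and its `mirror_on_stderr` option, could look like this:

```python
import luigi
from luigi.mock import MockTarget  # MockFile in older Luigi versions

class HelloWorldMock(luigi.Task):
    def requires(self):
        return None

    def output(self):
        # An in-memory target; mirror_on_stderr echoes whatever is written to the console
        return MockTarget('helloworld', mirror_on_stderr=True)

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

if __name__ == '__main__':
    luigi.run()
```

If added to the same file, it can be run with `python luigitutorial.py --local-scheduler HelloWorldMock`, and the written content should then show up in the terminal rather than on disk.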
To add dependencies between tasks, we need one more task, and we need to return something more than just `None` in the `requires()` function of the downstream one.
Let's try adding another task, `NameSubstituter`, that will take the file we created in our `HelloWorld` task and replace "World" with some name.

But let's save time and take two steps in one: let's take the name to substitute as a parameter! So, look at how this looks in the new `NameSubstituter` class/task (the `HelloWorld` task remains unchanged):
```python
import luigi

class HelloWorld(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('helloworld.txt')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

class NameSubstituter(luigi.Task):
    # Store the value of luigi.Parameter() in the class variable 'name'
    name = luigi.Parameter()

    # Define the upstream task
    def requires(self):
        return HelloWorld()

    # Define the output file, whose name is constructed from the input file
    def output(self):
        return luigi.LocalTarget(self.input().path + '.name_' + self.name)

    # The task to execute: read the input file and substitute 'World'
    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            text = infile.read()
            text = text.replace('World', self.name)
            outfile.write(text)

if __name__ == '__main__':
    luigi.run()
```
Let's run this new workflow. Note that since `NameSubstituter` now declares a parameter (`name = luigi.Parameter()`), this argument is required on the command line!
python luigitutorial.py --local-scheduler NameSubstituter --name Alfred
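If everything works, you should end up with a new file whose name encodes both the upstream output and the parameter value: a file called helloworld.txt.name_Alfred, containing the text "Hello Alfred!".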
💡 Appending a new string to the file name of the target from the parent task is a useful pattern, since it helps us keep track of which tasks have been run in order to produce a particular file, and also of the values of the parameters to those tasks, if we choose to include them, like we did with the name parameter here.
We can use `luigi.IntParameter` rather than the default parameter class to require that the provided parameter is of integer type. Parameters can also have default values, e.g. `n = luigi.IntParameter(default=10)`. [Source](http://luigi.readthedocs.io/en/stable/configuration.html#configuration)
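As a small sketch of how this might look (the task and file names here are made up for illustration):

```python
import luigi

class RepeatGreeting(luigi.Task):
    # 'n' must be an integer; if omitted on the command line, 10 is used
    n = luigi.IntParameter(default=10)

    def output(self):
        return luigi.LocalTarget('greeting_x%d.txt' % self.n)

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n' * self.n)

if __name__ == '__main__':
    luigi.run()
```

This could then be run with e.g. `python repeat_greeting.py --local-scheduler RepeatGreeting --n 5`, or without `--n` to use the default.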
In order to see what's happening before the workflow is finished, we need to add a little sleep to the tasks, since they are running so fast. So, let's add a sleep of 15 seconds before and after the main chunk of work in each of the tasks:
```python
import luigi
import time

class HelloWorld(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('helloworld.txt')

    def run(self):
        time.sleep(15)
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')
        time.sleep(15)

class NameSubstituter(luigi.Task):
    name = luigi.Parameter()

    def requires(self):
        return HelloWorld()

    def output(self):
        return luigi.LocalTarget(self.input().path + '.name_' + self.name)

    def run(self):
        time.sleep(15)
        with self.input().open() as infile, self.output().open('w') as outfile:
            text = infile.read()
            text = text.replace('World', self.name)
            outfile.write(text)
        time.sleep(15)

if __name__ == '__main__':
    luigi.run()
```
We also need to run the Luigi daemon, not just the local scheduler as before:
luigid
To run the daemon in the background (but still only in the active shell):
luigid --background
In a browser, fire up the following web address: http://localhost:8082
Then, in a separate terminal window, start the luigi workflow we created above:
python luigitutorial.py --scheduler-host localhost NameSubstituter --name YourName
Go back to http://localhost:8082 and refresh!
The simplest way to re-use Luigi components is to subclass an existing task class and override its `requires()` method.

See for example this code, where we have a `TaskA` and a `TaskB`, and then a `TaskC` that depends on `TaskA`. At the bottom, we have subclassed `TaskC` into `MyTaskC`, and by overriding the `requires()` method, changed the dependency from `TaskA` to `TaskB`:
```python
import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('task_a')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('foo')

class TaskB(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('task_b')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('bar')

class TaskC(luigi.Task):
    def requires(self):
        return TaskA()  # <-- Notice this dependency!

    def output(self):
        return luigi.LocalTarget(self.input().path + '.task_c')

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(line)

# Let's create our own "copy" of TaskC that depends on TaskB instead of TaskA:
class MyTaskC(TaskC):
    def requires(self):
        return TaskB()  # <-- Notice how we switched the dependency compared to TaskC!

if __name__ == '__main__':
    luigi.run()
```
Try now to run this workflow, by executing the last task in the workflow:
python luigi_reuse_depinject.py --local-scheduler TaskC
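To see the effect of the subclassing, you can delete the created output files and run the subclassed task instead; it will now build task_b rather than task_a as its upstream dependency, and produce task_b.task_c:

python luigi_reuse_depinject.py --local-scheduler MyTaskC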
The default way of re-using Luigi tasks is by sub-classing, as demonstrated above, but for reasons we will not go into depth about here (you can read more about them in this blog post), we need a more flexible and dynamic way of building up workflows based on existing Luigi tasks. Based on our experimentation, we have found that the following method works very well:
```python
import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('task_a')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('foo')

class TaskB(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('task_b')

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('bar')

class TaskC(luigi.Task):
    # Store the upstream dependency as a property of TaskC, which can be set
    # as an argument at instantiation
    upstream_task = luigi.Parameter(default=TaskA())

    # Notice this dependency!
    def requires(self):
        return self.upstream_task

    def output(self):
        return luigi.LocalTarget(self.input().path + '.task_c')

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(line)

# Let's create a workflow task "MyWorkflow" that requires TaskC, but with a
# different upstream dependency (TaskB) instead of the default TaskA
class MyWorkflow(luigi.Task):
    def requires(self):
        # Return an instance of TaskC with TaskB plugged in as its upstream dependency
        return TaskC(
            upstream_task=TaskB()
        )

    def output(self):
        return self.input()

if __name__ == '__main__':
    luigi.run()
```
Notice how, in TaskC above, we are taking the upstream dependency as a parameter rather than hard-coding it. This makes it possible to change how the workflow is connected together at any time.

Notice also, in the `MyWorkflow` task, how we have created this task just for the sake of encapsulating the workflow - it does not even implement a `run()` method.
Try now, after deleting the previously created output, to run this workflow by executing our new and special workflow task, `MyWorkflow`:
python luigi_reuse_depinject.py --local-scheduler MyWorkflow
You might not have realized it yet, but the second method will prove to be much preferable to the first one: it makes it much easier to create modular workflows that don't require changes in any of the tasks themselves when re-using tasks in new workflows.
Consider the following (sketchy) example luigi workflow, created using the first, sub-classing, approach:
```python
class TaskA(luigi.Task):
    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def requires(self):
        return TaskA(param1=self.param1)
    ...

class TaskC(luigi.Task):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()
    param3 = luigi.Parameter()

    def requires(self):
        return TaskB(param1=self.param1,
                     param2=self.param2)
    ...

class TaskD(luigi.Task):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()
    param3 = luigi.Parameter()
    param4 = luigi.Parameter()

    def requires(self):
        return TaskC(param1=self.param1,
                     param2=self.param2,
                     param3=self.param3)
    ...
```
Notice how parameters introduced higher up in the workflow graph have to be duplicated all the way down to the last task, TaskD, and passed along, through all intermediate tasks.
Can you imagine what happens if we want to add an existing task somewhere in the middle, e.g. between `TaskB` and `TaskC`? Then we first need to add all the parameters that just need to pass through this task until they reach their upstream goal. If that new task takes any parameters of its own, we also need to duplicate those parameters in all downstream tasks (in this case `TaskC` and `TaskD`), in order to be able to execute the whole workflow.

As a consequence, the tasks are no longer interchangeable nor truly modular.
Then, consider the following workflow, where instead each task contains only its own parameter(s), and those parameters are duplicated only once, if / when the task is used in a workflow task, along the lines of our example further above:
```python
class TaskA(luigi.Task):
    param1 = luigi.Parameter()
    ...

class TaskB(luigi.Task):
    param2 = luigi.Parameter()
    ...

class TaskC(luigi.Task):
    param3 = luigi.Parameter()
    ...

class TaskD(luigi.Task):
    param4 = luigi.Parameter()
    ...

class MyWorkflow(luigi.Task):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()
    param3 = luigi.Parameter()
    param4 = luigi.Parameter()

    def requires(self):
        task_a = TaskA(
            param1=self.param1)
        task_b = TaskB(
            upstream_task=task_a,
            param2=self.param2)
        task_c = TaskC(
            upstream_task=task_b,
            param3=self.param3)
        task_d = TaskD(
            upstream_task=task_c,
            param4=self.param4)
        return task_d

    def output(self):
        return self.input()
```
Here, the tasks themselves never need to change, and so become completely modular and easy to stitch into any workflow!
One can return multiple outputs (and take multiple inputs) in Luigi tasks by letting the `output()` function of a task return a list, or even better, a `dict`, of `luigi.Target` objects.
It is a good idea to check if the task can be broken down into smaller units (i.e. multiple tasks):
- Do all these outputs logically belong together?
- Do you have dependencies between them?
If you can’t break the task down, you might consider producing a text file with the names and the timestamps of all the individual files created by the task itself. (The log file name can be formatted e.g. as `TaskName_timestamp_param1value_param2value`.)
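As a rough sketch of that idea (the helper and file names here are invented for illustration), such a manifest could be written as the final step of a task's `run()` method:

```python
import datetime
import os

def write_manifest(manifest_path, file_paths):
    """Write a simple manifest listing each produced file and its modification time."""
    with open(manifest_path, 'w') as manifest:
        for path in file_paths:
            mtime = datetime.datetime.fromtimestamp(os.path.getmtime(path))
            manifest.write('%s\t%s\n' % (path, mtime.isoformat()))

# Example: at the end of a task's run() method, after all files have been written:
# write_manifest('TaskName_20160101_paramval1_paramval2.log', ['out1.csv', 'out2.csv'])
```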
Below is an example of how `TaskB` can depend on two outputs from `TaskA` (so `TaskA`'s two outputs become `TaskB`'s two inputs):
```python
import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return {'output1': luigi.LocalTarget('task_a_out1'),
                'output2': luigi.LocalTarget('task_a_out2')}

    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('foo\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('bar\n')

class TaskB(luigi.Task):
    def requires(self):
        return TaskA()

    def output(self):
        return luigi.LocalTarget('task_b')

    # This task simply concatenates the two outputs of TaskA
    def run(self):
        # Notice how we need to know the name of TaskA's output!
        with self.input()['output1'].open() as infile1:
            # ... and same here ...
            with self.input()['output2'].open() as infile2:
                with self.output().open('w') as outfile:
                    for line in infile1:
                        outfile.write(line)
                    for line in infile2:
                        outfile.write(line)

if __name__ == '__main__':
    luigi.run()
```
But do you notice that, in the `run()` method of `TaskB`, we have to know the names of the outputs of `TaskA`? That is of course less than optimal, since we now have to know the internals of another task inside our own task. Our tasks are then neither independent nor truly modular.
But this is not even the worst it can get ... look at what happens when one task (`TaskC`) depends on TWO upstream tasks, EACH OF WHICH returns two outputs. Then we have to look up two dict structures (the dict returned from our `requires()` method, and the one returned from each upstream task's `output()` function):
```python
import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return {'output1': luigi.LocalTarget('task_a_out1'),
                'output2': luigi.LocalTarget('task_a_out2')}

    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('foo\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('foo\n')

class TaskB(luigi.Task):
    def requires(self):
        return None

    def output(self):
        return {'output1': luigi.LocalTarget('task_b_out1'),
                'output2': luigi.LocalTarget('task_b_out2')}

    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('bar\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('bar\n')

class TaskC(luigi.Task):
    def requires(self):
        return {'input_a': TaskA(),
                'input_b': TaskB()}

    def output(self):
        return luigi.LocalTarget(self.input()['input_a']['output1'].path + '.task_c')

    def run(self):
        # We need to know the name of TaskA's output ...
        with self.input()['input_a']['output1'].open() as infile_a1:
            # ... and same here ...
            with self.input()['input_a']['output2'].open() as infile_a2:
                # We need to know the name of TaskB's output ...
                with self.input()['input_b']['output1'].open() as infile_b1:
                    # ... and same here ...
                    with self.input()['input_b']['output2'].open() as infile_b2:
                        with self.output().open('w') as outfile:
                            for line in infile_a1:
                                outfile.write(line)
                            for line in infile_a2:
                                outfile.write(line)
                            for line in infile_b1:
                                outfile.write(line)
                            for line in infile_b2:
                                outfile.write(line)

if __name__ == '__main__':
    luigi.run()
```
Notice the double dictionary look-ups in the `run()` method, for example:

```python
with self.input()['input_a']['output1'].open() as infile_a1:
```

This gets rather messy after a while.
The way we have found to work around this is the following: don't send upstream dependencies directly as parameters, as we suggested in the earlier section. Instead:

- Send a dict structure containing the upstream task as a parameter, along with the name of the output to plug into this task.
- Make one such parameter per input that the task will use.
- Create a special method, `get_input()`, stored in a meta class, that can be used to retrieve the correct input based on the dictionary structures sent as parameters.

In other words, instead of specifying dependencies between tasks, we specify how tasks depend on and export targets.
```python
import luigi

##--- Meta class

class DependencyMetaTask(luigi.Task):
    # METHODS FOR AUTOMATING DEPENDENCY MANAGEMENT
    def requires(self):
        upstream_tasks = []
        for param_val in self.param_args:
            if type(param_val) is dict:
                if 'upstream' in param_val:
                    upstream_tasks.append(param_val['upstream']['task'])
        return upstream_tasks

    def get_input(self, input_name):
        param = self.param_kwargs[input_name]
        if type(param) is dict and 'upstream' in param:
            return param['upstream']['task'].output()[param['upstream']['port']]
        else:
            return param

##--- Normal classes

class TaskA(DependencyMetaTask):
    # INPUT TARGETS
    in1_target = luigi.Parameter()
    param_a1 = luigi.Parameter()

    # DEFINE OUTPUTS
    def output(self):
        return {'out1': luigi.LocalTarget(
                    self.get_input('in1_target').path + '.out1'),
                'out2': luigi.LocalTarget(
                    self.get_input('in1_target').path + '.out2')}

    # WHAT THE TASK DOES
    def run(self):
        with open(self.get_input('in1_target').path) as infile:
            for line in infile:
                do_something(line)

class TaskB(DependencyMetaTask):
    # INPUT TARGETS
    in1_target = luigi.Parameter()
    in2_target = luigi.Parameter()
    param_b1 = luigi.Parameter()
    param_b2 = luigi.Parameter()

    def run(self):
        # Do something with both in1 and in2
        pass

##--- THE ACTUAL WORKFLOW / DEPENDENCY GRAPH DEFINITION

class MyWorkFlow(luigi.Task):
    # We only need to duplicate all the parameters
    # once, which is here in the workflow task
    param_a1 = luigi.Parameter()
    param_b1 = luigi.Parameter()
    param_b2 = luigi.Parameter()

    # Here the whole workflow definition resides:
    def requires(self):
        task_a = TaskA(
            param_a1=self.param_a1
        )
        task_b = TaskB(
            param_b1=self.param_b1,
            param_b2=self.param_b2,
            # Here below, we connect the output 'out1' from TaskA
            # to 'in1_target' of TaskB ...
            in1_target={'upstream': {'task': task_a,
                                     'port': 'out1'}},
            # ... and again, 'out2' of TaskA, to 'in2_target' of
            # TaskB, using our special syntax.
            in2_target={'upstream': {'task': task_a,
                                     'port': 'out2'}}
        )
        return task_b
```
The key here is the `get_input()` method in the `DependencyMetaTask` meta class: it allows us to send, as parameters to tasks, a double dict structure looking like this:

```python
some_target = {'upstream': {'task': TaskA(), 'port': 'output1'}}
```

Then, as you can see in the `run()` method of `TaskA` above, we can just use the `get_input()` method to get a specific input target (and not just a dependent task, or dict of tasks, as with the normal `input()` method).

Maybe you also noticed that this lets us do all the wiring of how outputs from `TaskA` are mapped to inputs of `TaskB` (those special parameters which take a specification for how to find a certain target) in the workflow task, when `TaskB` is instantiated. In effect, we have separated the workflow definition from the tasks themselves, and thereby made the tasks wholly independent and truly modular, just as we set out to do.
In Luigi, it is very easy to implement a task's `run()` method with just some Python code that does something. But in bioinformatics, most of the time we want to execute some external program that is accessible only via its command-line interface (CLI).

We have found that creating some small helper functions can help a lot in making this easier to work with. We typically create a meta task or mixin with helper functions for executing commands, and let all tasks subclass it (or mix in the mixin). Below you find our implementation of a helper function for executing shell commands, and how to use it:
```python
import luigi
import commands  # Python 2 only; on Python 3, use subprocess instead

class TaskHelpers(object):
    # We here show the simplest version needed to execute commands in our preferred way:
    def execute_command(self, command):
        # Accept either a command string or a list of command parts
        if isinstance(command, list):
            command = ' '.join(command)
        return commands.getstatusoutput(command)

    # And we can also have a really short "alias" of the execute_command method
    def x(self, command):
        return self.execute_command(command)


# Then we can use the above TaskHelpers mixin like this (taken from a real-world example).
# GenerateFingerPrint subclasses the DependencyMetaTask from the earlier example, so that
# get_input() is available; JAVA_PATH is assumed to be defined elsewhere.
class GenerateFingerPrint(DependencyMetaTask, TaskHelpers):
    # INPUT TARGETS
    dataset_target = luigi.Parameter()

    # PARAMETERS
    fingerprint_type = luigi.Parameter()

    # DEFINE OUTPUTS
    def output(self):
        return {'fingerprints': luigi.LocalTarget(
            self.get_input('dataset_target').path + '.' + self.fingerprint_type + '.csr')}

    def run(self):
        self.x([JAVA_PATH, '-jar jars/FingerprintsGenerator.jar',
                '-fp', self.fingerprint_type,
                '-inputfile', self.get_input('dataset_target').path,
                '-parser', '1',
                '-outputfile', self.output()['fingerprints'].path])
```
In the `run()` method of the `GenerateFingerPrint` task, you see how we can execute commands by sending a Python list of command parts to the `x()` method for execution (we could send a string as well, but a list of command parts has turned out to be easier to work with when many of the parts are dynamically generated from input file names, parameters, etc.).
Currently all output will be persisted indefinitely. The upside of that is that if you try to run X -> Y, and Y crashes, you can resume with the previously built X. The downside is that you will have a lot of intermediate results on your file system. A useful pattern is to put these files in a special directory and have some kind of periodical garbage collection clean it up.
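As a rough sketch of such a garbage collection step (the directory name and age threshold here are made up for illustration), a small script run periodically, e.g. from cron, could look like this:

```python
import os
import time

INTERMEDIATE_DIR = 'intermediate'   # hypothetical directory holding intermediate targets
MAX_AGE_DAYS = 30                   # delete files older than this

def clean_intermediate_files():
    now = time.time()
    for dirpath, _, filenames in os.walk(INTERMEDIATE_DIR):
        for filename in filenames:
            path = os.path.join(dirpath, filename)
            age_days = (now - os.path.getmtime(path)) / (60 * 60 * 24)
            if age_days > MAX_AGE_DAYS:
                os.remove(path)

if __name__ == '__main__':
    clean_intermediate_files()
```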
Luigi simply checks whether output files exist, but NOT whether they are complete. Therefore, it is a bad idea to incrementally write to an output file. Instead, write to a temporary location first - and only move it to the final output when the task is complete.
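Note that a target's `open('w')` method, as used in the examples above, already takes care of this by writing to a temporary file and moving it into place when closed. For the case where an external program writes the output file itself, newer Luigi versions also provide a `temporary_path()` context manager on file system targets; a minimal sketch (the external command here is made up for illustration):

```python
import luigi
import subprocess

class ExternalToolTask(luigi.Task):
    def output(self):
        return luigi.LocalTarget('tool_output.txt')

    def run(self):
        # temporary_path() hands us a temp file path; the file is only moved
        # to its final location if the block finishes without errors
        with self.output().temporary_path() as tmp_path:
            subprocess.check_call(['some_external_tool', '--out', tmp_path])  # hypothetical tool
```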
A convenient pattern is to have a dummy Task at the end of several dependency chains, so you can trigger a multitude of pipelines by specifying just one task on the command line, similarly to how e.g. make works.
```python
import datetime
import luigi

# SomeReport, SomeOtherReport, etc. are assumed to be defined elsewhere
class AllReports(luigi.WrapperTask):
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        yield SomeReport(self.date)
        yield SomeOtherReport(self.date)
        yield CropReport(self.date)
        yield TPSReport(self.date)
        yield FooBarBazReport(self.date)
```
This simple task will not do anything itself, but will invoke a bunch of other tasks. Per each invocation, Luigi will perform as many of the pending jobs as possible (those which have all their dependencies present).

You'll need to use `WrapperTask` for this instead of the usual `Task` class, because this job will not produce any output of its own, and as such needs another way to indicate when it's complete. This class is used for tasks that only wrap other tasks and that by definition are done if all their requirements exist.
**Note:** `requires()` cannot return a `Target` object, e.g. to monitor whether a file exists on AWS S3. If you have a simple `Target` object that is created externally, you can wrap it in a `Task` class like this:
```python
import luigi
import luigi.contrib.hdfs

class LogFiles(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/log')
```
This also makes it easier to add parameters:
```python
import luigi
import luigi.contrib.hdfs

class LogFiles(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/log/%Y-%m-%d'))
```
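A downstream task can then depend on such externally produced data just like on any other task. A minimal sketch of this (the task and file names are made up, and local files are used to keep it self-contained):

```python
import luigi

# Hypothetical file produced outside of Luigi
class InputData(luigi.ExternalTask):
    def output(self):
        return luigi.LocalTarget('input_data.txt')

# A task that will only run once the external file exists
class CountLines(luigi.Task):
    def requires(self):
        return InputData()

    def output(self):
        return luigi.LocalTarget('line_count.txt')

    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            outfile.write('%d\n' % len(infile.readlines()))

if __name__ == '__main__':
    luigi.run()
```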
To facilitate the use of Luigi for bioinformatics, the `sciluigi` wrapper has been developed and published. This slide deck provides details and examples.

`sciluigi` provides the following features:
- Separation of dependency definitions from the tasks themselves, for improved modularity and composability.
- Inputs and outputs implemented as separate fields, a.k.a. "ports", to allow specifying dependencies between specific input- and output-targets rather than just between tasks. This is again to let such details of the network definition reside outside the tasks.
- The fact that inputs and outputs are object fields also allows auto-completion support to ease the network connection work (works great e.g. with jedi-vim).
- Inputs and outputs are connected with an intuitive "single-assignment syntax" (illustrated in the sketch after this list).
- "Good default" high-level logging of workflow tasks and execution times.
- Produces an easy-to-read audit report with high-level information per task.
- Integration with some HPC workload managers (so far only SLURM, though).
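As a rough sketch of what this looks like in practice (adapted from memory of the sciluigi README, so details may differ from the current API):

```python
import sciluigi as sl

class MyFooWriter(sl.Task):
    # Outputs are defined as methods returning TargetInfo objects ("ports")
    def out_foo(self):
        return sl.TargetInfo(self, 'foo.txt')

    def run(self):
        with self.out_foo().open('w') as foofile:
            foofile.write('foo\n')

class MyFooReplacer(sl.Task):
    replacement = sl.Parameter()
    # One input "port"; it gets assigned an upstream output in the workflow
    in_foo = None

    def out_replaced(self):
        return sl.TargetInfo(self, self.in_foo().path + '.bar.txt')

    def run(self):
        with self.in_foo().open() as in_f, self.out_replaced().open('w') as out_f:
            out_f.write(in_f.read().replace('foo', self.replacement))

class MyWorkflow(sl.WorkflowTask):
    def workflow(self):
        foowriter = self.new_task('foowriter', MyFooWriter)
        fooreplacer = self.new_task('fooreplacer', MyFooReplacer, replacement='bar')
        # The "single-assignment syntax": connect an output port to an input port
        fooreplacer.in_foo = foowriter.out_foo
        return fooreplacer

if __name__ == '__main__':
    sl.run_local(main_task_cls=MyWorkflow)
```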