First steps with the Luigi workflow manager

As an introduction to Luigi, I am following this tutorial with some modifications, e.g. installation using conda.

The problems and solutions described in the examples below have led to the development of sciluigi, a wrapper for luigi.

Installation

Following the example, we use Python 2.7.6 (but Luigi is not limited to Python 2).

conda create -n luigi luigi certifi python=2.7.6

(The certifi module is required to run the luigi daemon.)

Finally, activate the environment:

source activate luigi
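
To verify that the environment works, you can try importing luigi (no output means the import succeeded):

python -c "import luigi"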

Hello world

Luigi workflows consist, more or less, of tasks and targets.

Targets

Broadly speaking, the Target class corresponds to a file on a disk. Or a file on S3. Or some kind of a checkpoint, like an entry in a database.

You will mostly use the LocalTarget, S3Target, or FTPTarget classes that are available out of the box. These directly map to a file on the local drive, on S3, or on an FTP server, respectively.

In this tutorial we will only work with luigi.LocalTarget()'s, which are normal files.

Tasks

Tasks are defined as python classes that subclass the luigi.Task superclass. Each task has methods that the workflow designer is supposed to implement:

  • requires(): should return one or more instantiated tasks that the current task depends on.
    • You can have as many upstream dependencies as you like, or you can indicate zero dependencies by returning an empty array (return []), which you'll often do for the Task that kicks off a pipeline. Returning None also works.
  • output(): should return one or more Target objects, typically representing files, that the current task will produce when run.
    • When you run a pipeline, Luigi first checks whether its output Targets already exist, and if not, schedules the Task to run. Otherwise, Luigi assumes that the Task is complete, and does not rerun it.
  • run(): Here goes all the code that the task should run as its job.
    • In practice, you'll often use pre-baked Task subclasses (like MortarProjectPigscriptTask), where the run method is already implemented for you.

In the run method, this is how we use the inputs and outputs to the task:

  • For the inputs, we use the special input() method, which returns the result of the output() function of our parent task (the task specified in requires()).
  • The outputs we get (as luigi.LocalTarget objects) by simply calling output().

Save this example to a file called luigitutorial.py:

import luigi

class HelloWorld(luigi.Task):
    # no upstream requirements at all
    def requires(self):
        return None
    # creates a local file as output
    def output(self):
        return luigi.LocalTarget('helloworld.txt')
    # the actual job to perform
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

if __name__ == '__main__':
    luigi.run()

To run the script, there are only two things you have to specify:

  1. A scheduler host to use and
  2. the name of the task to run.

For now, let's just use the --local-scheduler option, so that we don't need to start a separate scheduler, and of course we specify the HelloWorld task that we defined above:

python luigitutorial.py --local-scheduler HelloWorld
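
If the task ran successfully, the output target now exists:

cat helloworld.txt
Hello World!

If you run the same command a second time, Luigi will see that helloworld.txt already exists and report the task as already done, without re-running it.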

Using Mockfiles

A mock file can be used to print a task's output to the console instead of writing it to a file on disk - useful for development and testing.
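
Here is a minimal sketch of this, assuming the luigi.mock module: recent luigi versions provide MockTarget (older versions call it MockFile). With mirror_on_stderr=True, whatever is written to the target is echoed to the console, so no file is left on disk:

import luigi
from luigi.mock import MockTarget  # called MockFile in older luigi versions

class HelloWorldMock(luigi.Task):
    def output(self):
        # an in-memory target; mirror_on_stderr echoes writes to the console
        return MockTarget('HelloWorldMock', mirror_on_stderr=True)
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

if __name__ == '__main__':
    luigi.run()

Run it the same way as before, e.g. python luigitutorial.py --local-scheduler HelloWorldMock (assuming you added the class to the same file).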

Adding dependencies and parameters

To add dependencies between tasks, we need one more task, and we need to return something more than just None in the requires() function of the downstream one.

Let's try adding another task, NameSubstituter, that will take the file we created in our HelloWorld task, and replace World with some name.

But let's save time and take two steps in one: we'll take the name to substitute as a parameter. Here is how this looks in the new NameSubstituter class/task (the HelloWorld task remains unchanged):

import luigi

class HelloWorld(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('helloworld.txt')
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')

class NameSubstituter(luigi.Task):

    # 'name' is a task parameter whose value is supplied on the command line (or at instantiation)
    name = luigi.Parameter()

    # define upstream task
    def requires(self):
        return HelloWorld()
    # define output file, whose name is constructed from the input file
    def output(self):
        return luigi.LocalTarget(self.input().path + '.name_' + self.name)
    # the task to execute: read input file and substitute 'World'
    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            text = infile.read()
            text = text.replace('World', self.name)
            outfile.write(text)

if __name__ == '__main__':
    luigi.run()

Let's run this new workflow.

⚠️ Because we included name = luigi.Parameter(), this argument is now required!

python luigitutorial.py --local-scheduler NameSubstituter --name Alfred

💡 Appending a new string to the file name of the target from the parent task is a useful pattern, since it helps us keep track of which tasks have been run in order to produce a particular file, and also of the values of the parameters to those tasks, if we choose to include them, like we did with the name parameter here.

Parameter types

We can use luigi.IntParameter rather than the generic luigi.Parameter class to require that the provided parameter is an integer. Parameters can also have default values, e.g.

n = luigi.IntParameter(default=10)
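
As a small sketch (the Repeater task name is made up for illustration), an integer parameter with a default can be used like this:

import luigi

class Repeater(luigi.Task):
    # '--n 20' on the command line is parsed and validated as an integer
    n = luigi.IntParameter(default=10)
    def output(self):
        return luigi.LocalTarget('hello_x%d.txt' % self.n)
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n' * self.n)

if __name__ == '__main__':
    luigi.run()

python repeater.py --local-scheduler Repeater --n 20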

Reading parameters from config files

[Source](http://luigi.readthedocs.io/en/stable/configuration.html#configuration)
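
Luigi looks for a configuration file (luigi.cfg, or client.cfg in older versions) in the working directory, and parameter values can be supplied in a section named after the task class. A minimal sketch for the NameSubstituter task above could look like this:

[NameSubstituter]
name: Alfred

With this file in place, the --name argument can be omitted on the command line.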

Visualizing running workflows

In order to see what's happening before the workflow finishes, we need to add a little sleep to the tasks, since they otherwise run too fast. So, let's add a sleep of 15 seconds before and after the main chunk of work in each task:

import luigi
import time

class HelloWorld(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('helloworld.txt')
    def run(self):
        time.sleep(15)
        with self.output().open('w') as outfile:
            outfile.write('Hello World!\n')
        time.sleep(15)

class NameSubstituter(luigi.Task):
    name = luigi.Parameter()

    def requires(self):
        return HelloWorld()
    def output(self):
        return luigi.LocalTarget(self.input().path + '.name_' + self.name)
    def run(self):
        time.sleep(15)
        with self.input().open() as infile, self.output().open('w') as outfile:
            text = infile.read()
            text = text.replace('World', self.name)
            outfile.write(text)
        time.sleep(15)

if __name__ == '__main__':
    luigi.run()

This time we also need to run the luigi daemon (the central scheduler), rather than using the --local-scheduler option as before:

luigid

To run the daemon in the background (but still only in the active shell):

luigid --background

In a browser, open the following address: http://localhost:8082

Then, in a separate terminal window, start the luigi workflow we created above:

python luigitutorial.py --scheduler-host localhost NameSubstituter --name YourName

Go back to http://localhost:8082 and refresh!

Re-using components in multiple workflows

The default way: Sub-classing

The simplest way to re-use luigi components is to subclass an existing task class and override its requires() method.

See for example the code below, where we have TaskA and TaskB, and then a TaskC that depends on TaskA.

Then, at the bottom, we have subclassed TaskC into MyTaskC and, by overriding the requires() method, changed the dependency from TaskA to TaskB:

import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('task_a')
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('foo')

class TaskB(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('task_b')
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('bar')

class TaskC(luigi.Task):
    def requires(self):
        return TaskA() # <-- Notice this dependency!
    def output(self):
        return luigi.LocalTarget(self.input().path + '.task_c')
    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(line)
    
# Let's create our own "copy" of TaskC, that depends on TaskB instead of TaskA:

class MyTaskC(TaskC):
    def requires(self):
        return TaskB() # <-- Notice how we switched the dependency in TaskC!

if __name__ == '__main__':
    luigi.run()

Try now to run this workflow, by executing the subclassed task, MyTaskC (which now depends on TaskB):

python luigi_reuse_depinject.py --local-scheduler MyTaskC

Alternative

The default way of re-using luigi tasks is by sub-classing, as demonstrated above. But for reasons we will not go into depth about here (you can read more about them in this blog post), we need a more flexible and dynamic way of building up workflows based on existing luigi tasks.

Based on our experimentation, we have found that the following method works very well:

import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('task_a')
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('foo')

class TaskB(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return luigi.LocalTarget('task_b')
    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('bar')

class TaskC(luigi.Task):
    # store the upstream dependency as a property of class 'TaskC', which can be set
    # as an argument at instantiation
    upstream_task = luigi.Parameter(default=TaskA())
    # Notice this dependency!
    def requires(self):
        return self.upstream_task
    def output(self):
        return luigi.LocalTarget(self.input().path + '.task_c')
    def run(self):
        with self.input().open() as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(line)

# Let's create a workflow task "MyWorkflow", that requires TaskC, but with a 
# different upstream dependency (TaskB) instead of the default TaskA
class MyWorkflow(luigi.Task):
    def requires(self):
        # return an instance of TaskC that uses TaskB as its upstream dependency
        return TaskC(
          upstream_task=TaskB()
        )
    def output(self):
        return self.input()

if __name__ == '__main__':
    luigi.run()

Notice in TaskC above, we are taking the upstream dependency as a parameter, rather than hard-coding it. This makes it possible to change how the workflow is connected together at any time.

Notice also, in the MyWorkflow task, how we have created this task just for the sake of encapsulating the workflow - it does not even implement a run() method.

Try now, after deleting the previously created output, to run this workflow, by executing our new and special workflow task, MyWorkflow:

python luigi_reuse_depinject.py --local-scheduler MyWorkflow

Passing parameters between tasks

You might not have realized it yet, but the second method will prove much preferable to the first one: it makes it much easier to create modular workflows that don't require changes to any of the tasks themselves when tasks are re-used in new workflows.

Sub-classing

Consider the following (sketchy) example luigi workflow, created using the first, sub-classing, approach:

class TaskA(luigi.Task):
  param1 = luigi.Parameter()
  ... 
 
class TaskB(luigi.Task):
  param1 = luigi.Parameter()
  param2 = luigi.Parameter()  
  def requires(self):
    return TaskA(param1=self.param1)
  ...
 
class TaskC(luigi.Task):
  param1 = luigi.Parameter()
  param2 = luigi.Parameter()  
  param3 = luigi.Parameter()  
  def requires(self):
    return TaskB(param1=self.param1, 
                 param2=self.param2)
  ...
 
class TaskD(luigi.Task):
  param1 = luigi.Parameter()
  param2 = luigi.Parameter()  
  param3 = luigi.Parameter()  
  param4 = luigi.Parameter()  
  def requires(self):
    return TaskC(param1=self.param1, 
                 param2=self.param2,
                 param3=self.param3)
  ...

Notice how parameters introduced higher up in the workflow graph have to be duplicated all the way down to the last task, TaskD, and passed along, through all intermediate tasks.

Can you imagine what happens if, for example, we want to add an existing task somewhere in the middle, say between TaskB and TaskC? Then we first need to add all the parameters that merely need to pass through this task on their way to their upstream destination.

If that new task takes any parameters of its own, we also need to duplicate those parameters in all downstream tasks (in this case TaskC and TaskD) in order to be able to execute the whole workflow.

As a consequence, the tasks are neither interchangeable nor truly modular.

Alternative

Now consider the following workflow, where instead each task contains only its own parameter(s), and those parameters are duplicated just once, in the workflow task that uses them, along the lines of our example further above:

class TaskA(luigi.Task):
    param1 = luigi.Parameter()
    ... 
 
class TaskB(luigi.Task):
    param2 = luigi.Parameter()    
    ...
 
class TaskC(luigi.Task):
    param3 = luigi.Parameter()    
    ...
 
class TaskD(luigi.Task):
    param4 = luigi.Parameter()    
    ...

class MyWorkflow(luigi.Task):
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()
    param3 = luigi.Parameter()
    param4 = luigi.Parameter()
    
    def requires(self):
        task_a = TaskA(
            param1 = self.param1)        
        task_b = TaskB(
            upstream_task=task_a,
            param2 = self.param2)        
        task_c = TaskC(
            upstream_task=task_b,
            param3 = self.param3)        
        task_d = TaskD(
            upstream_task=task_c,
            param4 = self.param4)        
        return task_d
        
    def output(self):
        return self.input()

Here, the tasks themselves never need to change, and so they become completely modular and easy to stitch into any workflow!

Multiple inputs and outputs

One can return multiple outputs (and take multiple inputs) from tasks in luigi by letting the output() function of a task return a list, or better yet, a dict, of luigi.Target objects.

It is a good idea to check if the task can be broken down into smaller units (i.e. multiple tasks):

  • Do all these outputs logically belong together?
  • Do you have dependencies between them?

If you can’t break the task down, you might consider producing a text file with the names and the timestamps of all the individual files created by the task itself. (The log file name can be formatted e.g. as TaskName_timestamp_param1value_param2value.)
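
A sketch of this pattern (the task and file names are made up for illustration) could look like this:

import datetime
import os
import luigi

class ManyFilesTask(luigi.Task):
    # a date parameter stands in for the 'timestamp' part of the suggested naming scheme
    date = luigi.DateParameter(default=datetime.date.today())
    def output(self):
        # the single checkpoint target is a log file listing everything the task produced
        return luigi.LocalTarget('ManyFilesTask_%s.log' % self.date)
    def run(self):
        created = []
        for i in range(3):
            path = 'part_%d.txt' % i
            with open(path, 'w') as f:
                f.write('some data\n')
            created.append(path)
        with self.output().open('w') as logfile:
            for path in created:
                logfile.write('%s\t%s\n' % (path, os.path.getmtime(path)))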

The problem

Below is an example of how TaskB can depend on two outputs from TaskA (so TaskA's two outputs become TaskB's two inputs):

import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return {'output1' : luigi.LocalTarget('task_a_out1'),
                'output2' : luigi.LocalTarget('task_a_out2')}
    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('foo\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('bar\n')

class TaskB(luigi.Task):
    def requires(self):
        return TaskA()
    def output(self):
        return luigi.LocalTarget('task_b')
    # this task simply concatenates the two outputs of TaskA
    def run(self):
        # Notice how we need to know the name of TaskA's output!
        with self.input()['output1'].open() as infile1: 
            # ... and same here ...
            with self.input()['output2'].open() as infile2:
                with self.output().open('w') as outfile:
                    for line in infile1:
                        outfile.write(line)
                    for line in infile2:
                        outfile.write(line)

if __name__ == '__main__':
    luigi.run()

But do you notice that, in the run() method of TaskB, we have to know the names of the outputs of TaskA? That is of course less than optimal, since now we have to know the internals of another task inside our own task. This means our tasks are neither independent nor truly modular.

But this is not even the worst it can get ... look at what happens when one task (TaskC) depends on TWO upstream tasks, EACH OF WHICH returns two outputs. Then we have to look up two dict structures (the dict returned from our requires() method, and the one returned from each upstream task's output() function):

import luigi

class TaskA(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return {'output1' : luigi.LocalTarget('task_a_out1'),
                'output2' : luigi.LocalTarget('task_a_out2')}
    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('foo\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('foo\n')

class TaskB(luigi.Task):
    def requires(self):
        return None
    def output(self):
        return {'output1' : luigi.LocalTarget('task_b_out1'),
                'output2' : luigi.LocalTarget('task_b_out2')}
    def run(self):
        with self.output()['output1'].open('w') as outfile:
            outfile.write('bar\n')
        with self.output()['output2'].open('w') as outfile:
            outfile.write('bar\n')


class TaskC(luigi.Task):
    def requires(self):
        return {'input_a' : TaskA(),
                'input_b' : TaskB()}
    def output(self):
        return luigi.LocalTarget(self.input()['input_a']['output1'].path + '.task_c')
    def run(self):
        # we need to know the name of TaskA's output
        with self.input()['input_a']['output1'].open() as infile_a1: 
            # ... and same here ...
            with self.input()['input_a']['output2'].open() as infile_a2: 
                # we need to know the name of TaskB's output
                with self.input()['input_b']['output1'].open() as infile_b1:
                    # ... and same here ...
                    with self.input()['input_b']['output2'].open() as infile_b2:
                        with self.output().open('w') as outfile:
                            for line in infile_a1:
                                outfile.write(line)
                            for line in infile_a2:
                                outfile.write(line)
                            for line in infile_b1:
                                outfile.write(line)
                            for line in infile_b2:
                                outfile.write(line)

if __name__ == '__main__':
    luigi.run()

Notice the double dictionary look-ups in the run() method, for example self.input()['input_a']['output1'].open(). This gets rather messy after a while.

A solution

The way we have found to work around this, is the following:

Don't send upstream dependencies as parameters, as we suggested in an earlier section. Instead,

  1. Send, as a parameter, a dict structure containing the upstream task along with the name of its output to plug into this task.
  2. Make one such parameter per input that the task will use.
  3. Create a special method get_input(), stored in a meta class, that can be used to retrieve the correct input, based on the dictionary structures sent as parameters.

Instead of specifying dependencies between tasks, we specify how tasks depend on and export targets.

import luigi

##--- Meta class
 
class DependencyMetaTask(luigi.Task):
    # METHODS FOR AUTOMATING DEPENDENCY MANAGEMENT
    def requires(self):
        upstream_tasks = []
        for param_val in self.param_args:
            if type(param_val) is dict:
                if 'upstream' in param_val:
                    upstream_tasks.append(param_val['upstream']['task'])
        return upstream_tasks
 
    def get_input(self, input_name):
        param = self.param_kwargs[input_name]
        if type(param) is dict and 'upstream' in param:
            return param['upstream']['task'].output()[param['upstream']['port']]
        else: 
            return param
 
 
##--- Normal classes
 
class TaskA(DependencyMetaTask):
    # INPUT TARGETS
    in1_target = luigi.Parameter()
    param_a1 = luigi.Parameter()
 
    # DEFINE OUTPUTS
    def output(self):
        return { 'out1' : luigi.LocalTarget(
                     self.get_input('in1_target').path + '.out1'),
                 'out2' : luigi.LocalTarget(
                     self.get_input('in1_target').path + '.out2') }
 
    # WHAT THE TASK DOES
    def run(self):
        with open(self.get_input('in1_target').path) as infile:
            for line in infile:
                do_something(line)
 
 
class TaskB(DependencyMetaTask):
    # INPUT TARGETS
    in1_target = luigi.Parameter()
    in2_target = luigi.Parameter()
    param_b1 = luigi.Parameter()
    param_b2 = luigi.Parameter()
 
    def run(self):
        # Do something with both in1 and in2
        pass
 
##--- THE ACTUAL WORKFLOW / DEPENDENCY GRAPH DEFINITION
 
class MyWorkFlow(luigi.Task):
    # We only need to duplicate all parameters 
    # once, which is here in the workflow task
    param_a1 = luigi.Parameter()
    param_b1 = luigi.Parameter()
    param_b2 = luigi.Parameter()
 
    # Here the whole workflow definition resides:
    def requires(self):
        task_a = TaskA( 
            param_a1 = self.param_a1 
        )
 
        task_b = TaskB( 
            param_b1 = self.param_b1,
            param_b2 = self.param_b2,
            # Here below, we connect the output out1 from TaskA
            # to in1_target of TaskB ...
            in1_target = 
                { 'upstream' : { 'task' : task_a,
                                 'port' : 'out1' } },
            # ... and again, out2 of TaskA, to in2_target of
            # TaskB, using our special syntax.
            in2_target = 
                { 'upstream' : { 'task' : task_a,
                                 'port' : 'out2' } }
        )
        return task_b

The key here is the get_input() method in the DependencyMetaTask meta class: it allows us to send, as parameters to tasks, a double dict structure looking like so:

some_target = { 'upstream' : { 'task' : TaskA() , 'port' : 'output1' } }

Then, as you can see in the run() method of TaskA above, we can just use the get_input() function to get a specific input target (and not just a dependent task, or dict of tasks, like with the normal input() method).

Maybe you also noticed that this lets us do all the wiring of how outputs from TaskA are mapped to inputs (those special parameters which take a specification of how to find a certain target) in TaskB. In effect we have separated the workflow definition from the tasks themselves, and thereby made the tasks wholly independent and truly modular, just as we sought to do.

Example

In Luigi, it is very easy to implement a task's run() method with just some python code that does something.

But in bioinformatics, most of the time we want to execute some external program, that is accessible only via its command line interface (CLI).

We have found that creating some small helper functions can help a lot in making this easier to work with.

We typically create a meta task or mixin with helper functions for executing commands, and let all tasks subclass it (or mix it in). Below you find our implementation of a helper function for executing shell commands, and how to use it:

import luigi
import commands

class TaskHelpers():
    # We here show the simplest version needed to execute commands in our preferred way:
    def execute_command(self, command):
        # accept either a ready-made command string or a list of command parts
        if isinstance(command, list):
            command = ' '.join(command)
        return commands.getstatusoutput(command)

    # And we can also have a really short "alias" of the execute_command method
    def x(self, command):
        return self.execute_command(command)


# Then we can use the above TaskHelper mixin, like this (taken from a real-world example):
class GenerateFingerPrint(DependencyMetaTask, TaskHelpers):  # DependencyMetaTask (defined earlier) provides get_input()
    # INPUT TARGETS
    dataset_target = luigi.Parameter()

    # PARAMETERS
    fingerprint_type = luigi.Parameter()

    # DEFINE OUTPUTS
    def output(self):
        return { 'fingerprints' : luigi.LocalTarget(
            self.get_input('dataset_target').path + '.' + self.fingerprint_type + '.csr')
            }

    def run(self):
        self.x([JAVA_PATH, '-jar jars/FingerprintsGenerator.jar',
                '-fp', self.fingerprint_type,
                '-inputfile', self.get_input('dataset_target').path,
                '-parser', '1',
                '-outputfile', self.output()['fingerprints'].path])

In the run() method of the GenerateFingerPrint task, you see how we can execute commands by sending a Python list of command parts to the x() method for execution (we could send a string as well, but a list of command parts has turned out to be easier to work with when many of the parts are dynamically generated from input file names, parameters, etc.).

Patterns

Source

Temporary files

Source

Currently all output will be persisted indefinitely. The upside of that is that if you try to run X -> Y, and Y crashes, you can resume with the previously built X. The downside is that you will have a lot of intermediate results on your file system. A useful pattern is to put these files in a special directory and have some kind of periodical garbage collection clean it up.

Incomplete output

Source

Luigi simply checks whether output files exist, but NOT whether they are complete. Therefore, it is a bad idea to incrementally write to an output file. Instead, write to a temporary location first - and only move it to the final output when the task is complete.
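
A sketch of this pattern, assuming a reasonably recent luigi version (FileSystemTarget.temporary_path() is not available in very old releases), could look like the hypothetical task below. Note also that target.open('w'), as used in the examples above, already writes via a temporary file and moves it into place on close:

import shutil
import luigi

class SafeCopy(luigi.Task):
    # hypothetical task: copies an existing file to the output path atomically
    src = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget('copy_of_input.txt')
    def run(self):
        # temporary_path() yields a temp file name; the file is moved to the real
        # output path only if the with-block completes without raising
        with self.output().temporary_path() as tmp_path:
            shutil.copy(self.src, tmp_path)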

Triggering many tasks at once

Source

A convenient pattern is to have a dummy Task at the end of several dependency chains, so you can trigger a multitude of pipelines by specifying just one task in command line, similarly to how e.g. make works.

import datetime
import luigi

class AllReports(luigi.WrapperTask):
    date = luigi.DateParameter(default=datetime.date.today())
    def requires(self):
        yield SomeReport(self.date)
        yield SomeOtherReport(self.date)
        yield CropReport(self.date)
        yield TPSReport(self.date)
        yield FooBarBazReport(self.date)

This simple task will not do anything itself, but will invoke a bunch of other tasks. Per each invocation, Luigi will perform as many of the pending jobs as possible (those which have all their dependencies present).

You’ll need to use WrapperTask for this instead of the usual Task class, because this job will not produce any output of its own, and as such needs a way to indicate when it’s complete. This class is used for tasks that only wrap other tasks and that by definition are done if all their requirements exist.

Interacting with AWS S3

Source

**Note:** requires() cannot return a Target object directly, e.g. to monitor whether a file exists on AWS S3 or HDFS. If you have a simple Target object that is created externally, you can wrap it in a luigi.ExternalTask like this:

class LogFiles(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/log')

This also makes it easier to add parameters:

class LogFiles(luigi.ExternalTask):
    date = luigi.DateParameter()
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/log/%Y-%m-%d'))
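
Since the heading mentions S3, the same wrapping pattern works with an S3 target. A sketch, assuming the luigi.contrib.s3 module (called luigi.s3 in older versions) and a made-up bucket layout:

import luigi
from luigi.contrib.s3 import S3Target  # luigi.s3.S3Target in older versions

class S3LogFiles(luigi.ExternalTask):
    date = luigi.DateParameter()
    def output(self):
        # hypothetical bucket and key layout, for illustration only
        return S3Target(self.date.strftime('s3://my-bucket/log/%Y-%m-%d'))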

Source

Sciluigi

To facilitate the use of Luigi for bioinformatics, the sciluigi wrapper has been developed and published.

This slide deck provides details and examples.

sciluigi provides the following features:

  • Separation of dependency definitions from the tasks themselves, for improved modularity and composability.
  • Inputs and outputs implemented as separate fields, a.k.a. "ports", to allow specifying dependencies between specific input and output-targets rather than just between tasks. This is again to let such details of the network definition reside outside the tasks.
  • The fact that inputs and outputs are object fields, also allows auto-completion support to ease the network connection work (Works great e.g. with jedi-vim).
  • Inputs and outputs are connected with an intuitive "single-assignment syntax".
  • "Good default" high-level logging of workflow tasks and execution times.
  • Produces an easy to read audit-report with high level information per task.
  • Integration with some HPC workload managers. (So far only SLURM though).