DI Task API Model inspired by angularjs

# Very simple task
one_proc = 1
opts_schema = {} # task has no options

@register_task('pbsmrtpipe.tasks.simple_task_01',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, ),
               (FileTypes.REPORT, ),
               opts_schema, one_proc, (),
               output_file_names=(('my_awesome_report', 'json'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)

The register_task decorator has the signature (task_id, task_type, input_types, output_types, task_option_schema, nproc, resource_types, output_file_names=None). The decorator is syntactic sugar for creating a MetaTask instance and registering it in a globally accessible registry by id.
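
As a rough sketch, the decorator above is approximately equivalent to the following (MetaTask and REGISTERED_TASKS are defined in the source at the bottom of this gist):

# Sketch: roughly what @register_task does under the hood
meta_task = MetaTask('pbsmrtpipe.tasks.simple_task_01',
                     TaskTypes.LOCAL,
                     (FileTypes.FASTA, ),
                     (FileTypes.REPORT, ),
                     opts_schema, one_proc, (), to_cmd)
REGISTERED_TASKS['pbsmrtpipe.tasks.simple_task_01'] = meta_task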

The to_cmd function has the same ordering of arguments (with the exception of task_id, task_type, and the keyword arguments), but the values passed to the function will be resolved (e.g., input_types will be replaced with actual paths to files, and the option schemas with resolved options).
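
For example, the simple task above might be invoked by the workflow engine roughly as follows (the paths are hypothetical):

# Hypothetical resolved call made by the workflow engine
cmd = to_cmd(['/path/to/in.fasta'],                   # resolved input paths
             ['/path/to/my_awesome_report.json'],     # resolved output paths
             {},                                      # resolved options
             1,                                       # nproc
             ())                                      # resources
# cmd == "my-simple-cmd.sh --nproc 1 /path/to/in.fasta /path/to/my_awesome_report.json"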

Details (in the order of the decorator signature)

  • task_id is the globally unique identifier for the task (it must begin with "pbsmrtpipe.tasks")
  • task_type is an enum of pbsmrtpipe.constants.local_task or pbsmrtpipe.constants.distributed_task (referenced by the class constants TaskTypes.LOCAL and TaskTypes.DISTRIBUTED)
  • input_types are the input file types by id (e.g., ['pbsmrtpipe.files.fasta', 'pbsmrtpipe.files.movies']). They can be referenced using the class constants of FileTypes (e.g., FileTypes.FASTA, FileTypes.REPORT)
  • output_types are the output file types; see input_types above
  • opts_schema are the task options as a dict of {option_id: jsonschema}
  • nproc is the number of processors to use
  • resource_types are files or dirs that are 'resources' (e.g., ResourceTypes.TMP_FILE, ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE) and can be accessed positionally in the resources list. For example, in to_cmd you can reference resources[0]
  • output_file_names is a keyword argument that allows you to specify the name of each output file. The format is a list of tuples of (base name, extension)

For task_type, opts, nproc and nchunks (introduced later), the values can be provided as the corresponding primitive data type (e.g., an int for nproc), or they can be configured via a Dependency Injection model. This model allows the computation of the value to be described by a custom function that references other values. For example, nproc can be computed from the task type and/or the resolved options.

This allows for a great deal of flexibility and expressiveness. These custom functions can be defined to have explicit dependencies on values that will be resolved. These dependencies are called symbols and are prefixed with '$' (e.g., $max_nproc).

Symbols Definitions

  • $max_nproc is the max number of processors (no dependencies; accessible by any DI configurable value)
  • $max_nchunks is the max number of chunks to use (no dependencies; accessible by any DI configurable value)
  • $opts_schema is the {option_id: jsonschema} dict that is provided to the task decorator
  • $opts are the options provided by the user
  • $task_type is the value computed from the task type DI model, or the primitive value provided
  • $ropts are the resolved and validated options
  • $nproc is the value computed from the nproc DI model, or the primitive value provided
  • $nchunks is the value computed from the nchunks DI model, or the primitive value provided, in the register_scatter_task deco

Dependency Injection Model

The format for the DI model is a list of values, with the last item being a custom function that will be called.

As a running example, consider computing the number of processors to use.

To define how to compute $nproc, you can specify an int, the symbol $max_nproc, or a Dependency Injection model: a list of dependencies (e.g., resolved options, task type, nchunks) followed by a function.

The function is specified as the last element of the list, and its signature must match the preceding elements; that is, a DI list x is invoked as x[-1](*x[:-1]).
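
A minimal sketch of how such a list could be evaluated (resolve_di_list and resolved_values are illustrative names, not part of the API):

def resolve_di_list(di_list, resolved_values):
    """Evaluate a DI list: substitute '$' symbols, then call the tail function.

    resolved_values is assumed to map symbols (e.g., '$max_nproc') to concrete values.
    """
    func = di_list[-1]
    args = [resolved_values[x] if isinstance(x, str) and x.startswith('$') else x
            for x in di_list[:-1]]
    return func(*args)

# e.g., resolve_di_list(['$max_nproc', '$ropts', my_nproc_func],
#                       {'$max_nproc': 16, '$ropts': {}})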

The only constraint when defining dependencies is to not create cyclic dependencies. For example, if the value of '$nproc' depends on the resolved task options ('$ropts'), and the resolved options in turn depend on '$nproc', that creates a cycle.
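
The source below models these dependencies as a networkx DiGraph, so a sanity check could be as simple as this sketch (assuming g is the graph built by to_di_graph):

import networkx as nx

def assert_acyclic(g):
    # Raise if the DI dependency graph contains a cycle
    if not nx.is_directed_acyclic_graph(g):
        raise ValueError("Cyclic DI dependencies: {c}".format(c=list(nx.simple_cycles(g))))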

TLDR

The task type, options, nproc and number of chunks can be given as primitives, or they can be provided as a Dependency Injection list model.

Using the DI list model, each computation of task type, nproc, resolved opts, number of chunks can reference other 'resolved' values ($task_type, $nproc, $ropts, $nchunks, respectively), or provided Symbol values ($max_nproc, $max_nchunks). The only constraint is DO NOT create a cycle between the dependencies.

(Please see this ipython notebook for an example of the resolved DI models for several example tasks)

Example #1

Specify the computation of the number of processors to use based on the resolved options. In the register_task deco, this DI list would replace the primitive value (1) used for nproc in the task above.

def my_func(resolved_opts, n, m):
    return 8

['$ropts', 5, 3.14159, my_func]

my_func is a custom function expected to have the signature (resolved_opts, n, m); '$ropts' will be injected with the resolved task options, while 5 and 3.14159 are passed through as-is.
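
At resolution time, the engine effectively calls (the resolved options dict shown is hypothetical):

my_func({'my_task_option_id': 12345}, 5, 3.14159)  # -> 8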

Example #2

Specify the number of processors for the task to use based on the max number of processors and the resolved options.

def my_nproc_func(nproc, resolved_opts):
    return nproc / 2

['$max_nproc', '$ropts', my_nproc_func]
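
With, say, a maximum of 16 processors (a hypothetical value), this resolves as:

my_nproc_func(16, {})  # 16 / 2 -> 8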

Accessing Data in Files

Data can be passed at runtime to task options, nproc, task_type, or nchunks by referencing an input file that is a pbreport JSON file (i.e., FileTypes.REPORT).

Example of a pbreport JSON file.

{
    "_changelist": 130583,
    "_version": "2.3",
    "attributes": [
        {
            "id": "pbreports_example_report.nmovies",
            "name": null,
            "value": 19
        },
        {
            "id": "pbreports_example_report.ncontigs",
            "name": null,
            "value": 5
        },
        {
            "id": "pbreports_example_report.my_attribute_id",
            "name": null,
            "value": 10.0
        }
    ],
    "id": "pbreports_example_report",
    "plotGroups": [],
    "tables": []
}

If your task has an input file that is a report, you can access values in the JSON report and use them in a Dependency Injection Model list. For example, you can compute the number of processors based on a value in a report that is computed at run time.

A new symbol of the form $inputs.0.my_attr_id is used to specify the input file by positional index into the input types list. The my_attr_id is the id of the report attribute to extract. The value will be resolved at runtime.

For example, '$inputs.2.attr_x' is interpreted as: grab the third file in input files (which must be of type FileTypes.REPORT) and extract attribute 'attr_x' from the JSON report.
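
A sketch of how such an attribute lookup could be implemented against the pbreport format shown above (the source below stubs this out with get_mock_report_json_attribute; the exact id-matching rules are an assumption):

import json

def extract_report_attribute(report_file, attribute_id):
    """Return the value of the attribute with the given id from a pbreport JSON file."""
    with open(report_file) as f:
        report = json.load(f)
    for attr in report['attributes']:
        if attr['id'] == attribute_id:
            return attr['value']
    raise KeyError("Attribute '{i}' not found in {f}".format(i=attribute_id, f=report_file))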

Passing Data to Tasks

Use case motivation: determine the task_type from the resolved options and an attr_id in the JSON report input file.

Similar to the DI-modelable values $task_type, $ropts, $nproc, and $nchunks, $inputs.0.attr_id can be used in a DI model list.

Extending a previous example:

This assumes that the first file in the task input types list is a report type (FileTypes.REPORT)

def my_nproc_func(nproc, resolved_opts, my_report_attribute_id):
    return nproc / 2

['$max_nproc', '$ropts', '$inputs.0.attr_x', my_nproc_func]

At runtime the value will be extracted from the report and passed to the custom function.
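
That is, the engine effectively calls (all values hypothetical):

my_nproc_func(16, {'my_task_option_id': 12345}, 3.14159)  # 16 / 2 -> 8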

Complete Example using Dependency Injection Model

Use case Motivations:

  • We want to compute the task type based on the resolved options and an 'attr_id' from the first file in the input files.
  • We want to compute the resolved opts based on an 'attr_id' from the first file in the input files.
  • We want to compute nproc based on the max number of processors and the resolved options.

Additionally, we need to request a temp directory to be created. This directory is managed by the workflow and will automatically be deleted on the remote node when the task is completed.

def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts with a value that is extracted from
    a pbreports JSON file."""
    # override option
    opts['my_option_01'] = my_option_01
    return opts

def compute_task_type(resolved_opts, attr_value):
    """ Can compute the task type based on the resolved opts and
    and attribute from the report"""
    return TaskTypes.DISTRIBUTED

def _my_nproc(max_nproc, resolved_opts):
    """ Determine the value of nproc based on resolve options"""
    return max_nproc / 3


@register_task('pbsmrtpipe.tasks.task_id4',
               ('$ropts', '$inputs.0.attr_id', compute_task_type),
               (FileTypes.REPORT, FileTypes.FASTA),
               (FileTypes.ALIGNMENT_CMP_H5,),
               (opts, '$inputs.0.attr_id', dyn_opts_func),
               ('$max_nproc', '$ropts', _my_nproc),
               (ResourceTypes.TMP_DIR, ),
               output_file_names=(('my_awesome_alignment', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime from a previous task via a pbreport.

    Map the first input file (must be a report type)

    $inputs.0 -> input_file_types.POSITIONAL.report_attribute_id

    Looks for 'attr_id' in the attributes section of the report.

    Can compute if the job should be submitted to the queue via ['$ropts', '$inputs.0.attr_id', compute_task_type]

    """
    _d = dict(e="my_exe.sh", f=input_files[0], o=output_files[0], n=nproc, t=resources[0])
    return "{e} --nproc={n} --tmpdir={t} --fasta={f} -o={o}".format(**_d)

Explicit Resolved Tasks

See several examples of the resolved dependencies in this ipython notebook. PLEASE VIEW ME

These examples show the graph of DI model lists and explicitly show the dependency resolution and the conversion of a MetaTask instance to a Task instance.

More Task Examples

(these are taken from di_task_api.py)

@register_task('pbsmrtpipe.tasks.simple_task_01',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, ),
               (FileTypes.REPORT, ), {}, 1, ())
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)


@register_task('pbsmrtpipe.tasks.my_task_id',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, FileTypes.RGN_FOFN),
               (FileTypes.ALIGNMENT_CMP_H5, ),
               opts,
               one_proc,
               (ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE),
               output_file_names=(('my_awesome_alignments', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Simple Example Task
    """
    my_tmp_dir = resources[0]
    _d = dict(e="my_exe.sh", l=resources[1], t=my_tmp_dir, o=resolved_opts['my_task_option_id'], i=input_files[0], f=input_files[1], r=output_files[0], n=nproc)
    return "{e} --nproc={n} --tmp={t} --log={l} --my-option={o} fasta:{i} --region={f} --output-report={r}".format(**_d)


@register_task('pbsmrtpipe.tasks.task_id2',
               TaskTypes.DISTRIBUTED,
               (FileTypes.VCF, ),
               (FileTypes.REPORT, ),
               opts,
               '$max_nproc',
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Multiple 'resources' of the same type can be provided.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def compute_nproc(global_nproc, resolved_opts):
    return global_nproc / 2


def compute_task_type(opts):
    """This must return pbsmrtpipe.constants.{local_task,distributed_task}"""
    return "pbsmrtpipe.constants.local_task"


@register_task('pbsmrtpipe.tasks.task_id8',
               ('$ropts', compute_task_type),
               (FileTypes.FASTA, FileTypes.REPORT),
               (FileTypes.VCF, ),
               opts,
               ('$max_nproc', '$ropts', compute_nproc),
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Set nproc via dependency injection based on $max_nproc.
    The nproc DI list (x) is translated to x[-1](*x[:-1])

    Compute the task type based on the options
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def my_nproc_func(global_nproc, opts):
    return 12


def my_custom_validator(resolved_opts, a, b, c):
    """Returns resolved option dict or raises an exception.

    This is just illustrating that the DI is blindly passing values. If there's a $X 'special' value, then
    this will be injected. But passing numbers will work as well.
    """
    return resolved_opts


@register_task('pbsmrtpipe.tasks.task_id3',
               TaskTypes.DISTRIBUTED,
               (FileTypes.MOVIE_FOFN, FileTypes.RGN_FOFN),
               (FileTypes.FASTA, FileTypes.FASTQ),
               (opts, 1, 2, 3, my_custom_validator),
               ('$max_nproc', '$ropts', my_nproc_func),
               (ResourceTypes.TMP_FILE,),
               output_file_names=(('my_file', 'fasta'), ('my_f2', 'fastq')))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Let's set nproc to be dependent on the resolved options and $max_nproc

    Note: '$opts' is the resolved options, whereas 'opts' is the {option_id:JsonSchema}

    Need to think this over a bit.

    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts

    Are these the resolved opts that are passed in?
    """
    # override option
    opts['my_option_01'] = my_option_01
    return opts


def compute_task_type(opts, attr_value):
    return "pbsmrtpipe.constants.distributed_task"


def _my_nproc(global_nproc, resolved_opts):

    return global_nproc / 3


@register_task('pbsmrtpipe.tasks.task_id4',
               ('$ropts', '$inputs.0.attr_id', compute_task_type),
               (FileTypes.REPORT, FileTypes.FASTA),
               (FileTypes.ALIGNMENT_CMP_H5,),
               (opts, '$inputs.0.attr_id', dyn_opts_func),
               ('$max_nproc', '$ropts', _my_nproc),
               (ResourceTypes.TMP_FILE, ),
               output_file_names=(('my_file', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime from a previous task via a pbreport.

    Map the first input file (must be a report type)
    $inputs.0 -> ft.report_type_id

    And looks for 'attr_id' in the attributes section of the report

    Can compute if the job should be submitted to the queue via ['$opts', '$inputs.0.attr_id', compute_task_type]

    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def nchunks_func(nmovies, resolved_opts, resolved_nproc):
    max_chunks = resolved_opts['max_chunks']
    return min(int(nmovies), max_chunks)


@register_scatter_task('pbsmrtpipe.tasks.task_id5',
                       'pbsmrtpipe.constants.distributed_task',
                       (FileTypes.REPORT, FileTypes.MOVIE_FOFN),
                       (FileTypes.RGN_FOFN, ),
                       opts,
                       ('$max_nproc', '$ropts', _my_nproc),
                       (ResourceTypes.OUTPUT_DIR, ),
                       ('$inputs.0.attr_id', '$ropts', '$nproc', nchunks_func),
                       output_file_names=(('my_rgn_movie', 'fofn'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Scatter Tasks extend the standard task and include a 7th DI mechanism which will set $nchunks, which can be used at the workflow level.

    $nchunks is only communicated to the workflow level for proper graph construction, therefore it's not included in the to_cmd signature.

    For example, if $nchunks is set to 3, then output_files will be ['/path/to/chunk1', '/path/to/chunk2', '/path/to/chunk3']

    This yields a slightly odd API from the command line: my_exe.sh input.fofn --output '/path/to/chunk1' '/path/to/chunk2' '/path/to/chunk3'

    A more natural commandline API would be: my_exe.sh input.fofn --output-prefix="chunk_" --output-dir=/path/to --nchunks=3
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)
"""
DI Task API Model inspired by angularjs
See Readme.rst for details.
"""
import os
import sys
import tempfile
import logging
import types
import pprint
import functools
import copy
import networkx as nx
log = logging.getLogger(__name__)
# Signature {optionid:JsonSchema}
opts = {"my_task_option_id": {}}
one_proc = 1
five_proc = 5
PBSMRTPIPE_FILE_PREFIX = 'pbsmrtpipe.file'
PBSMRTPIPE_TASK_PREFIX = 'pbsmrtpipe.tasks'
class TaskTypes(object):
    LOCAL = 'pbsmrtpipe.constants.local_task'
    DISTRIBUTED = 'pbsmrtpipe.constants.distributed_task'


def _to_type(prefix, name):
    return ".".join([prefix, name])

to_ftype = functools.partial(_to_type, PBSMRTPIPE_FILE_PREFIX)


class FileTypes(object):
    REPORT = to_ftype('report')
    FASTA = to_ftype('fasta')
    FASTQ = to_ftype('fastq')
    MOVIE_FOFN = to_ftype('movie_fofn')
    RGN_FOFN = to_ftype('rgn_fofn')
    ALIGNMENT_CMP_H5 = to_ftype('alignment_cmp_h5')
    VCF = to_ftype('vcf')
    GFF = to_ftype('gff')


def get_file_name_defaults_d():
    return {FileTypes.REPORT: ('report', 'json'),
            FileTypes.FASTA: ('file', 'fasta'),
            FileTypes.FASTQ: ('file', 'fastq'),
            FileTypes.MOVIE_FOFN: ('movie', 'fofn'),
            FileTypes.RGN_FOFN: ('rgn', 'fofn'),
            FileTypes.ALIGNMENT_CMP_H5: ('alignment', 'cmp.h5'),
            FileTypes.VCF: ('file', 'vcf'),
            FileTypes.GFF: ('file', 'gff')}


class ResourceTypes(object):
    TMP_DIR = '$tmpdir'
    TMP_FILE = '$tmpfile'
    LOG_FILE = '$logfile'
    # tasks can write output to this directory
    OUTPUT_DIR = '$outputdir'

    @classmethod
    def ALL(cls):
        return cls.TMP_DIR, cls.TMP_FILE, cls.LOG_FILE, cls.OUTPUT_DIR

    @classmethod
    def is_valid(cls, attr_name):
        return attr_name in cls.ALL()
class MetaTask(object):
    def __init__(self, task_id, task_type, input_types, output_types, option_schemas, nproc, resource_types, cmd_func):
        """These may be specified as the DI version"""
        is_valid_task_id_or_raise(task_id)
        self.task_id = task_id
        self.input_types = input_types
        self.output_types = output_types
        self.resource_types = resource_types
        self.option_schemas = option_schemas
        self.nproc = nproc
        self.task_type = task_type
        self.cmd_func = cmd_func

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_types),
                  o=len(self.output_types),
                  r=len(self.resource_types))
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} >".format(**_d)

    def to_cmd(self, input_files, output_files, resolved_opts, nproc, resource_types):
        """
        Quite a bit of validation here to help debugging.
        """
        validations = [(self.input_types, input_files),
                       (self.output_types, output_files),
                       (self.resource_types, resource_types)]
        for k, v in validations:
            if len(k) != len(v):
                raise IndexError("Incompatible with defined types. Expected {n}. Got {i}. {v}".format(n=len(k), i=len(v), v=v))
        # - should validate resolved options against schema
        # this can be the DI model, or the raw di
        schemas = self.option_schemas
        if isinstance(self.option_schemas, (list, tuple)):
            # assume the first value is a dict of the opts
            schemas = self.option_schemas[0]
        for k, v in schemas.iteritems():
            if k not in resolved_opts:
                raise KeyError("Expected resolved option with id '{k}'. Got {d}. Options are not resolved. {o}".format(k=k, d=resolved_opts, o=self.option_schemas))
        if not isinstance(nproc, int):
            raise TypeError("nproc expected int, got type {t}".format(t=type(nproc)))
        return self.cmd_func(input_files, output_files, resolved_opts, nproc, resource_types)


class MetaScatterTask(MetaTask):
    def __init__(self, task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, cmd_func, chunk_di):
        super(MetaScatterTask, self).__init__(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, cmd_func)
        self.chunk_di = chunk_di


class Task(object):
    def __init__(self, task_id, task_type, input_files, output_files, resolved_options, nproc, resources, cmd):
        self.task_id = task_id
        self.input_files = input_files
        self.output_files = output_files
        self.resources = resources
        self.resolved_options = resolved_options
        self.nproc = nproc
        self.task_type = task_type
        # Command list of strings or string
        self.cmd = cmd

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_files),
                  o=len(self.output_files),
                  r=len(self.resources),
                  n=self.nproc)
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} nproc:{n} >".format(**_d)


class ScatterTask(Task):
    def __init__(self, task_id, task_type, input_files, output_files, resolved_opts, nproc, resources, cmd, nchunks):
        super(ScatterTask, self).__init__(task_id, task_type, input_files, output_files, resolved_opts, nproc, resources, cmd)
        self.nchunks = nchunks

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_files),
                  o=len(self.output_files),
                  r=len(self.resources),
                  n=self.nproc,
                  c=self.nchunks)
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} nproc:{n} nchunks:{c} >".format(**_d)
def _get_tmpdir():
    return tempfile.mkdtemp()


def _get_tmpfile(suffix=".file"):
    t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    t.close()
    return t.name


def _get_logfile(output_dir):
    suffix = ".log"
    t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False, dir=output_dir)
    t.close()
    return t.name


def get_mock_resolved_task_options():
    opts_ = [('nmovies', 5), ('use_subreads', False), ('readScore', 0.78)]
    to_k = lambda x: '.'.join(["pbsmrtpipe.task_options", x])
    return {to_k(k): v for k, v in opts_}


def get_mock_report_json_attribute(report_file, attribute_id):
    return 3.14159


def get_mock_report_json_int_attribute(report_file, attribute_id):
    return 7


def _resolve_di(resolved_keys_d, specials_di):
    resolved_specials = []
    for s in specials_di:
        if s in resolved_keys_d:
            func = resolved_keys_d[s]
            v = func()
            resolved_specials.append(v)
        else:
            _d = dict(s=s, t=resolved_keys_d.keys())
            raise ValueError("Unable to resolve special '{s}'. Valid specials are '{t}'".format(**_d))
    return resolved_specials


def resolve_di_resources(output_dir, specials_di):
    """Convert the ResourceTypes to Resource instances"""
    S = ResourceTypes
    to_log = functools.partial(_get_logfile, output_dir)
    to_o = lambda: output_dir
    r = {S.LOG_FILE: to_log,
         S.TMP_DIR: _get_tmpdir,
         S.TMP_FILE: _get_tmpfile,
         S.OUTPUT_DIR: to_o}
    resolved_specials = _resolve_di(r, specials_di)
    return resolved_specials


def resolve_io_files(id_to_count, id_to_name_d, output_dir, file_di):
    """
    id_to_name_d {ft_id:(base, ext)}
    output_dir = str
    file_di = list of DI
    id_to_count {ft_id: number of times the file instances were created}
    (This allows unique files to be created.)
    """
    # this should be done via global id?
    to_p = lambda d, n: os.path.join(d, n)
    to_f = functools.partial(to_p, output_dir)
    paths = []
    for f in file_di:
        base_name, ext = id_to_name_d.get(f, ('file', 'txt'))
        instance_id = id_to_count[f]
        if instance_id == 0:
            p = to_f('.'.join([base_name, ext]))
        else:
            p = to_f('.'.join([base_name + '-' + str(instance_id), ext]))
        id_to_count[f] = instance_id + 1
        paths.append(p)
    log.debug("ID Counter")
    log.debug(id_to_count)
    return paths

# Need to create a closure to have the file counter be a global-esque state
resolve_files_with_defaults = functools.partial(resolve_io_files, {i: 0 for i in get_file_name_defaults_d()}, get_file_name_defaults_d())


def get_tail_func_or_raise(alist):
    f = alist[-1]
    if isinstance(f, types.FunctionType):
        return f
    else:
        _d = dict(a=alist, f=f, t=type(f))
        raise TypeError("Invalid DI definition for '{a}'. Expected func type {t} for {f}".format(**_d))
def to_di_graph_with_funcs(meta_task):
    """
    :type meta_task: MetaTask
    """
    to_funcs = {'to_nproc': None,
                'to_task_type': None,
                'to_nchunks': None,
                'to_ropts': None}
    g = nx.DiGraph()

    def _add_di_nodes(alist, to_func_node_id):
        for i in alist:
            if isinstance(i, str):
                if i.startswith('$'):
                    g.add_node(i)
                    g.add_edge(i, to_func_node_id)
            elif isinstance(i, dict):
                # FIXME skip these for now.
                continue
            else:
                # Just add stuff?
                x = str(i)
                g.add_node(x)
                g.add_edge(x, to_func_node_id)

    # add nodes for task type
    g.add_node('to_task_type', shape="rectangle")
    g.add_node('$task_type', style="filled", fillcolor="aquamarine")
    g.add_edge('to_task_type', '$task_type')
    if meta_task.task_type in (TaskTypes.DISTRIBUTED, TaskTypes.LOCAL):
        g.add_node(meta_task.task_type)
        g.add_edge(meta_task.task_type, 'to_task_type')
    else:
        f = get_tail_func_or_raise(meta_task.task_type)
        to_funcs['to_task_type'] = f
        _add_di_nodes(meta_task.task_type[:-1], 'to_task_type')

    # add option dependencies
    to_ropts_fname = "to_ropts"
    if isinstance(meta_task.option_schemas, (list, tuple)):
        f = get_tail_func_or_raise(meta_task.option_schemas)
        to_funcs['to_ropts'] = f
        to_ropts_fname += "--{f}".format(f=f.__name__)
        # the raw opts schema is given as first arg
        xs = meta_task.option_schemas[:-1]
        xs.pop(0)
        xs.insert(0, '$opts')
        _add_di_nodes(meta_task.option_schemas[:-1], to_ropts_fname)
    elif isinstance(meta_task.option_schemas, dict):
        # no DI
        pass
    else:
        raise ValueError("Malformed option DI. '{x}'".format(x=meta_task.option_schemas))

    # add option dependencies
    g.add_node('$opts')
    g.add_node('$opts_schema')
    g.add_node('$ropts', style="filled", fillcolor="aquamarine")
    g.add_node(to_ropts_fname, shape="rectangle")
    g.add_edge(to_ropts_fname, '$ropts')
    g.add_edge('$opts', to_ropts_fname)
    g.add_edge('$opts_schema', to_ropts_fname)

    # add nproc
    to_nproc_fname = 'to_nproc'
    if isinstance(meta_task.nproc, (list, tuple)):
        f = get_tail_func_or_raise(meta_task.nproc)
        to_funcs['to_nproc'] = f
        to_nproc_fname += "--{f}".format(f=f.__name__)
        _add_di_nodes(meta_task.nproc[:-1], to_nproc_fname)
    else:
        if meta_task.nproc == '$max_nproc':
            g.add_node('$max_nproc')
            g.add_edge('$max_nproc', to_nproc_fname)
        elif isinstance(meta_task.nproc, int):
            g.add_node(meta_task.nproc)
            g.add_edge(meta_task.nproc, to_nproc_fname)
        else:
            raise TypeError("expected primitive value for nproc. Got type {t} from {v}".format(t=type(meta_task.nproc), v=meta_task.nproc))

    # add outputs of func to $nproc
    g.add_node('$nproc', style="filled", fillcolor="aquamarine")
    g.add_node(to_nproc_fname, shape="rectangle")
    g.add_edge(to_nproc_fname, '$nproc')

    to_nchunks_fname = "to_nchunks"
    if isinstance(meta_task, MetaScatterTask):
        if isinstance(meta_task.chunk_di, (list, tuple)):
            f = get_tail_func_or_raise(meta_task.chunk_di)
            to_funcs['to_nchunks'] = f
            to_nchunks_fname += "--{f}".format(f=f.__name__)
            _add_di_nodes(meta_task.chunk_di[:-1], to_nchunks_fname)
        else:
            # primitive value was given
            if isinstance(meta_task.chunk_di, int):
                raise TypeError("Expected primitive value for nchunks. Got type {t} for {v}".format(t=type(meta_task.chunk_di), v=meta_task.chunk_di))
                # add nchunks specific node (e.g., not '1')
            else:
                log.warn("unexpected value for nchunks. {v}".format(v=meta_task.chunk_di))
    g.add_node(to_nchunks_fname, shape="rectangle")
    g.add_node('$nchunks', style="filled", fillcolor="aquamarine")
    g.add_edge(to_nchunks_fname, '$nchunks')

    log.debug(to_funcs)
    return g, to_funcs


def to_di_graph(meta_task):
    g, _ = to_di_graph_with_funcs(meta_task)
    return g
def get_report_di(meta_task):
    """
    Get all DI models from meta task and extract the '$inputs' that have
    report JSON

    :param meta_task: MetaTask

    Returns a list of (inputs index, attribute_id)
    """
    report_dis = []
    attr_names = ['option_schemas', 'nproc', 'task_type']
    if isinstance(meta_task, MetaScatterTask):
        attr_names.append('chunk_di')
    for attr_name in attr_names:
        value_or_list = getattr(meta_task, attr_name)
        if is_di_list(value_or_list):
            for v in value_or_list:
                if is_dollar_value(v):
                    if v.startswith("$inputs."):
                        s = v.split('.')
                        if len(s) == 3:
                            report_inputs_index = int(s[1])
                            attr_id = s[-1]
                            file_type = meta_task.input_types[report_inputs_index]
                            if file_type != FileTypes.REPORT:
                                _d = dict(i=report_inputs_index, f=file_type, p=meta_task.input_types, r=FileTypes.REPORT, t=meta_task.task_id)
                                raise ValueError("Incompatible file type for file index {i} {f} for task id '{t}'. Expected '{r}' type. Inputs types {p}".format(**_d))
                            else:
                                report_dis.append((report_inputs_index, attr_id))
                        else:
                            raise ValueError("Malformed report input DI '{i}'".format(i=v))
    return report_dis


def is_di_list(alist_or_item):
    if isinstance(alist_or_item, (list, tuple)):
        if isinstance(list(alist_or_item)[-1], types.FunctionType):
            return True
    return False


def is_dollar_value(x_):
    if isinstance(x_, str):
        if x_.startswith('$'):
            return True
    return False
def resolver(meta_task, all_task_options, output_dir, max_nproc, max_nchunks):
    """
    Converts a MetaTask to a Task instance

    :rtype: Task
    """
    v_to_resolve = '$opts $opts_schema $ropts $task_type $nproc'.split()
    # these are resolved values
    resolved_values = {'$opts': all_task_options,
                       '$opts_schema': {'my_task_option1': 7654},
                       '$max_nproc': max_nproc,
                       '$max_nchunks': max_nchunks}
    report_dis = get_report_di(meta_task)
    if report_dis:
        log.debug("Report Input DIs")
        log.debug(report_dis)
        for f_index, attr_name in report_dis:
            k = '.'.join(['$inputs', str(f_index), attr_name])
            v = get_mock_report_json_attribute('mock_file', attr_name)
            resolved_values[k] = v

    def _default_resolve_ropts(opts, schema):
        return opts

    def _default_resolve_nproc():
        return 1

    def _default_task_type():
        return "pbsmrtpipe.constants.distributed_task"

    default_resolve_ropts = functools.partial(_default_resolve_ropts, resolved_values['$opts'], resolved_values['$opts_schema'])

    # Default Resolution Functions. They have no args
    default_funcs = {'to_ropts': default_resolve_ropts,
                     'to_nproc': _default_resolve_nproc,
                     'to_task_type': _default_task_type,
                     'to_nchunks': lambda: 5}

    # the resolver func will update the resolved values with the value of dict
    method_id_to_dollar_t = {'to_ropts': ('$ropts', meta_task.option_schemas),
                             'to_nproc': ('$nproc', meta_task.nproc),
                             'to_nchunks': ('$nchunks', 7),
                             'to_task_type': ('$task_type', meta_task.task_type)}
    if isinstance(meta_task, MetaScatterTask):
        method_id_to_dollar_t['to_nchunks'] = ('$nchunks', meta_task.chunk_di)

    def get_method_id_prefix(s):
        if isinstance(s, str):
            for to_x in method_id_to_dollar_t.keys():
                if s.startswith(to_x):
                    return to_x
        return False

    def begins_with_method_prefix(s):
        return get_method_id_prefix(s) is not False

    g = to_di_graph(meta_task)
    nodes = nx.topological_sort(g)
    log.info(pprint.pformat(nodes))
    for node in nodes:
        log.debug("Trying to resolve '{r}'".format(r=node))
        if node in resolved_values:
            # nothing to do here
            log.debug("Value {n} has been resolved. '{v}'".format(n=node, v=resolved_values[node]))
        else:
            # Are we using default funcs to resolve values
            if begins_with_method_prefix(node):
                method_prefix = get_method_id_prefix(node)
                dollar_key, di_values = method_id_to_dollar_t[method_prefix]
                if is_di_list(di_values):
                    f = get_tail_func_or_raise(di_values)
                    injectable = []
                    for x in di_values[:-1]:
                        if is_dollar_value(x):
                            if x in resolved_values:
                                v = resolved_values[x]
                                injectable.append(v)
                            else:
                                raise ValueError("$ value '{x}' not resolved.".format(x=x))
                        else:
                            injectable.append(x)
                    log.debug(f.__name__)
                    log.debug(injectable)
                    # this should do an argspec inspection. Putting a TypeError try/catch
                    # could be misleading
                    value = f(*injectable)
                    log.info("resolved '{k}' -> '{v}'".format(k=dollar_key, v=value))
                    resolved_values[dollar_key] = value
                else:
                    # use default value, it was supplied as a primitive
                    f = default_funcs[method_prefix]
                    value = f()
                    log.info("resolved '{k}' -> '{v}' (default resolution)".format(k=dollar_key, v=value))
                    resolved_values[dollar_key] = value
            else:
                msg = "potentially unsupported value '{n}'".format(n=node)
                #log.warn(msg)
                #raise ValueError(msg)

    # Sanity Check to make sure required values are resolved
    for x in v_to_resolve:
        if x not in resolved_values:
            raise ValueError("{x} was never resolved. Resolved keys {k}".format(x=x, k=resolved_values.keys()))

    if meta_task.resource_types:
        rfiles = resolve_di_resources(output_dir, meta_task.resource_types)
    else:
        rfiles = ()
    ifiles = resolve_files_with_defaults(output_dir, meta_task.input_types)
    ofiles = resolve_files_with_defaults(output_dir, meta_task.output_types)
    cmd_str = meta_task.to_cmd(ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles)
    if isinstance(meta_task, MetaScatterTask):
        t = ScatterTask(meta_task.task_id, resolved_values['$task_type'], ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles, cmd_str, resolved_values['$nchunks'])
    else:
        t = Task(meta_task.task_id, resolved_values['$task_type'], ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles, cmd_str)
    log.info(t.__dict__)
    #return resolved_values
    return t
def test_resolver_with_task(t):
    opts_ = {'my_task_options1': 54321,
             'max_chunks': 6,
             'my_task_option_id': 9876}
    output_dir = tempfile.mkdtemp()
    max_nproc = 17
    max_nchunks = 9
    log.info("Trying to resolve {t}".format(t=t))
    r_values = resolver(t, opts_, output_dir, max_nproc, max_nchunks)
    log.info(r_values)
    log.info("Successfully resolved {t}".format(t=t))
    return r_values


def test_resolver():
    return {t: test_resolver_with_task(t) for t in REGISTERED_TASKS.values()}

# Global Registry
REGISTERED_TASKS = {}


def to_list_if_necessary(tuple_or_s):
    if isinstance(tuple_or_s, tuple):
        return list(tuple_or_s)
    return tuple_or_s


def is_valid_task_id(task_id):
    if isinstance(task_id, str):
        return task_id.startswith('pbsmrtpipe.tasks.')
    return False


def is_valid_task_id_or_raise(task_id):
    if is_valid_task_id(task_id):
        return True
    raise ValueError("Malformed task id '{i}'".format(i=task_id))


def register_task(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, output_file_names=None):
    # this is a bit sloppy
    def f(func):
        def _(*args, **kwargs):
            return args, kwargs
        to_f = to_list_if_necessary
        r = resource_types if isinstance(resource_types, (tuple, list)) else (resource_types, )
        # need to copy schema opts (mutables in function)
        sopts_copy = copy.deepcopy(to_f(opt_schema))
        t = MetaTask(task_id, to_f(task_type), input_types, output_types, sopts_copy, to_f(nproc), r, func)
        log.info(t)
        if task_id in REGISTERED_TASKS:
            raise KeyError("Task id {i} is already registered.".format(i=task_id))
        else:
            REGISTERED_TASKS[task_id] = t
        return t
    return f


def register_scatter_task(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, nchunks, output_file_names=None):
    def f(func):
        def _(*args, **kwargs):
            return args, kwargs
        to_f = to_list_if_necessary
        r = resource_types if isinstance(resource_types, (tuple, list)) else (resource_types, )
        # need to copy schema opts (mutables in function)
        sopts_copy = copy.deepcopy(to_f(opt_schema))
        t = MetaScatterTask(task_id, to_f(task_type), input_types, output_types, sopts_copy, to_f(nproc), r, func, to_f(nchunks))
        log.info(t)
        if task_id in REGISTERED_TASKS:
            raise KeyError("Task id {i} is already registered.".format(i=task_id))
        else:
            REGISTERED_TASKS[task_id] = t
        return t
    return f
@register_task('pbsmrtpipe.tasks.simple_task_01',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, ),
               (FileTypes.REPORT, ), {}, 1, ())
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)


@register_task('pbsmrtpipe.tasks.my_task_id',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, FileTypes.RGN_FOFN),
               (FileTypes.ALIGNMENT_CMP_H5, ),
               opts,
               one_proc,
               (ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE),
               output_file_names=(('my_awesome_alignments', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Simple Example Task
    """
    my_tmp_dir = resources[0]
    _d = dict(e="my_exe.sh", l=resources[1], t=my_tmp_dir, o=resolved_opts['my_task_option_id'], i=input_files[0], f=input_files[1], r=output_files[0], n=nproc)
    return "{e} --nproc={n} --tmp={t} --log={l} --my-option={o} fasta:{i} --region={f} --output-report={r}".format(**_d)


@register_task('pbsmrtpipe.tasks.task_id2',
               TaskTypes.DISTRIBUTED,
               (FileTypes.VCF, ),
               (FileTypes.REPORT, ),
               opts,
               '$max_nproc',
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Multiple 'resources' of the same type can be provided.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def compute_nproc(global_nproc, resolved_opts):
    return global_nproc / 2


def compute_task_type(opts):
    """This must return pbsmrtpipe.constants.{local_task,distributed_task}"""
    return "pbsmrtpipe.constants.local_task"


@register_task('pbsmrtpipe.tasks.task_id8',
               ('$ropts', compute_task_type),
               (FileTypes.FASTA, FileTypes.REPORT),
               (FileTypes.VCF, ),
               opts,
               ('$max_nproc', '$ropts', compute_nproc),
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Set nproc via dependency injection based on $max_nproc.
    The nproc DI list (x) is translated to x[-1](*x[:-1])

    Compute the task type based on the options
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def my_nproc_func(global_nproc, opts):
    return 12


def my_custom_validator(resolved_opts, a, b, c):
    """Returns resolved option dict or raises an exception.

    This is just illustrating that the DI is blindly passing values. If there's a $X 'special' value, then
    this will be injected. But passing numbers will work as well.
    """
    return resolved_opts


@register_task('pbsmrtpipe.tasks.task_id3',
               TaskTypes.DISTRIBUTED,
               (FileTypes.MOVIE_FOFN, FileTypes.RGN_FOFN),
               (FileTypes.FASTA, FileTypes.FASTQ),
               (opts, 1, 2, 3, my_custom_validator),
               ('$max_nproc', '$ropts', my_nproc_func),
               (ResourceTypes.TMP_FILE,),
               output_file_names=(('my_file', 'fasta'), ('my_f2', 'fastq')))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Let's set nproc to be dependent on the resolved options and $max_nproc

    Note: '$opts' is the resolved options, whereas 'opts' is the {option_id:JsonSchema}

    Need to think this over a bit.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts

    Are these the resolved opts that are passed in?
    """
    # override option
    opts['my_option_01'] = my_option_01
    return opts


def compute_task_type(opts, attr_value):
    return "pbsmrtpipe.constants.distributed_task"


def _my_nproc(global_nproc, resolved_opts):
    return global_nproc / 3


@register_task('pbsmrtpipe.tasks.task_id4',
               ('$ropts', '$inputs.0.attr_id', compute_task_type),
               (FileTypes.REPORT, FileTypes.FASTA),
               (FileTypes.ALIGNMENT_CMP_H5,),
               (opts, '$inputs.0.attr_id', dyn_opts_func),
               ('$max_nproc', '$ropts', _my_nproc),
               (ResourceTypes.TMP_FILE, ),
               output_file_names=(('my_file', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime from a previous task via a pbreport.

    Map the first input file (must be a report type)
    $inputs.0 -> ft.report_type_id

    And looks for 'attr_id' in the attributes section of the report

    Can compute if the job should be submitted to the queue via ['$opts', '$inputs.0.attr_id', compute_task_type]
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def nchunks_func(nmovies, resolved_opts, resolved_nproc):
    max_chunks = resolved_opts['max_chunks']
    return min(int(nmovies), max_chunks)


@register_scatter_task('pbsmrtpipe.tasks.task_id5',
                       'pbsmrtpipe.constants.distributed_task',
                       (FileTypes.REPORT, FileTypes.MOVIE_FOFN),
                       (FileTypes.RGN_FOFN, ),
                       opts,
                       ('$max_nproc', '$ropts', _my_nproc),
                       (ResourceTypes.OUTPUT_DIR, ),
                       ('$inputs.0.attr_id', '$ropts', '$nproc', nchunks_func),
                       output_file_names=(('my_rgn_movie', 'fofn'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Scatter Tasks extend the standard task and include a 7th DI mechanism which will set $nchunks, which can be used at the workflow level.

    $nchunks is only communicated to the workflow level for proper graph construction, therefore it's not included in the to_cmd signature.

    For example, if $nchunks is set to 3, then output_files will be ['/path/to/chunk1', '/path/to/chunk2', '/path/to/chunk3']

    This yields a slightly odd API from the command line: my_exe.sh input.fofn --output '/path/to/chunk1' '/path/to/chunk2' '/path/to/chunk3'

    A more natural commandline API would be: my_exe.sh input.fofn --output-prefix="chunk_" --output-dir=/path/to --nchunks=3
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def test():
    resolved_opts = {'my_task_option_id': 12345}
    my_nproc = 7
    t = REGISTERED_TASKS['pbsmrtpipe.tasks.my_task_id']
    cmd = t.to_cmd(['/path/to/input1', '/path/to/input2'], ['/path/to/output1'], resolved_opts, my_nproc, ['/path/totmpe1', '/path/to/tmp2'])
    log.debug(t)
    log.debug(cmd)
    return t, cmd


def main():
    logging.basicConfig(level=logging.DEBUG)
    log.info(pprint.pformat(REGISTERED_TASKS))
    test()
    meta_tasks = REGISTERED_TASKS.values()
    for meta_task in meta_tasks:
        log.info(pprint.pformat(meta_task.__dict__))
    for i, meta_task in REGISTERED_TASKS.iteritems():
        log.info(i)
        g = to_di_graph(meta_task)
        log.info(i)
        log.info(g)
        fname = i + '.dot'
        nx.write_dot(g, fname)
    return 0


if __name__ == '__main__':
    sys.exit(main())
import os
import sys
import time
import logging
import functools
import operator
import itertools
import tempfile
import socket
import subprocess
import networkx as nx
from IPython.display import display_svg

log = logging.getLogger(__name__)

_SUPPORTED_IMAGE_TYPES = 'png svg eps'.split()
DOT_EXE = 'dot'


def which(exe_str):
    """walk the exe_str in PATH to get current exe_str.

    If path is found, the full path is returned. Else it returns None.
    """
    paths = os.environ.get('PATH', None)
    state = None
    if paths is None:
        # log warning
        msg = "PATH env var is not defined."
        log.error(msg)
        return state
    for path in paths.split(":"):
        exe_path = os.path.join(path, exe_str)
        # print exe_path
        if os.path.exists(exe_path):
            state = exe_path
            break
    return state


def backticks(cmd, merge_stderr=True):
    """
    Returns rcode, stdout, stderr
    """
    if merge_stderr:
        _stderr = subprocess.STDOUT
    else:
        _stderr = subprocess.PIPE
    # Setting shell = True is really bad form, however, many of the tasks
    # generate general shell code (which needs to be removed).
    p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=_stderr,
                         close_fds=True)
    started_at = time.time()
    nodeId = socket.getfqdn()
    log.debug("Running on {s} with cmd '{c}'".format(s=nodeId, c=cmd))
    out = [l[:-1] for l in p.stdout.readlines()]
    p.stdout.close()
    # need to allow process to terminate
    p.wait()
    run_time = time.time() - started_at
    errCode = p.returncode and p.returncode or 0
    if p.returncode > 0:
        errorMessage = os.linesep.join(out)
        output = []
    else:
        errorMessage = ''
        output = out
    if p.returncode == 0:
        log.debug("Successful output (Return code = 0) in {s:.2f} sec ({m:.2f} min) of {c}".format(c=cmd, s=run_time, m=run_time / 60.0))
    else:
        msg = "Return code {r} {e} of cmd {c}".format(r=p.returncode, e=errorMessage, c=cmd)
        log.error(msg)
        sys.stderr.write(msg)
    return errCode, output, errorMessage, run_time


def _dot_to_image(image_type, dot_file, image_file):
    assert image_type.lower() in _SUPPORTED_IMAGE_TYPES
    assert os.path.exists(dot_file)
    assert which(DOT_EXE) is not None
    cmd_str = "{e} -T{t} {i} -o {o}"
    d = dict(e=DOT_EXE, t=image_type, i=dot_file, o=image_file)
    cmd = cmd_str.format(**d)
    rcode, stdout, stderr, run_time = backticks(cmd)
    state = True if rcode == 0 else False
    return state

dot_file_to_png = functools.partial(_dot_to_image, 'png')
dot_file_to_svg = functools.partial(_dot_to_image, 'svg')
dot_file_to_eps = functools.partial(_dot_to_image, 'eps')


def display_dot(dot_file):
    def _to_temp(suffix):
        t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        t.close()
        return t.name
    svg_file = _to_temp('.svg')
    # print "write dot file to {f}".format(f=dot_file)
    dot_file_to_svg(dot_file, svg_file)
    with open(svg_file, 'r') as f:
        s = f.read()
    display_svg(s, raw=True)


def display_networkx_graph(g):
    def _to_temp(suffix):
        t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        t.close()
        return t.name
    dot_file = _to_temp('.dot')
    nx.write_dot(g, dot_file)
    display_dot(dot_file)


def pretty_print_task_klass_options(task_klasses):
    outs = []
    option_names = []
    option_klasses = []
    for t in task_klasses:
        for name, o in t._OPTIONS.iteritems():
            option_names.append(o)
            option_klasses.append(o.__class__.__name__)
    max_option_name = max(len(o.name) for o in option_names)
    max_klass_name = max(len(o) for o in option_klasses)
    pad = 2
    for pb_name, ts in itertools.groupby(task_klasses, key=operator.attrgetter('__module__')):
        # A PB_module can have the same options in the different Tasks
        options = {}
        for task_klass in ts:
            for name, option in task_klass._OPTIONS.iteritems():
                options[name] = option
        header = "PB module {m} noptions: {n}".format(m=pb_name, n=len(options))
        outs.append("-" * len(header))
        outs.append(header)
        outs.append("-" * len(header))
        for o in options.values():
            outs.append(" ".join([o.name.ljust(max_option_name + pad), o.__class__.__name__.ljust(max_klass_name + pad), str(o.optional).ljust(5), str(o.default_value)]))
        outs.append("")
    return "\n".join(outs)