
@mpkocher
Last active August 29, 2015 14:05
Dependency Injection Task definition model inspired by AngularJS

DI Task API Model inspired by angularjs

# Very simple task
one_proc = 1
opts_schema = {} # task has no options

@register_task('pbsmrtpipe.tasks.simple_task_01',
                TaskTypes.LOCAL,
                (FileTypes.FASTA, ),
                (FileTypes.REPORT, ),
                opts_schema, one_proc, (),
                output_file_names=(('my_awesome_report', 'json'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)

The register_task decorator has the signature (task_id, task_type, input_types, output_types, task_option_schema, nproc, resource_types, output_file_names=None). The decorator is syntactic sugar for creating a MetaTask instance and registering it by id in a globally accessible registry.

The decorated to_cmd function mirrors this signature (minus task_id, task_type and the keyword arguments), but the values passed to it are resolved: for example, input_types are replaced with the actual paths to the input files, and the option schema is replaced with the resolved options.
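As an illustration of "syntactic sugar for creating a MetaTask instance and registering it", here is a minimal sketch. MetaTaskSketch, REGISTRY and register_task_sketch are hypothetical simplified stand-ins for the gist's MetaTask, REGISTERED_TASKS and register_task, whose real implementation is not shown here. The sketch assumes that the decorator returns the undecorated function.

```python
from collections import namedtuple

# Hypothetical, simplified stand-in for the gist's MetaTask class.
MetaTaskSketch = namedtuple(
    "MetaTaskSketch",
    "task_id task_type input_types output_types opts_schema nproc "
    "resource_types output_file_names cmd_func")

# Hypothetical global registry keyed by task id.
REGISTRY = {}


def register_task_sketch(task_id, task_type, input_types, output_types,
                         opts_schema, nproc, resource_types,
                         output_file_names=None):
    """Return a decorator that wraps to_cmd in a MetaTaskSketch and registers it by id."""
    def _wrapper(cmd_func):
        REGISTRY[task_id] = MetaTaskSketch(task_id, task_type, input_types,
                                           output_types, opts_schema, nproc,
                                           resource_types, output_file_names,
                                           cmd_func)
        # Assumption: the plain function is returned so it stays callable.
        return cmd_func
    return _wrapper


@register_task_sketch('pbsmrtpipe.tasks.simple_task_01',
                      'pbsmrtpipe.constants.local_task',
                      ('pbsmrtpipe.file.fasta',), ('pbsmrtpipe.file.report',),
                      {}, 1, ())
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(
        n=nproc, i=input_files[0], o=output_files[0])
```

The registry stores the unresolved definition. Turning it into a runnable command happens later, when concrete paths, options and nproc are passed to cmd_func.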

Details (in the order of the function signature)

  • task_id is the globally unique identifier for the task (it must begin with "pbsmrtpipe.tasks")
  • task_type is an enum of pbsmrtpipe.constants.local_task or pbsmrtpipe.constants.distributed_task (referenced by the class constants TaskTypes.LOCAL and TaskTypes.DISTRIBUTED)
  • input_types are the input file types by id (e.g., ['pbsmrtpipe.files.fasta', 'pbsmrtpipe.files.movies']). They can be referenced using class constants of FileTypes (for example, FileTypes.FASTA, FileTypes.REPORT)
  • output_types are the output file types (see input_types)
  • opts_schema are the task options as a dict of {option_id: jsonschema}
  • nproc is the number of processors used
  • resource_types are files or directories ('resources', e.g., ResourceTypes.TMP_FILE, ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE) that are passed to to_cmd as a list and accessed positionally. For example, in to_cmd, you can reference resources[0].
  • output_file_names is a keyword argument that allows you to specify the name of each output file. The format is a sequence of (base name, extension) tuples
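Two of the constraints above can be checked mechanically: the task id prefix and the shape of output_file_names. The helper names below are hypothetical. The sketch also assumes two things the text does not state: an id must have a non-empty suffix after "pbsmrtpipe.tasks.", and output_file_names must provide exactly one pair per output type.

```python
PBSMRTPIPE_TASK_PREFIX = 'pbsmrtpipe.tasks'


def is_valid_task_id_sketch(task_id):
    """True if task_id is a dotted id under the pbsmrtpipe.tasks namespace."""
    prefix = PBSMRTPIPE_TASK_PREFIX + '.'
    return (isinstance(task_id, str)
            and task_id.startswith(prefix)
            and len(task_id) > len(prefix))


def check_output_file_names(output_types, output_file_names):
    """Raise ValueError unless output_file_names is None or holds one
    (base name, extension) pair per output type (assumed rule)."""
    if output_file_names is None:
        return  # fall back to per-file-type default names
    if len(output_file_names) != len(output_types):
        raise ValueError("Expected {n} output file names, got {m}".format(
            n=len(output_types), m=len(output_file_names)))
    for name in output_file_names:
        if len(name) != 2:
            raise ValueError(
                "Expected a (base name, extension) pair, got {x!r}".format(x=name))
```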

For task_type, opts, nproc and nchunks (introduced later), the values can be provided as the corresponding primitive data type (e.g., int for nproc), or they can be configured via a Dependency Injection Model. In this model, a custom function computes the value and may reference other values. For example, nproc can be computed from the task type and/or the resolved options.

This allows for a great deal of flexibility and expressiveness. These custom functions can declare explicit dependencies on values that will be resolved. These are called symbols and are prefixed with $ (e.g., $max_nproc).

Symbols Definitions

  • $max_nproc the max number of processors (no dependencies, accessible by any DI configurable option)
  • $max_nchunks the max number of chunks to use (no dependencies, accessible by any DI configurable option)
  • $opts_schema the {option_id: schema} dict that is provided to the task decorator
  • $opts the options provided by the user
  • $task_type the value computed from task type DI model or primitive value provided
  • $ropts the resolved and validated options
  • $nproc the value computed from nproc DI model or primitive value provided
  • $nchunks the value computed from nchunks DI model or primitive value provided in the register_scatter_task deco

Dependency Injection Model

The format for the DI model is a list of values whose last item is a custom function.

The general Dependency Injection model works as follows to compute the number of processors to use.

To define how to compute $nproc, you can specify an int, the symbol $max_nproc, or a dependency injection model: a list of dependencies (e.g., resolved options, task type, nchunks) followed by a function.

The function is specified as the last element of the list, and it must accept the remaining elements (list[:-1]) as positional arguments. In other words, the value is computed as list[-1](*list[:-1]), with every symbol replaced by its resolved value.
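Here is a minimal sketch of that evaluation rule. It is not the gist's resolver, which walks a networkx graph. Instead, it shows how a single primitive or DI list is resolved against a dict of already-resolved symbols. resolve_di and the symbols dict are illustrative names.

```python
import types


def is_di_list(value):
    """A DI model is a non-empty list/tuple whose last element is a function."""
    return (isinstance(value, (list, tuple)) and len(value) > 0
            and isinstance(value[-1], types.FunctionType))


def resolve_di(value, resolved_symbols):
    """Resolve a primitive or a DI list against already-resolved '$' symbols.

    A DI list x is evaluated as x[-1](*x[:-1]) after replacing every
    '$'-prefixed string with its value; unknown symbols raise KeyError.
    """
    if not is_di_list(value):
        # Primitive value; a bare symbol such as '$max_nproc' is looked up.
        if isinstance(value, str) and value.startswith('$'):
            return resolved_symbols[value]
        return value
    func, deps = value[-1], value[:-1]
    args = [resolved_symbols[d] if isinstance(d, str) and d.startswith('$') else d
            for d in deps]
    return func(*args)


def my_func(resolved_opts, n, m):
    return 8


symbols = {'$max_nproc': 16, '$ropts': {'my_task_option_id': 9876}}
print(resolve_di(['$ropts', 5, 3.14159, my_func], symbols))  # 8
print(resolve_di('$max_nproc', symbols))                     # 16
```

Non-symbol values such as 5 and 3.14159 are passed through unchanged, which matches the description of Example #1 below.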

The only constraint when defining dependencies is to not create cyclic dependencies. For example, if the value of '$nproc' depends on the resolved task options ('$ropts') and the resolved options depend on '$nproc', the dependencies form a cycle and cannot be resolved.
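One way to enforce the no-cycle rule is to topologically sort the symbols by their dependencies. The gist's code builds a networkx DiGraph for this purpose. The sketch below instead uses the standard library's graphlib (Python 3.9+), whose TopologicalSorter raises CycleError when a cycle is present. The specs dicts are illustrative.

```python
from graphlib import TopologicalSorter, CycleError  # Python 3.9+


def dependencies(spec):
    """Return the '$' symbols a primitive or DI list depends on."""
    if isinstance(spec, (list, tuple)) and spec and callable(spec[-1]):
        items = spec[:-1]
    else:
        items = [spec]
    return [x for x in items if isinstance(x, str) and x.startswith('$')]


def resolution_order(specs, provided=()):
    """Order symbols so each comes after its dependencies.

    specs maps a symbol (e.g. '$nproc') to a primitive or DI list;
    provided lists symbols with no dependencies (e.g. '$max_nproc').
    Raises CycleError if the dependencies form a cycle.
    """
    graph = {sym: set(dependencies(spec)) for sym, spec in specs.items()}
    for sym in provided:
        graph.setdefault(sym, set())
    return list(TopologicalSorter(graph).static_order())


def has_cycle(specs, provided=()):
    try:
        resolution_order(specs, provided)
    except CycleError:
        return True
    return False


# $nproc depends on $ropts, which depends on $opts: resolvable.
ok_specs = {'$ropts': ['$opts', lambda o: o],
            '$nproc': ['$max_nproc', '$ropts', lambda n, o: n // 2]}
# $ropts and $nproc depend on each other: a cycle.
cyclic_specs = {'$ropts': ['$nproc', lambda n: {}],
                '$nproc': ['$ropts', lambda o: 1]}
```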

TLDR

The task type, options, nproc and number of chunks can be given as primitives, or they can be provided as a Dependency Injection list model.

Using the DI list model, each computation of task type, nproc, resolved opts, number of chunks can reference other 'resolved' values ($task_type, $nproc, $ropts, $nchunks, respectively), or provided Symbol values ($max_nproc, $max_nchunks). The only constraint is DO NOT create a cycle between the dependencies.

(Please see this ipython notebook for an example of the resolved DI models for several example tasks)

Example #1

Specify the computation of the number of processors to use based on the resolved options. In the register_task decorator, replace the primitive nproc value (1) with a Dependency Injection Model list definition.

def my_func(resolved_opts, n, m):
    return 8

['$ropts', 5, 3.14159, my_func]

my_func is a custom function that is expected to have the signature (resolved_opts, n, m). '$ropts' will be injected with the resolved task options, while 5 and 3.14159 are passed through as-is.

Example #2

Specify the number of processors for the task to use based on the max number of processors and the resolved options.

def my_nproc_func(max_nproc, resolved_opts):
    return max_nproc // 2

['$max_nproc', '$ropts', my_nproc_func]

Accessing Data in Files

Passing data at runtime to task options, nproc, task_type, or nchunks can be performed by accessing an input file that is a pbreport JSON file (i.e., of type FileTypes.REPORT).

Example of a pbreport JSON file.

{
"_changelist": 130583,
"_version": "2.3",
"attributes": [
    {
        "id": "pbreports_example_report.nmovies",
        "name": null,
        "value": 19
    },
    {
        "id": "pbreports_example_report.ncontigs",
        "name": null,
        "value": 5
    },
    {
        "id": "pbreports_example_report.my_attribute_id",
        "name": null,
        "value": 10.0
    }
],
"id": "pbreports_example_report",
"plotGroups": [],
"tables": []
}

If your task has an input file that is a report, you can access values in the JSON report and use them in a Dependency Injection Model list. For example, you can compute the number of processors based on a value in a report that is computed at run time.

A new symbol of the form $inputs.N.my_attr_id selects an input file by its positional index N in the input types list. my_attr_id is the id of the report attribute to extract. The value will be resolved at runtime.

For example, '$inputs.2.attr_x' is interpreted as: grab the third file in the input files (which must be of type FileTypes.REPORT) and extract attribute 'attr_x' from the JSON report.
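The lookup described above can be sketched as follows. The parsing rule (exactly three dot-separated parts, and a report-typed input) follows the gist's get_report_di. Note that the code in this gist only mocks the report read. The attribute-matching rule here is an assumption: attr_id may match either the full attribute id or its last dotted component, so that 'nmovies' finds 'pbreports_example_report.nmovies'.

```python
import json

REPORT_TYPE = 'pbsmrtpipe.file.report'  # FileTypes.REPORT in the gist's code


def parse_inputs_symbol(symbol):
    """Split '$inputs.<index>.<attr_id>' into (index, attr_id)."""
    parts = symbol.split('.')
    if len(parts) != 3 or parts[0] != '$inputs' or not parts[1].isdigit():
        raise ValueError("Malformed report input DI '{s}'".format(s=symbol))
    return int(parts[1]), parts[2]


def get_report_attribute(report_d, attr_id):
    """Return an attribute value from a parsed pbreport dict.

    Assumed matching rule: the full id or its last dotted component.
    """
    for attr in report_d.get('attributes', []):
        if attr['id'] == attr_id or attr['id'].split('.')[-1] == attr_id:
            return attr['value']
    raise KeyError("Attribute '{a}' not found in report '{r}'".format(
        a=attr_id, r=report_d.get('id')))


def resolve_inputs_symbol(symbol, input_types, input_files):
    """Resolve '$inputs.N.attr_id' by reading the N-th input file as JSON."""
    index, attr_id = parse_inputs_symbol(symbol)
    if input_types[index] != REPORT_TYPE:
        raise ValueError("Input {i} must be a report, got {t}".format(
            i=index, t=input_types[index]))
    with open(input_files[index]) as f:
        return get_report_attribute(json.load(f), attr_id)
```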

Passing Data to Tasks

Use case motivation: Determine the task_type from the resolved options and attr_id in the JSON report input file.

Like the DI-modelable values $task_type, $ropts, $nproc and $nchunks, an $inputs.N.attr_id symbol can be used in a DI model list.

Extending a previous example:

This assumes that the first file in the task input types list is a report type (FileTypes.REPORT).

def my_nproc_func(max_nproc, resolved_opts, my_report_attribute_id):
    return max_nproc // 2

['$max_nproc', '$ropts', '$inputs.0.attr_x', my_nproc_func]

At runtime the value will be extracted from the report and passed to the custom function.

Complete Example using Dependency Injection Model

Use case Motivations:

  • We want to compute the task type based on the resolved options and an 'attr_id' value from the first input file.
  • We want to compute the resolved opts based on the 'attr_id' value from the first input file.
  • We want to compute nproc based on the max number of processors and resolved options

Additionally, we need to request a temp directory to be created. This directory is managed by the workflow and will automatically be deleted on the remote node when the task is completed.

def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts with a value that is extracted from
    a pbreports JSON file."""
    # override option
    opts['my_option_01'] = my_option_01
    return opts

def compute_task_type(resolved_opts, attr_value):
    """ Can compute the task type based on the resolved opts and
    an attribute from the report"""
    return TaskTypes.DISTRIBUTED

def _my_nproc(max_nproc, resolved_opts):
    """ Determine the value of nproc based on the resolved options"""
    return max_nproc // 3


@register_task('pbsmrtpipe.tasks.task_id4',
           ('$ropts', '$inputs.0.attr_id', compute_task_type),
           (FileTypes.REPORT, FileTypes.FASTA),
           (FileTypes.ALIGNMENT_CMP_H5,),
           (opts, '$inputs.0.attr_id', dyn_opts_func),
           ('$max_nproc', '$ropts', _my_nproc),
           (ResourceTypes.TMP_DIR, ),
           output_file_names=(('my_awesome_alignment', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime by a previous task via a pbreport.

    Map the first input file (must be a report type)

    $inputs.0 -> input_file_types.POSITIONAL.report_attribute_id

    Looks for 'attr_id' in the Attribute section of report.

    Can compute if the job should be submitted to the queue via ['$ropts', '$inputs.0.attr_id', compute_task_type]

    """
    _d = dict(e="my_exe.sh", f=input_files[0], o=output_files[0], n=nproc, t=resources[0])
    return "{e} --nproc={n} --tmpdir={t} --fasta={f} -o={o}".format(**_d)
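The temp directory requested above via ResourceTypes.TMP_DIR is created and cleaned up by the workflow. Below is a local, single-process sketch of that lifecycle. task_resources is a hypothetical helper, not the workflow's API. It turns each resource type into a concrete path in order, so to_cmd can use resources[0], resources[1], and so on. When the block exits, it deletes the temporary directory and files. It assumes that log files and the output directory are kept, mirroring _get_logfile and OUTPUT_DIR in the gist's code.

```python
import contextlib
import os
import shutil
import tempfile

# Resource ids as defined by ResourceTypes in the gist's code.
TMP_DIR, TMP_FILE, LOG_FILE, OUTPUT_DIR = '$tmpdir', '$tmpfile', '$logfile', '$outputdir'


@contextlib.contextmanager
def task_resources(resource_types, output_dir):
    """Yield one concrete path per requested resource, in order, and delete
    the temporary ones (TMP_DIR, TMP_FILE) on exit."""
    paths, cleanup = [], []
    try:
        for r in resource_types:
            if r == TMP_DIR:
                p = tempfile.mkdtemp()
                cleanup.append(p)
            elif r == TMP_FILE:
                fd, p = tempfile.mkstemp(suffix='.file')
                os.close(fd)
                cleanup.append(p)
            elif r == LOG_FILE:
                fd, p = tempfile.mkstemp(suffix='.log', dir=output_dir)
                os.close(fd)  # kept after the task completes
            elif r == OUTPUT_DIR:
                p = output_dir
            else:
                raise ValueError("Unsupported resource type '{r}'".format(r=r))
            paths.append(p)
        yield paths
    finally:
        for p in cleanup:
            if os.path.isdir(p):
                shutil.rmtree(p, ignore_errors=True)
            elif os.path.exists(p):
                os.remove(p)
```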

Explicit Resolved Tasks

See several examples of the resolved dependencies in this IPython notebook.

These examples show the graph of DI model lists and explicitly show the dependency resolution and the conversion of a MetaTask instance to a Task instance.

More Task Examples

(these are taken from di_task_api.py)

@register_task('pbsmrtpipe.tasks.simple_task_01',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, ),
               (FileTypes.REPORT, ), {}, 1, ())
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)


@register_task('pbsmrtpipe.tasks.my_task_id',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, FileTypes.RGN_FOFN),
               (FileTypes.ALIGNMENT_CMP_H5, ),
               opts,
               one_proc,
               (ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE),
               output_file_names=(('my_awesome_alignments', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Simple Example Task
    """
    my_tmp_dir = resources[0]
    _d = dict(e="my_exe.sh", l=resources[1], t=my_tmp_dir, o=resolved_opts['my_task_option_id'], i=input_files[0], f=input_files[1], r=output_files[0], n=nproc)
    return "{e} --nproc={n} --tmp={t} --log={l} --my-option={o} fasta:{i} --region={f} --output-report={r}".format(**_d)


@register_task('pbsmrtpipe.tasks.task_id2',
               TaskTypes.DISTRIBUTED,
               (FileTypes.VCF, ),
               (FileTypes.REPORT, ),
               opts,
               '$max_nproc',
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Multiple 'resources' of the same type can be provided.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def compute_nproc(global_nproc, resolved_opts):
    return global_nproc // 2


def compute_task_type(opts):
    """This must return pbsmrtpipe.constants.{local_task,distributed_task}"""
    return "pbsmrtpipe.constants.local_task"


@register_task('pbsmrtpipe.tasks.task_id8',
               ('$ropts', compute_task_type),
               (FileTypes.FASTA, FileTypes.REPORT),
               (FileTypes.VCF, ),
               opts,
               ('$max_nproc', '$ropts', compute_nproc),
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Set nproc via dependency injection based on $max_nproc,
    The nproc DI list (x) is translated to x[-1](*x[:-1])

    Compute the task type based on the options
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def my_nproc_func(global_nproc, opts):
    return 12


def my_custom_validator(resolved_opts, a, b, c):
    """Returns resolved option dict or raises an exception.

    This is just illustrating that the DI is blindly passing values. if there's a $X 'special' value, then
    this will be injected. But passing numbers will work as well.
    """
    return resolved_opts


@register_task('pbsmrtpipe.tasks.task_id3',
               TaskTypes.DISTRIBUTED,
               (FileTypes.MOVIE_FOFN, FileTypes.RGN_FOFN),
               (FileTypes.FASTA, FileTypes.FASTQ),
               (opts, 1, 2, 3, my_custom_validator),
               ('$max_nproc', '$ropts', my_nproc_func),
               (ResourceTypes.TMP_FILE,),
               output_file_names=(('my_file', 'fasta'), ('my_f2', 'fastq')))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Let's set nproc to be dependent on the resolved options and $max_nproc

    Note: '$ropts' is the resolved options, whereas 'opts' is the {option_id: JsonSchema} dict

    Need to think this over a bit.

    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts

    Are these the resolved opts that are passed in?
    """
    # override option
    opts['my_option_01'] = my_option_01
    return opts


def compute_task_type(opts, attr_value):
    return "pbsmrtpipe.constants.distributed_task"


def _my_nproc(global_nproc, resolved_opts):
    return global_nproc // 3


@register_task('pbsmrtpipe.tasks.task_id4',
               ('$ropts', '$inputs.0.attr_id', compute_task_type),
               (FileTypes.REPORT, FileTypes.FASTA),
               (FileTypes.ALIGNMENT_CMP_H5,),
               (opts, '$inputs.0.attr_id', dyn_opts_func),
               ('$max_nproc', '$ropts', _my_nproc),
               (ResourceTypes.TMP_FILE, ),
               output_file_names=(('my_file', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime by a previous task via a pbreport.

    Map the first input file (must be a report type)
    $inputs.0 -> ft.report_type_id

    And looks for 'attr_id' in the Attribute section of report

    Can compute if the job should be submitted to the queue via ['$ropts', '$inputs.0.attr_id', compute_task_type]

    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def nchunks_func(nmovies, resolved_opts, resolved_nproc):
    max_chunks = resolved_opts['max_chunks']
    return min(int(nmovies), max_chunks)


@register_scatter_task('pbsmrtpipe.tasks.task_id5',
                       'pbsmrtpipe.constants.distributed_task',
                       (FileTypes.REPORT, FileTypes.MOVIE_FOFN),
                       (FileTypes.RGN_FOFN, ),
                       opts,
                       ('$max_nproc', '$ropts', _my_nproc),
                       (ResourceTypes.OUTPUT_DIR, ),
                       ('$inputs.0.attr_id', '$ropts', '$nproc', nchunks_func),
                       output_file_names=(('my_rgn_movie', 'fofn'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Scatter tasks extend the standard task and include a 7th DI mechanism, which sets $nchunks for use at the workflow level.

    $nchunks is only communicated to the workflow level for proper graph construction; therefore, it's not included in the to_cmd signature.

    For example, if $nchunks is set to 3, then output_files will be ['/path/to/chunk1', '/path/to/chunk2', '/path/to/chunk3']

    This yields a slightly odd command-line API: my_exe.sh input.fofn --output '/path/to/chunk1' '/path/to/chunk2' '/path/to/chunk3'

    A more natural command-line API might be: my_exe.sh input.fofn --output-prefix="chunk_" --output-dir=/path/to --nchunks=3
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)
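To compare the two command-line styles discussed in the scatter task docstring, here is a small sketch. chunk_output_paths, explicit_outputs_cmd and prefix_outputs_cmd are hypothetical helpers, and my_exe.sh is the placeholder executable from the examples. The 1-based naming (chunk_1, chunk_2, ...) is an assumption. The first style lists every chunk path. The second passes only a prefix, a directory and $nchunks, and lets the executable derive the paths.

```python
import os


def chunk_output_paths(output_dir, prefix, nchunks, ext):
    """Return one output path per chunk: <output_dir>/<prefix><i>.<ext>, i = 1..nchunks."""
    if nchunks < 1:
        raise ValueError("nchunks must be >= 1, got {n}".format(n=nchunks))
    return [os.path.join(output_dir, "{p}{i}.{e}".format(p=prefix, i=i, e=ext))
            for i in range(1, nchunks + 1)]


def explicit_outputs_cmd(input_file, output_files):
    # Style 1: every chunk path is listed on the command line.
    return "my_exe.sh {i} --output {o}".format(i=input_file, o=" ".join(output_files))


def prefix_outputs_cmd(input_file, output_dir, prefix, nchunks):
    # Style 2: the executable derives the chunk paths itself.
    return ("my_exe.sh {i} --output-prefix={p} --output-dir={d} --nchunks={n}"
            .format(i=input_file, p=prefix, d=output_dir, n=nchunks))
```

With the prefix style, the command line stays the same length regardless of $nchunks. The trade-off is that the workflow and the executable must agree on the naming convention.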
"""
DI Task API Model inspired by angularjs
See Readme.rst for details.
"""
import os
import sys
import tempfile
import logging
import types
import pprint
import functools
import copy
import networkx as nx
log = logging.getLogger(__name__)
# Signature {optionid:JsonSchema}
opts = {"my_task_option_id": {}}
one_proc = 1
five_proc = 5
PBSMRTPIPE_FILE_PREFIX = 'pbsmrtpipe.file'
PBSMRTPIPE_TASK_PREFIX = 'pbsmrtpipe.tasks'
class TaskTypes(object):
    LOCAL = 'pbsmrtpipe.constants.local_task'
    DISTRIBUTED = 'pbsmrtpipe.constants.distributed_task'


def _to_type(prefix, name):
    return ".".join([prefix, name])

to_ftype = functools.partial(_to_type, PBSMRTPIPE_FILE_PREFIX)


class FileTypes(object):
    REPORT = to_ftype('report')
    FASTA = to_ftype('fasta')
    FASTQ = to_ftype('fastq')
    MOVIE_FOFN = to_ftype('movie_fofn')
    RGN_FOFN = to_ftype('rgn_fofn')
    ALIGNMENT_CMP_H5 = to_ftype('alignment_cmp_h5')
    VCF = to_ftype('vcf')
    GFF = to_ftype('gff')


def get_file_name_defaults_d():
    return {FileTypes.REPORT: ('report', 'json'),
            FileTypes.FASTA: ('file', 'fasta'),
            FileTypes.FASTQ: ('file', 'fastq'),
            FileTypes.MOVIE_FOFN: ('movie', 'fofn'),
            FileTypes.RGN_FOFN: ('rgn', 'fofn'),
            FileTypes.ALIGNMENT_CMP_H5: ('alignment', 'cmp.h5'),
            FileTypes.VCF: ('file', 'vcf'),
            FileTypes.GFF: ('file', 'gff')}


class ResourceTypes(object):
    TMP_DIR = '$tmpdir'
    TMP_FILE = '$tmpfile'
    LOG_FILE = '$logfile'
    # tasks can write output to this directory
    OUTPUT_DIR = '$outputdir'

    @classmethod
    def ALL(cls):
        return cls.TMP_DIR, cls.TMP_FILE, cls.LOG_FILE, cls.OUTPUT_DIR

    @classmethod
    def is_valid(cls, attr_name):
        return attr_name in cls.ALL()
class MetaTask(object):

    def __init__(self, task_id, task_type, input_types, output_types, option_schemas, nproc, resource_types, cmd_func):
        """These may be specified as the DI version"""
        is_valid_task_id_or_raise(task_id)
        self.task_id = task_id
        self.input_types = input_types
        self.output_types = output_types
        self.resource_types = resource_types
        self.option_schemas = option_schemas
        self.nproc = nproc
        self.task_type = task_type
        self.cmd_func = cmd_func

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_types),
                  o=len(self.output_types),
                  r=len(self.resource_types))
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} >".format(**_d)

    def to_cmd(self, input_files, output_files, resolved_opts, nproc, resource_types):
        """
        Quite a bit of validation here to help debugging.
        """
        validations = [(self.input_types, input_files),
                       (self.output_types, output_files),
                       (self.resource_types, resource_types)]

        for k, v in validations:
            if len(k) != len(v):
                raise IndexError("Incompatible with defined types. Expected {n}. Got {i}. {v}".format(n=len(k), i=len(v), v=v))

        # - should validate resolved options against schema
        # this can be the DI model, or the raw di
        schemas = self.option_schemas
        if isinstance(self.option_schemas, (list, tuple)):
            # assume the first value is a dict of the opts
            schemas = self.option_schemas[0]

        for k, v in schemas.items():
            if k not in resolved_opts:
                raise KeyError("Expected resolved option with id '{k}'. Got {d}. Options are not resolved. {o}".format(k=k, d=resolved_opts, o=self.option_schemas))

        if not isinstance(nproc, int):
            raise TypeError("nproc expected int, got type {t}".format(t=type(nproc)))

        return self.cmd_func(input_files, output_files, resolved_opts, nproc, resource_types)


class MetaScatterTask(MetaTask):

    def __init__(self, task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, cmd_func, chunk_di):
        super(MetaScatterTask, self).__init__(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, cmd_func)
        self.chunk_di = chunk_di
class Task(object):

    def __init__(self, task_id, task_type, input_files, output_files, resolved_options, nproc, resources, cmd):
        self.task_id = task_id
        self.input_files = input_files
        self.output_files = output_files
        self.resources = resources
        self.resolved_options = resolved_options
        self.nproc = nproc
        self.task_type = task_type
        # Command list of strings or string
        self.cmd = cmd

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_files),
                  o=len(self.output_files),
                  r=len(self.resources),
                  n=self.nproc)
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} nproc:{n} >".format(**_d)


class ScatterTask(Task):

    def __init__(self, task_id, task_type, input_files, output_files, resolved_opts, nproc, resources, cmd, nchunks):
        super(ScatterTask, self).__init__(task_id, task_type, input_files, output_files, resolved_opts, nproc, resources, cmd)
        self.nchunks = nchunks

    def __repr__(self):
        _d = dict(k=self.__class__.__name__,
                  i=self.task_id,
                  p=len(self.input_files),
                  o=len(self.output_files),
                  r=len(self.resources),
                  n=self.nproc,
                  c=self.nchunks)
        return "<{k} id:{i} inputs:{p} outputs:{o} resources:{r} nproc:{n} nchunks:{c} >".format(**_d)
def _get_tmpdir():
    return tempfile.mkdtemp()


def _get_tmpfile(suffix=".file"):
    t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    t.close()
    return t.name


def _get_logfile(output_dir):
    suffix = ".log"
    t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False, dir=output_dir)
    t.close()
    return t.name


def get_mock_resolved_task_options():
    opts_ = [('nmovies', 5), ('use_subreads', False), ('readScore', 0.78)]
    to_k = lambda x: '.'.join(["pbsmrtpipe.task_options", x])
    return {to_k(k): v for k, v in opts_}


def get_mock_report_json_attribute(report_file, attribute_id):
    return 3.14159


def get_mock_report_json_int_attribute(report_file, attribute_id):
    return 7
def _resolve_di(resolved_keys_d, specials_di):
    resolved_specials = []
    for s in specials_di:
        if s in resolved_keys_d:
            func = resolved_keys_d[s]
            v = func()
            resolved_specials.append(v)
        else:
            _d = dict(s=s, t=resolved_keys_d.keys())
            raise ValueError("Unable to resolve special '{s}'. Valid specials are '{t}'".format(**_d))
    return resolved_specials


def resolve_di_resources(output_dir, specials_di):
    """Convert the ResourceTypes to Resource instances"""
    S = ResourceTypes
    to_log = functools.partial(_get_logfile, output_dir)
    to_o = lambda: output_dir
    r = {S.LOG_FILE: to_log,
         S.TMP_DIR: _get_tmpdir,
         S.TMP_FILE: _get_tmpfile,
         S.OUTPUT_DIR: to_o}
    resolved_specials = _resolve_di(r, specials_di)
    return resolved_specials
def resolve_io_files(id_to_count, id_to_name_d, output_dir, file_di):
    """
    id_to_name_d {ft_id:(base, ext)}
    output_dir = str
    file_di = list of DI
    id_to_count {ft_id: number of times the file instances were created}
    (This allows unique files to be created.)
    """
    # this should be done via global id?
    to_p = lambda d, n: os.path.join(d, n)
    to_f = functools.partial(to_p, output_dir)
    paths = []
    for f in file_di:
        base_name, ext = id_to_name_d.get(f, ('file', 'txt'))
        # file types without a registered default start counting at 0
        instance_id = id_to_count.get(f, 0)
        if instance_id == 0:
            p = to_f('.'.join([base_name, ext]))
        else:
            p = to_f('.'.join([base_name + '-' + str(instance_id), ext]))
        id_to_count[f] = instance_id + 1
        paths.append(p)
    log.debug("ID Counter")
    log.debug(id_to_count)
    return paths


# Need to create a closure to have the file counter be a global-esque state
resolve_files_with_defaults = functools.partial(resolve_io_files, {i: 0 for i in get_file_name_defaults_d()}, get_file_name_defaults_d())
def get_tail_func_or_raise(alist):
    f = alist[-1]
    if isinstance(f, types.FunctionType):
        return f
    else:
        _d = dict(a=alist, f=f, t=type(f))
        raise TypeError("Invalid DI definition for '{a}'. Expected the last element to be a function, got type {t} for {f}".format(**_d))
def to_di_graph_with_funcs(meta_task):
    """
    :type meta_task: MetaTask
    """
    to_funcs = {'to_nproc': None,
                'to_task_type': None,
                'to_nchunks': None,
                'to_ropts': None}

    g = nx.DiGraph()

    def _add_di_nodes(alist, to_func_node_id):
        for i in alist:
            if isinstance(i, str):
                if i.startswith('$'):
                    g.add_node(i)
                    g.add_edge(i, to_func_node_id)
            elif isinstance(i, dict):
                # FIXME skip these for now.
                continue
            else:
                # Just add stuff?
                x = str(i)
                g.add_node(x)
                g.add_edge(x, to_func_node_id)

    # add nodes for task type
    g.add_node('to_task_type', shape="rectangle")
    g.add_node('$task_type', style="filled", fillcolor="aquamarine")
    g.add_edge('to_task_type', '$task_type')

    if meta_task.task_type in (TaskTypes.DISTRIBUTED, TaskTypes.LOCAL):
        g.add_node(meta_task.task_type)
        g.add_edge(meta_task.task_type, 'to_task_type')
    else:
        f = get_tail_func_or_raise(meta_task.task_type)
        to_funcs['to_task_type'] = f
        _add_di_nodes(meta_task.task_type[:-1], 'to_task_type')

    # add option dependencies
    to_ropts_fname = "to_ropts"
    if isinstance(meta_task.option_schemas, (list, tuple)):
        f = get_tail_func_or_raise(meta_task.option_schemas)
        to_funcs['to_ropts'] = f
        to_ropts_fname += "--{f}".format(f=f.__name__)
        # the raw opts schema is given as first arg; represent it by '$opts'
        # (copy to a list so tuple DI definitions work too)
        xs = list(meta_task.option_schemas[:-1])
        xs.pop(0)
        xs.insert(0, '$opts')
        _add_di_nodes(xs, to_ropts_fname)
    elif isinstance(meta_task.option_schemas, dict):
        # no DI
        pass
    else:
        raise ValueError("Malformed option DI. '{x}'".format(x=meta_task.option_schemas))

    # add option dependencies
    g.add_node('$opts')
    g.add_node('$opts_schema')
    g.add_node('$ropts', style="filled", fillcolor="aquamarine")
    g.add_node(to_ropts_fname, shape="rectangle")
    g.add_edge(to_ropts_fname, '$ropts')
    g.add_edge('$opts', to_ropts_fname)
    g.add_edge('$opts_schema', to_ropts_fname)

    # add nproc
    to_nproc_fname = 'to_nproc'
    if isinstance(meta_task.nproc, (list, tuple)):
        f = get_tail_func_or_raise(meta_task.nproc)
        to_funcs['to_nproc'] = f
        to_nproc_fname += "--{f}".format(f=f.__name__)
        _add_di_nodes(meta_task.nproc[:-1], to_nproc_fname)
    else:
        if meta_task.nproc == '$max_nproc':
            g.add_node('$max_nproc')
            g.add_edge('$max_nproc', to_nproc_fname)
        elif isinstance(meta_task.nproc, int):
            g.add_node(meta_task.nproc)
            g.add_edge(meta_task.nproc, to_nproc_fname)
        else:
            raise TypeError("expected primitive value for nproc. Got type {t} from {v}".format(t=type(meta_task.nproc), v=meta_task.nproc))

    # add outputs of func to $nproc
    g.add_node('$nproc', style="filled", fillcolor="aquamarine")
    g.add_node(to_nproc_fname, shape="rectangle")
    g.add_edge(to_nproc_fname, '$nproc')

    to_nchunks_fname = "to_nchunks"
    if isinstance(meta_task, MetaScatterTask):
        if isinstance(meta_task.chunk_di, (list, tuple)):
            f = get_tail_func_or_raise(meta_task.chunk_di)
            to_funcs['to_nchunks'] = f
            to_nchunks_fname += "--{f}".format(f=f.__name__)
            _add_di_nodes(meta_task.chunk_di[:-1], to_nchunks_fname)
        else:
            # primitive value was given
            if not isinstance(meta_task.chunk_di, int):
                raise TypeError("Expected primitive value for nchunks. Got type {t} for {v}".format(t=type(meta_task.chunk_di), v=meta_task.chunk_di))
            # add nchunks specific node (e.g., not '1')

        g.add_node(to_nchunks_fname, shape="rectangle")
        g.add_node('$nchunks', style="filled", fillcolor="aquamarine")
        g.add_edge(to_nchunks_fname, '$nchunks')
    else:
        log.debug("'{i}' is not a scatter task; $nchunks is not resolved".format(i=meta_task.task_id))

    log.debug(to_funcs)
    return g, to_funcs


def to_di_graph(meta_task):
    g, _ = to_di_graph_with_funcs(meta_task)
    return g
def get_report_di(meta_task):
    """
    Get all DI models from meta task and extract the '$inputs' that have
    report JSON

    :param meta_task: MetaTask

    Returns a list of (inputs index, attribute_id)
    """
    report_dis = []
    attr_names = ['option_schemas', 'nproc', 'task_type']
    if isinstance(meta_task, MetaScatterTask):
        attr_names.append('chunk_di')

    for attr_name in attr_names:
        value_or_list = getattr(meta_task, attr_name)
        if is_di_list(value_or_list):
            for v in value_or_list:
                if is_dollar_value(v):
                    if v.startswith("$inputs."):
                        s = v.split('.')
                        if len(s) == 3:
                            report_inputs_index = int(s[1])
                            attr_id = s[-1]
                            file_type = meta_task.input_types[report_inputs_index]
                            if file_type != FileTypes.REPORT:
                                _d = dict(i=report_inputs_index, f=file_type, p=meta_task.input_types, r=FileTypes.REPORT, t=meta_task.task_id)
                                raise ValueError("Incompatible file type for file index {i} {f} for task id '{t}'. Expected '{r}' type. Inputs types {p}".format(**_d))
                            else:
                                report_dis.append((report_inputs_index, attr_id))
                        else:
                            raise ValueError("Malformed report input DI '{i}'".format(i=v))
    return report_dis


def is_di_list(alist_or_item):
    # a DI list is a non-empty list/tuple whose last element is a function
    if isinstance(alist_or_item, (list, tuple)) and len(alist_or_item) > 0:
        if isinstance(alist_or_item[-1], types.FunctionType):
            return True
    return False


def is_dollar_value(x_):
    if isinstance(x_, str):
        if x_.startswith('$'):
            return True
    return False
def resolver(meta_task, all_task_options, output_dir, max_nproc, max_nchunks):
    """
    Converts a MetaTask to a Task instance

    :rtype: Task
    """
    v_to_resolve = '$opts $opts_schema $ropts $task_type $nproc'.split()
    if isinstance(meta_task, MetaScatterTask):
        v_to_resolve.append('$nchunks')

    # these are resolved values
    resolved_values = {'$opts': all_task_options,
                       '$opts_schema': {'my_task_option1': 7654},
                       '$max_nproc': max_nproc,
                       '$max_nchunks': max_nchunks}

    report_dis = get_report_di(meta_task)
    if report_dis:
        log.debug("Report Input DIs")
        log.debug(report_dis)

    for f_index, attr_name in report_dis:
        k = '.'.join(['$inputs', str(f_index), attr_name])
        v = get_mock_report_json_attribute('mock_file', attr_name)
        resolved_values[k] = v

    def _default_resolve_ropts(opts, schema):
        return opts

    def _default_resolve_nproc():
        # primitive nproc: an int, or the '$max_nproc' symbol
        if meta_task.nproc == '$max_nproc':
            return max_nproc
        return meta_task.nproc

    def _default_task_type():
        # primitive task type (e.g., TaskTypes.LOCAL)
        return meta_task.task_type

    def _default_nchunks():
        # primitive nchunks given to register_scatter_task; non-scatter
        # tasks have no chunk_di, so fall back to a single chunk
        return getattr(meta_task, 'chunk_di', 1)

    default_resolve_ropts = functools.partial(_default_resolve_ropts, resolved_values['$opts'], resolved_values['$opts_schema'])

    # Default Resolution Functions. They have no args
    default_funcs = {'to_ropts': default_resolve_ropts,
                     'to_nproc': _default_resolve_nproc,
                     'to_task_type': _default_task_type,
                     'to_nchunks': _default_nchunks}

    # the resolver func will update the resolved values with the value of dict
    method_id_to_dollar_t = {'to_ropts': ('$ropts', meta_task.option_schemas),
                             'to_nproc': ('$nproc', meta_task.nproc),
                             'to_nchunks': ('$nchunks', 7),
                             'to_task_type': ('$task_type', meta_task.task_type)}

    if isinstance(meta_task, MetaScatterTask):
        method_id_to_dollar_t['to_nchunks'] = ('$nchunks', meta_task.chunk_di)

    def get_method_id_prefix(s):
        if isinstance(s, str):
            for to_x in method_id_to_dollar_t.keys():
                if s.startswith(to_x):
                    return to_x
        return False

    def begins_with_method_prefix(s):
        return get_method_id_prefix(s) is not False

    g = to_di_graph(meta_task)
    # networkx >= 2.0 returns a generator; materialize the order
    nodes = list(nx.topological_sort(g))
    log.info(pprint.pformat(nodes))

    for node in nodes:
        log.debug("Trying to resolve '{r}'".format(r=node))
        if node in resolved_values:
            # nothing to do here
            log.debug("Value {n} has been resolved. '{v}'".format(n=node, v=resolved_values[node]))
        else:
            # Are we using default funcs to resolve values
            if begins_with_method_prefix(node):
                method_prefix = get_method_id_prefix(node)
                dollar_key, di_values = method_id_to_dollar_t[method_prefix]
                if is_di_list(di_values):
                    f = get_tail_func_or_raise(di_values)
                    injectable = []
                    for x in di_values[:-1]:
                        if is_dollar_value(x):
                            if x in resolved_values:
                                v = resolved_values[x]
                                injectable.append(v)
                            else:
                                raise ValueError("$ value '{x}' not resolved.".format(x=x))
                        else:
                            injectable.append(x)
                    log.debug(f.__name__)
                    log.debug(injectable)
                    # this should inspect the function signature. Putting a
                    # TypeError try/catch could be misleading
                    value = f(*injectable)
                    log.info("resolved '{k}' -> '{v}'".format(k=dollar_key, v=value))
                    resolved_values[dollar_key] = value
                else:
                    # use default value, it was supplied as a primitive
                    f = default_funcs[method_prefix]
                    value = f()
                    log.info("resolved '{k}' -> '{v}' (default resolution)".format(k=dollar_key, v=value))
                    resolved_values[dollar_key] = value
            else:
                msg = "potentially unsupported value '{n}'".format(n=node)
                #log.warn(msg)
                #raise ValueError(msg)

    # Sanity Check to make sure required values are resolved
    for x in v_to_resolve:
        if x not in resolved_values:
            raise ValueError("{x} was never resolved. Resolved keys {k}".format(x=x, k=resolved_values.keys()))

    if meta_task.resource_types:
        rfiles = resolve_di_resources(output_dir, meta_task.resource_types)
    else:
        rfiles = ()

    ifiles = resolve_files_with_defaults(output_dir, meta_task.input_types)
    ofiles = resolve_files_with_defaults(output_dir, meta_task.output_types)

    cmd_str = meta_task.to_cmd(ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles)

    if isinstance(meta_task, MetaScatterTask):
        t = ScatterTask(meta_task.task_id, resolved_values['$task_type'], ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles, cmd_str, resolved_values['$nchunks'])
    else:
        t = Task(meta_task.task_id, resolved_values['$task_type'], ifiles, ofiles, resolved_values['$ropts'], resolved_values['$nproc'], rfiles, cmd_str)

    log.info(t.__dict__)
    #return resolved_values
    return t


def test_resolver_with_task(t):
    opts_ = {'my_task_options1': 54321,
             'max_chunks': 6,
             'my_task_option_id': 9876}
    output_dir = tempfile.mkdtemp()
    max_nproc = 17
    max_nchunks = 9
    log.info("Trying to resolve {t}".format(t=t))
    r_values = resolver(t, opts_, output_dir, max_nproc, max_nchunks)
    log.info(r_values)
    log.info("Successfully resolved {t}".format(t=t))
    return r_values


def test_resolver():
    return {t: test_resolver_with_task(t) for t in REGISTERED_TASKS.values()}


# Global Registry
REGISTERED_TASKS = {}


def to_list_if_necessary(tuple_or_s):
    if isinstance(tuple_or_s, tuple):
        return list(tuple_or_s)
    return tuple_or_s


def is_valid_task_id(task_id):
    if isinstance(task_id, str):
        return task_id.startswith('pbsmrtpipe.tasks.')
    return False


def is_valid_task_id_or_raise(task_id):
    if is_valid_task_id(task_id):
        return True
    raise ValueError("Malformed task id '{i}'".format(i=task_id))


def register_task(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, output_file_names=None):
    # this is a bit sloppy
    # NOTE: output_file_names is accepted but not yet passed to MetaTask
    def f(func):
        def _(*args, **kwargs):
            return args, kwargs
        to_f = to_list_if_necessary
        r = resource_types if isinstance(resource_types, (tuple, list)) else (resource_types, )
        # need to copy schema opts (mutables in function)
        sopts_copy = copy.deepcopy(to_f(opt_schema))
        t = MetaTask(task_id, to_f(task_type), input_types, output_types, sopts_copy, to_f(nproc), r, func)
        log.info(t)
        if task_id in REGISTERED_TASKS:
            raise KeyError("Task id {i} is already registered.".format(i=task_id))
        else:
            REGISTERED_TASKS[task_id] = t
        return t
    return f


def register_scatter_task(task_id, task_type, input_types, output_types, opt_schema, nproc, resource_types, nchunks, output_file_names=None):
    # NOTE: output_file_names is accepted but not yet passed to MetaScatterTask
    def f(func):
        def _(*args, **kwargs):
            return args, kwargs
        to_f = to_list_if_necessary
        r = resource_types if isinstance(resource_types, (tuple, list)) else (resource_types, )
        # need to copy schema opts (mutables in function)
        sopts_copy = copy.deepcopy(to_f(opt_schema))
        t = MetaScatterTask(task_id, to_f(task_type), input_types, output_types, sopts_copy, to_f(nproc), r, func, to_f(nchunks))
        log.info(t)
        if task_id in REGISTERED_TASKS:
            raise KeyError("Task id {i} is already registered.".format(i=task_id))
        else:
            REGISTERED_TASKS[task_id] = t
        return t
    return f


@register_task('pbsmrtpipe.tasks.simple_task_01',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, ),
               (FileTypes.REPORT, ), {}, 1, ())
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """Simple Hello world task. fasta -> report """
    _d = dict(i=input_files[0], o=output_files[0], n=nproc)
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(**_d)


@register_task('pbsmrtpipe.tasks.my_task_id',
               TaskTypes.LOCAL,
               (FileTypes.FASTA, FileTypes.RGN_FOFN),
               (FileTypes.ALIGNMENT_CMP_H5, ),
               opts,
               one_proc,
               (ResourceTypes.TMP_DIR, ResourceTypes.LOG_FILE),
               output_file_names=(('my_awesome_alignments', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Simple Example Task
    """
    my_tmp_dir = resources[0]
    _d = dict(e="my_exe.sh", l=resources[1], t=my_tmp_dir, o=resolved_opts['my_task_option_id'],
              i=input_files[0], f=input_files[1], r=output_files[0], n=nproc)
    return "{e} --nproc={n} --tmp={t} --log={l} --my-option={o} fasta:{i} --region={f} --output-report={r}".format(**_d)


@register_task('pbsmrtpipe.tasks.task_id2',
               TaskTypes.DISTRIBUTED,
               (FileTypes.VCF, ),
               (FileTypes.REPORT, ),
               opts,
               '$max_nproc',
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Multiple 'resources' of the same type can be provided.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def compute_nproc(global_nproc, resolved_opts):
    # floor division keeps nproc an int (same result as Python 2's integer '/')
    return global_nproc // 2


def compute_task_type(opts):
    """This must return pbsmrtpipe.constants.{local_task,distributed_task}"""
    return "pbsmrtpipe.constants.local_task"


@register_task('pbsmrtpipe.tasks.task_id8',
               ('$ropts', compute_task_type),
               (FileTypes.FASTA, FileTypes.REPORT),
               (FileTypes.VCF, ),
               opts,
               ('$max_nproc', '$ropts', compute_nproc),
               (ResourceTypes.TMP_DIR, ResourceTypes.TMP_FILE, ResourceTypes.TMP_FILE))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Note: Set nproc via dependency injection based on $max_nproc.
    The nproc DI list (x) is translated to x[-1](*x[:-1]).
    Compute the task type based on the options.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def my_nproc_func(global_nproc, opts):
    return 12


def my_custom_validator(resolved_opts, a, b, c):
    """Returns resolved option dict or raises an exception.

    This just illustrates that the DI blindly passes values: a special $X value is
    replaced by its resolved value, but plain values (e.g., numbers) are passed through as well.
    """
    return resolved_opts


@register_task('pbsmrtpipe.tasks.task_id3',
               TaskTypes.DISTRIBUTED,
               (FileTypes.MOVIE_FOFN, FileTypes.RGN_FOFN),
               (FileTypes.FASTA, FileTypes.FASTQ),
               (opts, 1, 2, 3, my_custom_validator),
               ('$max_nproc', '$ropts', my_nproc_func),
               (ResourceTypes.TMP_FILE,),
               output_file_names=(('my_file', 'fasta'), ('my_f2', 'fastq')))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Let's set nproc to be dependent on the resolved options and $max_nproc.
    Note: '$ropts' is the resolved options, whereas 'opts' is the {option_id:JsonSchema}.
    Need to think this over a bit.
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def dyn_opts_func(opts, my_option_01):
    """Return a dict of resolved opts.

    Are these the resolved opts that are passed in?
    """
    # override option
    opts['my_option_01'] = my_option_01
    return opts


# NOTE: this redefines compute_task_type; task_id8 above already captured the one-argument version
def compute_task_type(opts, attr_value):
    return "pbsmrtpipe.constants.distributed_task"


def _my_nproc(global_nproc, resolved_opts):
    # floor division keeps nproc an int (same result as Python 2's integer '/')
    return global_nproc // 3


@register_task('pbsmrtpipe.tasks.task_id4',
               ('$ropts', '$inputs.0.attr_id', compute_task_type),
               (FileTypes.REPORT, FileTypes.FASTA),
               (FileTypes.ALIGNMENT_CMP_H5,),
               (opts, '$inputs.0.attr_id', dyn_opts_func),
               ('$max_nproc', '$ropts', _my_nproc),
               (ResourceTypes.TMP_FILE, ),
               output_file_names=(('my_file', 'cmp.h5'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Example of dynamically passing values computed at runtime from a previous task via a pbreport.
    Map the first input file (must be a report type)
    $inputs.0 -> ft.report_type_id
    and look for 'attr_id' in the Attribute section of the report.
    Whether the job should be submitted to the queue can be computed via ['$ropts', '$inputs.0.attr_id', compute_task_type]
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def nchunks_func(nmovies, resolved_opts, resolved_nproc):
    max_chunks = resolved_opts['max_chunks']
    return min(int(nmovies), max_chunks)


@register_scatter_task('pbsmrtpipe.tasks.task_id5',
                       'pbsmrtpipe.constants.distributed_task',
                       (FileTypes.REPORT, FileTypes.MOVIE_FOFN),
                       (FileTypes.RGN_FOFN, ),
                       opts,
                       ('$max_nproc', '$ropts', _my_nproc),
                       (ResourceTypes.OUTPUT_DIR, ),
                       ('$inputs.0.attr_id', '$ropts', '$nproc', nchunks_func),
                       output_file_names=(('my_rgn_movie', 'fofn'), ))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    """
    Scatter Tasks extend the standard task with an additional DI mechanism that sets $nchunks, which can be used at the workflow level.
    $nchunks is only communicated to the workflow level for proper graph construction; therefore it's not included in the to_cmd signature.
    For example, if $nchunks is set to 3, then output_files will be ['/path/to/chunk1', '/path/to/chunk2', '/path/to/chunk3'].
    This yields a slightly odd command-line API: my_exe.sh input.fofn --output '/path/to/chunk1' '/path/to/chunk2' '/path/to/chunk3'
    A more natural command-line API would be: my_exe.sh input.fofn --output-prefix="chunk_" --output-dir=/path/to --nchunks=3
    """
    _d = dict(e="my_exe.sh")
    return "{e}".format(**_d)


def test():
    resolved_opts = {'my_task_option_id': 12345}
    my_nproc = 7
    t = REGISTERED_TASKS['pbsmrtpipe.tasks.my_task_id']
    cmd = t.to_cmd(['/path/to/input1', '/path/to/input2'], ['/path/to/output1'], resolved_opts, my_nproc, ['/path/to/tmp1', '/path/to/tmp2'])
    log.debug(t)
    log.debug(cmd)
    return t, cmd


def main():
    logging.basicConfig(level=logging.DEBUG)
    log.info(pprint.pformat(REGISTERED_TASKS))

    test()

    meta_tasks = REGISTERED_TASKS.values()
    for meta_task in meta_tasks:
        log.info(pprint.pformat(meta_task.__dict__))

    # write the DI graph of each meta task to <task_id>.dot
    for i, meta_task in REGISTERED_TASKS.iteritems():
        log.info(i)
        g = to_di_graph(meta_task)
        log.info(g)
        fname = i + '.dot'
        nx.write_dot(g, fname)

    return 0


if __name__ == '__main__':
    sys.exit(main())
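The resolution loop in `resolver` walks the DI graph in topological order and, for each DI list `x`, calls `x[-1](*x[:-1])` after substituting the already-resolved `$` values. Below is a minimal, standalone Python 3 sketch of that idea. It uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+) in place of networkx, and `resolve_di_list` / `resolve_all` are hypothetical helpers, not part of this module. Unlike `resolver`, which falls back to default resolution functions for non-list values, this sketch simply stores primitive values as-is.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def is_dollar_value(x):
    """A '$'-prefixed string refers to another (resolved) value."""
    return isinstance(x, str) and x.startswith("$")


def is_di_list(spec):
    """A DI list is a non-empty tuple/list whose last element is callable."""
    return isinstance(spec, (tuple, list)) and len(spec) > 0 and callable(spec[-1])


def resolve_di_list(di_list, resolved):
    """Hypothetical helper: evaluate x[-1](*x[:-1]), injecting resolved '$' values."""
    *args, func = di_list
    injected = []
    for a in args:
        if is_dollar_value(a):
            if a not in resolved:
                raise ValueError("$ value '{}' not resolved.".format(a))
            injected.append(resolved[a])
        else:
            injected.append(a)  # plain values (numbers, dicts, ...) pass through
    return func(*injected)


def resolve_all(di_specs, initial):
    """Hypothetical helper: resolve {'$key': DI list or primitive} in dependency order."""
    resolved = dict(initial)
    # Each '$key' depends on the '$' values referenced by its DI list
    graph = {key: {a for a in spec[:-1] if is_dollar_value(a)} if is_di_list(spec) else set()
             for key, spec in di_specs.items()}
    for key in TopologicalSorter(graph).static_order():
        if key in resolved:
            continue  # supplied up front (e.g. '$opts', '$max_nproc')
        if key not in di_specs:
            raise ValueError("{} was never resolved.".format(key))
        spec = di_specs[key]
        resolved[key] = resolve_di_list(spec, resolved) if is_di_list(spec) else spec
    return resolved


# Usage, loosely mirroring pbsmrtpipe.tasks.task_id8 (option resolution is simplified)
def compute_nproc(global_nproc, resolved_opts):
    return global_nproc // 2


def compute_task_type(resolved_opts):
    return "pbsmrtpipe.constants.local_task"


specs = {
    "$ropts": ("$opts", dict),  # stand-in for schema-based option resolution
    "$nproc": ("$max_nproc", "$ropts", compute_nproc),
    "$task_type": ("$ropts", compute_task_type),
}
values = resolve_all(specs, {"$opts": {"my_task_option_id": 9876}, "$max_nproc": 17})
print(values["$nproc"], values["$task_type"])  # 8 pbsmrtpipe.constants.local_task
```

Because the graph is sorted before any function is called, a DI function can rely on every `$` value it names already being present, and a dependency cycle surfaces as a `graphlib.CycleError` instead of a silent mis-resolution.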
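`register_task` is syntactic sugar: it builds a `MetaTask` and stores it in the module-level `REGISTERED_TASKS` dict keyed by task id, raising `KeyError` on a duplicate id. The minimal Python 3 sketch below isolates that registry pattern. `MetaTaskSketch`, `REGISTRY`, and `register` are hypothetical names, and the sketch also applies the `pbsmrtpipe.tasks.` id-prefix check that the module defines in `is_valid_task_id` but that `register_task` does not currently call.

```python
from typing import Callable, Dict, NamedTuple, Tuple


class MetaTaskSketch(NamedTuple):
    """Hypothetical stand-in for MetaTask, keeping only a few fields."""
    task_id: str
    task_type: str
    input_types: Tuple[str, ...]
    output_types: Tuple[str, ...]
    cmd_func: Callable


# Global registry keyed by task id (mirrors REGISTERED_TASKS)
REGISTRY: Dict[str, MetaTaskSketch] = {}


def is_valid_task_id(task_id):
    return isinstance(task_id, str) and task_id.startswith("pbsmrtpipe.tasks.")


def register(task_id, task_type, input_types, output_types):
    """Decorator factory: wrap a to_cmd function in a MetaTaskSketch and register it."""
    if not is_valid_task_id(task_id):
        raise ValueError("Malformed task id '{}'".format(task_id))

    def deco(func):
        if task_id in REGISTRY:
            raise KeyError("Task id {} is already registered.".format(task_id))
        meta_task = MetaTaskSketch(task_id, task_type, tuple(input_types), tuple(output_types), func)
        REGISTRY[task_id] = meta_task
        # Like register_task, return the meta task, so the decorated name is rebound to it
        return meta_task

    return deco


@register("pbsmrtpipe.tasks.simple_task_01",
          "pbsmrtpipe.constants.local_task",
          ("pbsmrtpipe.file.fasta",),
          ("pbsmrtpipe.file.report",))
def to_cmd(input_files, output_files, resolved_opts, nproc, resources):
    return "my-simple-cmd.sh --nproc {n} {i} {o}".format(i=input_files[0], o=output_files[0], n=nproc)


cmd = REGISTRY["pbsmrtpipe.tasks.simple_task_01"].cmd_func(["in.fasta"], ["out.json"], {}, 1, ())
print(cmd)  # my-simple-cmd.sh --nproc 1 in.fasta out.json
```

Since the decorator returns the meta task rather than the function, reusing the name `to_cmd` for every task (as the module does) is harmless: each definition is captured in the registry before the name is rebound.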
{
"metadata": {
"name": "",
"signature": "sha256:ce44528ea04c7b9b1865fe3fafa64b243dc1ece67d1f60140281fe38bca1c415"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "code",
"collapsed": false,
"input": [
"import di_task_api as X\n",
"# utils to get the graphs to display inline\n",
"import toolbook as tb\n",
"import pprint"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 1
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Get Registered MetaTasks"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# not named very well\n",
"# the @register_task deco creates an instance of MetaTask and registers it to the meta task registry.\n",
"X.REGISTERED_TASKS"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 2,
"text": [
"{'pbsmrtpipe.tasks.my_task_id': <MetaTask id:pbsmrtpipe.tasks.my_task_id inputs:2 outputs:1 resources:2 >,\n",
" 'pbsmrtpipe.tasks.simple_task_01': <MetaTask id:pbsmrtpipe.tasks.simple_task_01 inputs:1 outputs:1 resources:0 >,\n",
" 'pbsmrtpipe.tasks.task_id2': <MetaTask id:pbsmrtpipe.tasks.task_id2 inputs:1 outputs:1 resources:3 >,\n",
" 'pbsmrtpipe.tasks.task_id3': <MetaTask id:pbsmrtpipe.tasks.task_id3 inputs:2 outputs:2 resources:1 >,\n",
" 'pbsmrtpipe.tasks.task_id4': <MetaTask id:pbsmrtpipe.tasks.task_id4 inputs:2 outputs:1 resources:1 >,\n",
" 'pbsmrtpipe.tasks.task_id5': <MetaScatterTask id:pbsmrtpipe.tasks.task_id5 inputs:2 outputs:1 resources:1 >,\n",
" 'pbsmrtpipe.tasks.task_id8': <MetaTask id:pbsmrtpipe.tasks.task_id8 inputs:2 outputs:1 resources:3 >}"
]
}
],
"prompt_number": 2
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Test/Mock method to convert MetaTask -> Task instances. A Task instance has all the options, files, nproc, cmd strings resolved."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# this is named very poorly\n",
"# this is a simple function that takes all the registered meta tasks and resolves (using mocks) to Task instances.\n",
"# this returns a {MetaTask:Task} dict\n",
"X.test_resolver()"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 3,
"text": [
"{<MetaScatterTask id:pbsmrtpipe.tasks.task_id5 inputs:2 outputs:1 resources:1 >: <ScatterTask id:pbsmrtpipe.tasks.task_id5 inputs:2 outputs:1 resources:1 nproc:5 nchunks:3 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.simple_task_01 inputs:1 outputs:1 resources:0 >: <Task id:pbsmrtpipe.tasks.simple_task_01 inputs:1 outputs:1 resources:0 nproc:1 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.my_task_id inputs:2 outputs:1 resources:2 >: <Task id:pbsmrtpipe.tasks.my_task_id inputs:2 outputs:1 resources:2 nproc:1 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.task_id2 inputs:1 outputs:1 resources:3 >: <Task id:pbsmrtpipe.tasks.task_id2 inputs:1 outputs:1 resources:3 nproc:1 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.task_id8 inputs:2 outputs:1 resources:3 >: <Task id:pbsmrtpipe.tasks.task_id8 inputs:2 outputs:1 resources:3 nproc:8 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.task_id3 inputs:2 outputs:2 resources:1 >: <Task id:pbsmrtpipe.tasks.task_id3 inputs:2 outputs:2 resources:1 nproc:12 >,\n",
" <MetaTask id:pbsmrtpipe.tasks.task_id4 inputs:2 outputs:1 resources:1 >: <Task id:pbsmrtpipe.tasks.task_id4 inputs:2 outputs:1 resources:1 nproc:5 >}"
]
}
],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"def print_summary(mtask_task_d):\n",
"    \"\"\"\n",
"    Simple util func to print the summary\n",
"    \n",
"    mtask_task_d = {MetaTask:Task} dict\n",
"    \"\"\"\n",
"    \n",
"    header = \"*\" * 10\n",
"    for meta_task, task in mtask_task_d.iteritems():\n",
"        g = X.to_di_graph(meta_task)\n",
"        \n",
"        print header + \" MetaTask task_id '{i}'\".format(i=meta_task.task_id) + header\n",
"        print pprint.pformat(meta_task.__dict__)\n",
"        print\n",
"        print header + \" Task {i}\".format(i=task.task_id) + header\n",
"        print pprint.pformat(task.__dict__)\n",
"        tb.display_networkx_graph(g)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Legend and Dependency Injection Resolution Examples\n",
"\n",
"Rectangles are resolver functions: each returns a resolved value or passes resolved values on to other resolver functions in the dependency injection model.\n",
"\n",
"Custom functions are prefixed with the base function name 'to_X' and joined with '--'. For example, `to_nproc--my_custom_nproc_func`.\n",
"\n",
"Every MetaTask must resolve \\$ropts, \\$nproc, and \\$task_type; MetaScatterTasks must also resolve \\$nchunks. These are shown in **aqua** and are **communicated back to the workflow level**. This cleanly separates a task's to_cmd from the interface that communicates how to compute values (e.g., nproc).\n",
"\n",
"- \\$opts_schema is the task option schema dict with signature {option_id : Schema}\n",
"- \\$opts are the supplied options\n",
"- \\$ropts are the 'resolved' options (\\$opts processed via \\$opts_schema)\n",
"- \\$max_nproc is the max number of processors that a task can use (this is pulled from pbsmrtpipe.options.max_nproc)\n",
"- \\$task_type is an enum of pbsmrtpipe.constants.{local_task,distributed_task}\n",
"- \\$inputs.0.my_attribute_id maps to the first input file (which must be a Report type); the attribute my_attribute_id is extracted from it and injected into the DI functions that request it."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"mtasks_to_tasks = X.test_resolver()\n",
"print_summary(mtasks_to_tasks)"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.task_id2'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd43410>,\n",
" 'input_types': ('pbsmrtpipe.file.vcf',),\n",
" 'nproc': '$max_nproc',\n",
" 'option_schemas': {'my_task_option_id': {}},\n",
" 'output_types': ('pbsmrtpipe.file.report',),\n",
" 'resource_types': ('$tmpdir', '$tmpfile', '$tmpfile'),\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id2',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n",
"\n",
"********** Task pbsmrtpipe.tasks.task_id2**********\n",
"{'cmd': 'my_exe.sh',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpv0kIYd/file-3.vcf'],\n",
" 'nproc': 1,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpv0kIYd/report-9.json'],\n",
" 'resolved_options': {'max_chunks': 6,\n",
" 'my_task_option_id': 9876,\n",
" 'my_task_options1': 54321},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpvH7wyt',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmphJA_rA.file',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpKB3rf_.file'],\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id2',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"637pt\" height=\"188pt\"\n",
" viewBox=\"0.00 0.00 637.04 188.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 184)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-184 633.037,-184 633.037,4 -4,4\"/>\n",
"<!-- to_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"87.0772,-108 21.5452,-108 21.5452,-72 87.0772,-72 87.0772,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"54.3112\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node9\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"54.3112\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"54.3112\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge6\" class=\"edge\"><title>to_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M54.3112,-71.6966C54.3112,-63.9827 54.3112,-54.7125 54.3112,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"57.8113,-46.1043 54.3112,-36.1043 50.8113,-46.1044 57.8113,-46.1043\"/>\n",
"</g>\n",
"<!-- $max_nproc -->\n",
"<g id=\"node2\" class=\"node\"><title>$max_nproc</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"54.3112\" cy=\"-162\" rx=\"54.123\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"54.3112\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$max_nproc</text>\n",
"</g>\n",
"<!-- $max_nproc&#45;&gt;to_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>$max_nproc&#45;&gt;to_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M54.3112,-143.697C54.3112,-135.983 54.3112,-126.712 54.3112,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"57.8113,-118.104 54.3112,-108.104 50.8113,-118.104 57.8113,-118.104\"/>\n",
"</g>\n",
"<!-- to_ropts -->\n",
"<g id=\"node3\" class=\"node\"><title>to_ropts</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"271.201,-108 209.421,-108 209.421,-72 271.201,-72 271.201,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"240.311\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node6\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"240.311\" cy=\"-18\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"240.311\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&gt;$ropts -->\n",
"<g id=\"edge4\" class=\"edge\"><title>to_ropts&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M240.311,-71.6966C240.311,-63.9827 240.311,-54.7125 240.311,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"243.811,-46.1043 240.311,-36.1043 236.811,-46.1044 243.811,-46.1043\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node4\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"187.311\" cy=\"-162\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"187.311\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts -->\n",
"<g id=\"edge2\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M200.141,-144.055C206.427,-135.753 214.126,-125.584 221.097,-116.378\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"223.985,-118.361 227.231,-108.275 218.404,-114.135 223.985,-118.361\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node5\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"294.311\" cy=\"-162\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"294.311\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M282.327,-145.465C275.79,-136.991 267.532,-126.286 260.07,-116.613\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"262.66,-114.241 253.781,-108.461 257.118,-118.517 262.66,-114.241\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node7\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"528.857,-108 441.765,-108 441.765,-72 528.857,-72 528.857,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"485.311\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node8\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"485.311\" cy=\"-18\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"485.311\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge5\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M485.311,-71.6966C485.311,-63.9827 485.311,-54.7125 485.311,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"488.811,-46.1043 485.311,-36.1043 481.811,-46.1044 488.811,-46.1043\"/>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task -->\n",
"<g id=\"node10\" class=\"node\"><title>pbsmrtpipe.constants.distributed_task</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"485.311\" cy=\"-162\" rx=\"143.952\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"485.311\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">pbsmrtpipe.constants.distributed_task</text>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type -->\n",
"<g id=\"edge7\" class=\"edge\"><title>pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M485.311,-143.697C485.311,-135.983 485.311,-126.712 485.311,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"488.811,-118.104 485.311,-108.104 481.811,-118.104 488.811,-118.104\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.task_id8'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd43578>,\n",
" 'input_types': ('pbsmrtpipe.file.fasta', 'pbsmrtpipe.file.report'),\n",
" 'nproc': ['$max_nproc', '$ropts', <function compute_nproc at 0x10dd432a8>],\n",
" 'option_schemas': {'my_task_option_id': {}},\n",
" 'output_types': ('pbsmrtpipe.file.vcf',),\n",
" 'resource_types': ('$tmpdir', '$tmpfile', '$tmpfile'),\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id8',\n",
" 'task_type': ['$ropts', <function compute_task_type at 0x10dd43488>]}\n",
"\n",
"********** Task pbsmrtpipe.tasks.task_id8**********\n",
"{'cmd': 'my_exe.sh',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpB2j_Bz/file-5.fasta',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpB2j_Bz/report-5.json'],\n",
" 'nproc': 8,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpB2j_Bz/file-2.vcf'],\n",
" 'resolved_options': {'max_chunks': 6,\n",
" 'my_task_option_id': 9876,\n",
" 'my_task_options1': 54321},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpejfy8P',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpYVz4L0.file',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmp_azWQk.file'],\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id8',\n",
" 'task_type': 'pbsmrtpipe.constants.local_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"296pt\" height=\"332pt\"\n",
" viewBox=\"0.00 0.00 296.36 332.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 328)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-328 292.357,-328 292.357,4 -4,4\"/>\n",
"<!-- to_nproc&#45;&#45;compute_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc&#45;&#45;compute_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"162.259,-108 -0.086518,-108 -0.086518,-72 162.259,-72 162.259,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"81.0864\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc&#45;&#45;compute_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node6\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"81.0864\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"81.0864\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&#45;compute_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge4\" class=\"edge\"><title>to_nproc&#45;&#45;compute_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M81.0864,-71.6966C81.0864,-63.9827 81.0864,-54.7125 81.0864,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"84.5865,-46.1043 81.0864,-36.1043 77.5865,-46.1044 84.5865,-46.1043\"/>\n",
"</g>\n",
"<!-- $max_nproc -->\n",
"<g id=\"node2\" class=\"node\"><title>$max_nproc</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"81.0864\" cy=\"-162\" rx=\"54.123\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"81.0864\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$max_nproc</text>\n",
"</g>\n",
"<!-- $max_nproc&#45;&gt;to_nproc&#45;&#45;compute_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>$max_nproc&#45;&gt;to_nproc&#45;&#45;compute_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M81.0864,-143.697C81.0864,-135.983 81.0864,-126.712 81.0864,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"84.5865,-118.104 81.0864,-108.104 77.5865,-118.104 84.5865,-118.104\"/>\n",
"</g>\n",
"<!-- to_ropts -->\n",
"<g id=\"node3\" class=\"node\"><title>to_ropts</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"235.976,-252 174.197,-252 174.197,-216 235.976,-216 235.976,-252\"/>\n",
"<text text-anchor=\"middle\" x=\"205.086\" y=\"-229.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node7\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"205.086\" cy=\"-162\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"205.086\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&gt;$ropts -->\n",
"<g id=\"edge5\" class=\"edge\"><title>to_ropts&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M205.086,-215.697C205.086,-207.983 205.086,-198.712 205.086,-190.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"208.587,-190.104 205.086,-180.104 201.587,-190.104 208.587,-190.104\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node4\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"152.086\" cy=\"-306\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"152.086\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts -->\n",
"<g id=\"edge2\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M164.916,-288.055C171.202,-279.753 178.902,-269.584 185.872,-260.378\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"188.761,-262.361 192.007,-252.275 183.18,-258.135 188.761,-262.361\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node5\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"259.086\" cy=\"-306\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"259.086\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M247.102,-289.465C240.565,-280.991 232.307,-270.286 224.845,-260.613\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"227.436,-258.241 218.556,-252.461 221.893,-262.517 227.436,-258.241\"/>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_nproc&#45;&#45;compute_nproc -->\n",
"<g id=\"edge7\" class=\"edge\"><title>$ropts&#45;&gt;to_nproc&#45;&#45;compute_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M183.203,-148.647C165.882,-138.868 141.239,-124.957 120.501,-113.25\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"122.017,-110.087 111.588,-108.218 118.575,-116.182 122.017,-110.087\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node8\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"267.633,-108 180.54,-108 180.54,-72 267.633,-72 267.633,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"224.086\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_task_type -->\n",
"<g id=\"edge8\" class=\"edge\"><title>$ropts&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M209.686,-144.055C211.801,-136.261 214.363,-126.822 216.736,-118.079\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"220.156,-118.843 219.397,-108.275 213.4,-117.009 220.156,-118.843\"/>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node9\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"224.086\" cy=\"-18\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"224.086\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge6\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M224.086,-71.6966C224.086,-63.9827 224.086,-54.7125 224.086,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"227.587,-46.1043 224.086,-36.1043 220.587,-46.1044 227.587,-46.1043\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.task_id3'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd436e0>,\n",
" 'input_types': ('pbsmrtpipe.file.movie_fofn', 'pbsmrtpipe.file.rgn_fofn'),\n",
" 'nproc': ['$max_nproc', '$ropts', <function my_nproc_func at 0x10dd43500>],\n",
" 'option_schemas': [{'my_task_option_id': {}},\n",
" 1,\n",
" 2,\n",
" 3,\n",
" <function my_custom_validator at 0x10dd435f0>],\n",
" 'output_types': ('pbsmrtpipe.file.fasta', 'pbsmrtpipe.file.fastq'),\n",
" 'resource_types': ('$tmpfile',),\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id3',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n",
"\n",
"********** Task pbsmrtpipe.tasks.task_id3**********\n",
"{'cmd': 'my_exe.sh',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpPLkmQm/movie-3.fofn',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpPLkmQm/rgn-4.fofn'],\n",
" 'nproc': 12,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpPLkmQm/file-8.fasta',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpPLkmQm/file-1.fastq'],\n",
" 'resolved_options': {'my_task_option_id': {}},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpzs0bui.file'],\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id3',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"726pt\" height=\"332pt\"\n",
" viewBox=\"0.00 0.00 725.94 332.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 328)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-328 721.936,-328 721.936,4 -4,4\"/>\n",
"<!-- to_nproc&#45;&#45;my_nproc_func -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc&#45;&#45;my_nproc_func</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"269.941,-108 106.478,-108 106.478,-72 269.941,-72 269.941,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"188.21\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc&#45;&#45;my_nproc_func</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node12\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"188.21\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"188.21\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&#45;my_nproc_func&#45;&gt;$nproc -->\n",
"<g id=\"edge9\" class=\"edge\"><title>to_nproc&#45;&#45;my_nproc_func&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M188.21,-71.6966C188.21,-63.9827 188.21,-54.7125 188.21,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"191.71,-46.1043 188.21,-36.1043 184.71,-46.1044 191.71,-46.1043\"/>\n",
"</g>\n",
"<!-- $max_nproc -->\n",
"<g id=\"node2\" class=\"node\"><title>$max_nproc</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"136.21\" cy=\"-162\" rx=\"54.123\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"136.21\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$max_nproc</text>\n",
"</g>\n",
"<!-- $max_nproc&#45;&gt;to_nproc&#45;&#45;my_nproc_func -->\n",
"<g id=\"edge1\" class=\"edge\"><title>$max_nproc&#45;&gt;to_nproc&#45;&#45;my_nproc_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M148.533,-144.411C154.695,-136.115 162.291,-125.891 169.181,-116.615\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"172.096,-118.561 175.249,-108.447 166.476,-114.387 172.096,-118.561\"/>\n",
"</g>\n",
"<!-- to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"node3\" class=\"node\"><title>to_ropts&#45;&#45;my_custom_validator</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"338.014,-252 144.406,-252 144.406,-216 338.014,-216 338.014,-252\"/>\n",
"<text text-anchor=\"middle\" x=\"241.21\" y=\"-229.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts&#45;&#45;my_custom_validator</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node6\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"241.21\" cy=\"-162\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"241.21\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&#45;my_custom_validator&#45;&gt;$ropts -->\n",
"<g id=\"edge4\" class=\"edge\"><title>to_ropts&#45;&#45;my_custom_validator&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M241.21,-215.697C241.21,-207.983 241.21,-198.712 241.21,-190.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"244.71,-190.104 241.21,-180.104 237.71,-190.104 244.71,-190.104\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node4\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"60.2097\" cy=\"-306\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"60.2097\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"edge2\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts&#45;&#45;my_custom_validator</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M95.538,-291.337C121.717,-281.213 157.856,-267.236 187.588,-255.738\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"188.931,-258.971 196.995,-252.099 186.406,-252.442 188.931,-258.971\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node5\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"167.21\" cy=\"-306\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"167.21\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts&#45;&gt;to_ropts&#45;&#45;my_custom_validator</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M182.543,-290.496C192.105,-281.451 204.648,-269.585 215.681,-259.149\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"218.163,-261.618 223.023,-252.204 213.353,-256.533 218.163,-261.618\"/>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_nproc&#45;&#45;my_nproc_func -->\n",
"<g id=\"edge11\" class=\"edge\"><title>$ropts&#45;&gt;to_nproc&#45;&#45;my_nproc_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M229.183,-145.116C222.758,-136.63 214.695,-125.981 207.426,-116.38\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"210.129,-114.152 201.302,-108.292 204.548,-118.377 210.129,-114.152\"/>\n",
"</g>\n",
"<!-- 1 -->\n",
"<g id=\"node7\" class=\"node\"><title>1</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"241.21\" cy=\"-306\" rx=\"27\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"241.21\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">1</text>\n",
"</g>\n",
"<!-- 1&#45;&gt;to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"edge5\" class=\"edge\"><title>1&#45;&gt;to_ropts&#45;&#45;my_custom_validator</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M241.21,-287.697C241.21,-279.983 241.21,-270.712 241.21,-262.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"244.71,-262.104 241.21,-252.104 237.71,-262.104 244.71,-262.104\"/>\n",
"</g>\n",
"<!-- 3 -->\n",
"<g id=\"node8\" class=\"node\"><title>3</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"313.21\" cy=\"-306\" rx=\"27\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"313.21\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">3</text>\n",
"</g>\n",
"<!-- 3&#45;&gt;to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"edge6\" class=\"edge\"><title>3&#45;&gt;to_ropts&#45;&#45;my_custom_validator</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M298.639,-290.834C289.291,-281.746 276.912,-269.71 266.041,-259.141\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"268.421,-256.574 258.811,-252.113 263.542,-261.593 268.421,-256.574\"/>\n",
"</g>\n",
"<!-- 2 -->\n",
"<g id=\"node9\" class=\"node\"><title>2</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"385.21\" cy=\"-306\" rx=\"27\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"385.21\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">2</text>\n",
"</g>\n",
"<!-- 2&#45;&gt;to_ropts&#45;&#45;my_custom_validator -->\n",
"<g id=\"edge7\" class=\"edge\"><title>2&#45;&gt;to_ropts&#45;&#45;my_custom_validator</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M363.962,-294.671C343.407,-284.679 311.624,-269.229 285.549,-256.554\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"286.756,-253.249 276.232,-252.025 283.695,-259.544 286.756,-253.249\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node10\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"617.756,-252 530.664,-252 530.664,-216 617.756,-216 617.756,-252\"/>\n",
"<text text-anchor=\"middle\" x=\"574.21\" y=\"-229.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node11\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"574.21\" cy=\"-162\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"574.21\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge8\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M574.21,-215.697C574.21,-207.983 574.21,-198.712 574.21,-190.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"577.71,-190.104 574.21,-180.104 570.71,-190.104 577.71,-190.104\"/>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task -->\n",
"<g id=\"node13\" class=\"node\"><title>pbsmrtpipe.constants.distributed_task</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"574.21\" cy=\"-306\" rx=\"143.952\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"574.21\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">pbsmrtpipe.constants.distributed_task</text>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type -->\n",
"<g id=\"edge10\" class=\"edge\"><title>pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M574.21,-287.697C574.21,-279.983 574.21,-270.712 574.21,-262.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"577.71,-262.104 574.21,-252.104 570.71,-262.104 577.71,-262.104\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.simple_task_01'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd43320>,\n",
" 'input_types': ('pbsmrtpipe.file.fasta',),\n",
" 'nproc': 1,\n",
" 'option_schemas': {},\n",
" 'output_types': ('pbsmrtpipe.file.report',),\n",
" 'resource_types': (),\n",
" 'task_id': 'pbsmrtpipe.tasks.simple_task_01',\n",
" 'task_type': 'pbsmrtpipe.constants.local_task'}\n",
"\n",
"********** Task pbsmrtpipe.tasks.simple_task_01**********\n",
"{'cmd': 'my-simple-cmd.sh --nproc 1 /var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpX6PQ_q/file-6.fasta /var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpX6PQ_q/report-6.json',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpX6PQ_q/file-6.fasta'],\n",
" 'nproc': 1,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpX6PQ_q/report-6.json'],\n",
" 'resolved_options': {'max_chunks': 6,\n",
" 'my_task_option_id': 9876,\n",
" 'my_task_options1': 54321},\n",
" 'resources': (),\n",
" 'task_id': 'pbsmrtpipe.tasks.simple_task_01',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"548pt\" height=\"188pt\"\n",
" viewBox=\"0.00 0.00 547.93 188.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 184)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-184 543.934,-184 543.934,4 -4,4\"/>\n",
"<!-- to_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"67.4299,-108 1.89781,-108 1.89781,-72 67.4299,-72 67.4299,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node10\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"34.6638\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge7\" class=\"edge\"><title>to_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M34.6638,-71.6966C34.6638,-63.9827 34.6638,-54.7125 34.6638,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"38.1639,-46.1043 34.6638,-36.1043 31.1639,-46.1044 38.1639,-46.1043\"/>\n",
"</g>\n",
"<!-- 1 -->\n",
"<g id=\"node2\" class=\"node\"><title>1</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"34.6638\" cy=\"-162\" rx=\"27\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">1</text>\n",
"</g>\n",
"<!-- 1&#45;&gt;to_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>1&#45;&gt;to_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M34.6638,-143.697C34.6638,-135.983 34.6638,-126.712 34.6638,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"38.1639,-118.104 34.6638,-108.104 31.1639,-118.104 38.1639,-118.104\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node3\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"246.21,-108 159.118,-108 159.118,-72 246.21,-72 246.21,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node9\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"202.664\" cy=\"-18\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge6\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M202.664,-71.6966C202.664,-63.9827 202.664,-54.7125 202.664,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"206.164,-46.1043 202.664,-36.1043 199.164,-46.1044 206.164,-46.1043\"/>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.local_task -->\n",
"<g id=\"node4\" class=\"node\"><title>pbsmrtpipe.constants.local_task</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"202.664\" cy=\"-162\" rx=\"123.171\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">pbsmrtpipe.constants.local_task</text>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.local_task&#45;&gt;to_task_type -->\n",
"<g id=\"edge2\" class=\"edge\"><title>pbsmrtpipe.constants.local_task&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M202.664,-143.697C202.664,-135.983 202.664,-126.712 202.664,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"206.164,-118.104 202.664,-108.104 199.164,-118.104 206.164,-118.104\"/>\n",
"</g>\n",
"<!-- to_ropts -->\n",
"<g id=\"node5\" class=\"node\"><title>to_ropts</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"487.554,-108 425.774,-108 425.774,-72 487.554,-72 487.554,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"456.664\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node8\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"456.664\" cy=\"-18\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"456.664\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&gt;$ropts -->\n",
"<g id=\"edge5\" class=\"edge\"><title>to_ropts&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M456.664,-71.6966C456.664,-63.9827 456.664,-54.7125 456.664,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"460.164,-46.1043 456.664,-36.1043 453.164,-46.1044 460.164,-46.1043\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node6\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"403.664\" cy=\"-162\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"403.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M416.494,-144.055C422.78,-135.753 430.479,-125.584 437.449,-116.378\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"440.338,-118.361 443.584,-108.275 434.757,-114.135 440.338,-118.361\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node7\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"510.664\" cy=\"-162\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"510.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts -->\n",
"<g id=\"edge4\" class=\"edge\"><title>$opts&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M498.679,-145.465C492.143,-136.991 483.884,-126.286 476.423,-116.613\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"479.013,-114.241 470.134,-108.461 473.47,-118.517 479.013,-114.241\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.my_task_id'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd43398>,\n",
" 'input_types': ('pbsmrtpipe.file.fasta', 'pbsmrtpipe.file.rgn_fofn'),\n",
" 'nproc': 1,\n",
" 'option_schemas': {'my_task_option_id': {}},\n",
" 'output_types': ('pbsmrtpipe.file.alignment_cmp_h5',),\n",
" 'resource_types': ('$tmpdir', '$logfile'),\n",
" 'task_id': 'pbsmrtpipe.tasks.my_task_id',\n",
" 'task_type': 'pbsmrtpipe.constants.local_task'}\n",
"\n",
"********** Task pbsmrtpipe.tasks.my_task_id**********\n",
"{'cmd': 'my_exe.sh --nproc=1 --tmp=/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpwo7owC --log=/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/tmpUPm0SX.log --my-option=9876 fasta:/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/file-9.fasta --region=/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/rgn-5.fofn --output-report=/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/alignment-3.cmp.h5',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/file-9.fasta',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/rgn-5.fofn'],\n",
" 'nproc': 1,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/alignment-3.cmp.h5'],\n",
" 'resolved_options': {'max_chunks': 6,\n",
" 'my_task_option_id': 9876,\n",
" 'my_task_options1': 54321},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpwo7owC',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpnxX9Je/tmpUPm0SX.log'],\n",
" 'task_id': 'pbsmrtpipe.tasks.my_task_id',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"548pt\" height=\"188pt\"\n",
" viewBox=\"0.00 0.00 547.93 188.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 184)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-184 543.934,-184 543.934,4 -4,4\"/>\n",
"<!-- to_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"67.4299,-108 1.89781,-108 1.89781,-72 67.4299,-72 67.4299,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node10\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"34.6638\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge7\" class=\"edge\"><title>to_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M34.6638,-71.6966C34.6638,-63.9827 34.6638,-54.7125 34.6638,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"38.1639,-46.1043 34.6638,-36.1043 31.1639,-46.1044 38.1639,-46.1043\"/>\n",
"</g>\n",
"<!-- 1 -->\n",
"<g id=\"node2\" class=\"node\"><title>1</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"34.6638\" cy=\"-162\" rx=\"27\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"34.6638\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">1</text>\n",
"</g>\n",
"<!-- 1&#45;&gt;to_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>1&#45;&gt;to_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M34.6638,-143.697C34.6638,-135.983 34.6638,-126.712 34.6638,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"38.1639,-118.104 34.6638,-108.104 31.1639,-118.104 38.1639,-118.104\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node3\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"246.21,-108 159.118,-108 159.118,-72 246.21,-72 246.21,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node9\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"202.664\" cy=\"-18\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge6\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M202.664,-71.6966C202.664,-63.9827 202.664,-54.7125 202.664,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"206.164,-46.1043 202.664,-36.1043 199.164,-46.1044 206.164,-46.1043\"/>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.local_task -->\n",
"<g id=\"node4\" class=\"node\"><title>pbsmrtpipe.constants.local_task</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"202.664\" cy=\"-162\" rx=\"123.171\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"202.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">pbsmrtpipe.constants.local_task</text>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.local_task&#45;&gt;to_task_type -->\n",
"<g id=\"edge2\" class=\"edge\"><title>pbsmrtpipe.constants.local_task&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M202.664,-143.697C202.664,-135.983 202.664,-126.712 202.664,-118.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"206.164,-118.104 202.664,-108.104 199.164,-118.104 206.164,-118.104\"/>\n",
"</g>\n",
"<!-- to_ropts -->\n",
"<g id=\"node5\" class=\"node\"><title>to_ropts</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"487.554,-108 425.774,-108 425.774,-72 487.554,-72 487.554,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"456.664\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node8\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"456.664\" cy=\"-18\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"456.664\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&gt;$ropts -->\n",
"<g id=\"edge5\" class=\"edge\"><title>to_ropts&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M456.664,-71.6966C456.664,-63.9827 456.664,-54.7125 456.664,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"460.164,-46.1043 456.664,-36.1043 453.164,-46.1044 460.164,-46.1043\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node6\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"403.664\" cy=\"-162\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"403.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M416.494,-144.055C422.78,-135.753 430.479,-125.584 437.449,-116.378\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"440.338,-118.361 443.584,-108.275 434.757,-114.135 440.338,-118.361\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node7\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"510.664\" cy=\"-162\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"510.664\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts -->\n",
"<g id=\"edge4\" class=\"edge\"><title>$opts&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M498.679,-145.465C492.143,-136.991 483.884,-126.286 476.423,-116.613\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"479.013,-114.241 470.134,-108.461 473.47,-118.517 479.013,-114.241\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.task_id5'**********\n",
"{'chunk_di': ['$inputs.0.attr_id',\n",
" '$ropts',\n",
" '$nproc',\n",
" <function nchunks_func at 0x10dd43848>],\n",
" 'cmd_func': <function to_cmd at 0x10dd439b0>,\n",
" 'input_types': ('pbsmrtpipe.file.report', 'pbsmrtpipe.file.movie_fofn'),\n",
" 'nproc': ['$max_nproc', '$ropts', <function _my_nproc at 0x10dd437d0>],\n",
" 'option_schemas': {'my_task_option_id': {}},\n",
" 'output_types': ('pbsmrtpipe.file.rgn_fofn',),\n",
" 'resource_types': ('$outputdir',),\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id5',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n",
"\n",
"********** Task pbsmrtpipe.tasks.task_id5**********\n",
"{'cmd': 'my_exe.sh',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpqRX8cG/report-8.json',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpqRX8cG/movie-2.fofn'],\n",
" 'nchunks': 3,\n",
" 'nproc': 5,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpqRX8cG/rgn-3.fofn'],\n",
" 'resolved_options': {'max_chunks': 6,\n",
" 'my_task_option_id': 9876,\n",
" 'my_task_options1': 54321},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpqRX8cG'],\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id5',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"563pt\" height=\"476pt\"\n",
" viewBox=\"0.00 0.00 563.15 476.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 472)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-472 559.154,-472 559.154,4 -4,4\"/>\n",
"<!-- to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc&#45;&#45;_my_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"138.783,-252 0.0721905,-252 0.0721905,-216 138.783,-216 138.783,-252\"/>\n",
"<text text-anchor=\"middle\" x=\"69.4277\" y=\"-229.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc&#45;&#45;_my_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node12\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"86.4277\" cy=\"-162\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"86.4277\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&#45;_my_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge9\" class=\"edge\"><title>to_nproc&#45;&#45;_my_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M73.63,-215.697C75.5239,-207.898 77.8042,-198.509 79.9121,-189.829\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"83.3149,-190.648 82.2738,-180.104 76.5126,-188.996 83.3149,-190.648\"/>\n",
"</g>\n",
"<!-- $max_nproc -->\n",
"<g id=\"node2\" class=\"node\"><title>$max_nproc</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"61.4277\" cy=\"-306\" rx=\"54.123\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"61.4277\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$max_nproc</text>\n",
"</g>\n",
"<!-- $max_nproc&#45;&gt;to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>$max_nproc&#45;&gt;to_nproc&#45;&#45;_my_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M63.4053,-287.697C64.2869,-279.983 65.3463,-270.712 66.3292,-262.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"69.8148,-262.437 67.473,-252.104 62.86,-261.642 69.8148,-262.437\"/>\n",
"</g>\n",
"<!-- to_ropts -->\n",
"<g id=\"node3\" class=\"node\"><title>to_ropts</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"197.317,-396 135.538,-396 135.538,-360 197.317,-360 197.317,-396\"/>\n",
"<text text-anchor=\"middle\" x=\"166.428\" y=\"-373.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node8\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"166.428\" cy=\"-306\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"166.428\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&gt;$ropts -->\n",
"<g id=\"edge5\" class=\"edge\"><title>to_ropts&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M166.428,-359.697C166.428,-351.983 166.428,-342.712 166.428,-334.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"169.928,-334.104 166.428,-324.104 162.928,-334.104 169.928,-334.104\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node4\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"113.428\" cy=\"-450\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"113.428\" y=\"-445.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts -->\n",
"<g id=\"edge2\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M126.258,-432.055C132.544,-423.753 140.243,-413.584 147.213,-404.378\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"150.102,-406.361 153.348,-396.275 144.521,-402.135 150.102,-406.361\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node5\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"220.428\" cy=\"-450\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"220.428\" y=\"-445.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts&#45;&gt;to_ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M208.443,-433.465C201.907,-424.991 193.648,-414.286 186.186,-404.613\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"188.777,-402.241 179.898,-396.461 183.234,-406.517 188.777,-402.241\"/>\n",
"</g>\n",
"<!-- to_nchunks&#45;&#45;nchunks_func -->\n",
"<g id=\"node6\" class=\"node\"><title>to_nchunks&#45;&#45;nchunks_func</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"250.842,-108 82.0132,-108 82.0132,-72 250.842,-72 250.842,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"166.428\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nchunks&#45;&#45;nchunks_func</text>\n",
"</g>\n",
"<!-- $nchunks -->\n",
"<g id=\"node7\" class=\"node\"><title>$nchunks</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"166.428\" cy=\"-18\" rx=\"44.0078\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"166.428\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nchunks</text>\n",
"</g>\n",
"<!-- to_nchunks&#45;&#45;nchunks_func&#45;&gt;$nchunks -->\n",
"<g id=\"edge4\" class=\"edge\"><title>to_nchunks&#45;&#45;nchunks_func&#45;&gt;$nchunks</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M166.428,-71.6966C166.428,-63.9827 166.428,-54.7125 166.428,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"169.928,-46.1043 166.428,-36.1043 162.928,-46.1044 169.928,-46.1043\"/>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"edge12\" class=\"edge\"><title>$ropts&#45;&gt;to_nproc&#45;&#45;_my_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M147.264,-291.17C134.127,-281.69 116.39,-268.89 101.165,-257.904\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"103.168,-255.033 93.0113,-252.019 99.0721,-260.709 103.168,-255.033\"/>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_nchunks&#45;&#45;nchunks_func -->\n",
"<g id=\"edge11\" class=\"edge\"><title>$ropts&#45;&gt;to_nchunks&#45;&#45;nchunks_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M166.428,-287.849C166.428,-250.832 166.428,-163.181 166.428,-118.386\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"169.928,-118.232 166.428,-108.232 162.928,-118.232 169.928,-118.232\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node9\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"454.974,-396 367.882,-396 367.882,-360 454.974,-360 454.974,-396\"/>\n",
"<text text-anchor=\"middle\" x=\"411.428\" y=\"-373.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node10\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"411.428\" cy=\"-306\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"411.428\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge6\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M411.428,-359.697C411.428,-351.983 411.428,-342.712 411.428,-334.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"414.928,-334.104 411.428,-324.104 407.928,-334.104 414.928,-334.104\"/>\n",
"</g>\n",
"<!-- $inputs.0.attr_id -->\n",
"<g id=\"node11\" class=\"node\"><title>$inputs.0.attr_id</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"262.428\" cy=\"-162\" rx=\"68.1547\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"262.428\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$inputs.0.attr_id</text>\n",
"</g>\n",
"<!-- $inputs.0.attr_id&#45;&gt;to_nchunks&#45;&#45;nchunks_func -->\n",
"<g id=\"edge7\" class=\"edge\"><title>$inputs.0.attr_id&#45;&gt;to_nchunks&#45;&#45;nchunks_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M240.162,-144.765C227.686,-135.667 211.916,-124.168 198.165,-114.142\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"200.054,-111.188 189.912,-108.124 195.93,-116.844 200.054,-111.188\"/>\n",
"</g>\n",
"<!-- $nproc&#45;&gt;to_nchunks&#45;&#45;nchunks_func -->\n",
"<g id=\"edge8\" class=\"edge\"><title>$nproc&#45;&gt;to_nchunks&#45;&#45;nchunks_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M103.394,-146.155C113.853,-137.003 127.5,-125.061 139.42,-114.631\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"141.732,-117.259 146.953,-108.04 137.123,-111.991 141.732,-117.259\"/>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task -->\n",
"<g id=\"node13\" class=\"node\"><title>pbsmrtpipe.constants.distributed_task</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"411.428\" cy=\"-450\" rx=\"143.952\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"411.428\" y=\"-445.8\" font-family=\"Times,serif\" font-size=\"14.00\">pbsmrtpipe.constants.distributed_task</text>\n",
"</g>\n",
"<!-- pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type -->\n",
"<g id=\"edge10\" class=\"edge\"><title>pbsmrtpipe.constants.distributed_task&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M411.428,-431.697C411.428,-423.983 411.428,-414.712 411.428,-406.112\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"414.928,-406.104 411.428,-396.104 407.928,-406.104 414.928,-406.104\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
},
{
"output_type": "stream",
"stream": "stdout",
"text": [
"********** MetaTask task_id 'pbsmrtpipe.tasks.task_id4'**********\n",
"{'cmd_func': <function to_cmd at 0x10dd438c0>,\n",
" 'input_types': ('pbsmrtpipe.file.report', 'pbsmrtpipe.file.fasta'),\n",
" 'nproc': ['$max_nproc', '$ropts', <function _my_nproc at 0x10dd437d0>],\n",
" 'option_schemas': [{'my_option_01': 3.14159, 'my_task_option_id': {}},\n",
" '$inputs.0.attr_id',\n",
" <function dyn_opts_func at 0x10dd43668>],\n",
" 'output_types': ('pbsmrtpipe.file.alignment_cmp_h5',),\n",
" 'resource_types': ('$tmpfile',),\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id4',\n",
" 'task_type': ['$ropts',\n",
" '$inputs.0.attr_id',\n",
" <function compute_task_type at 0x10dd43758>]}\n",
"\n",
"********** Task pbsmrtpipe.tasks.task_id4**********\n",
"{'cmd': 'my_exe.sh',\n",
" 'input_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpW7RGkd/report-7.json',\n",
" '/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpW7RGkd/file-7.fasta'],\n",
" 'nproc': 5,\n",
" 'output_files': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpW7RGkd/alignment-2.cmp.h5'],\n",
" 'resolved_options': {'my_option_01': 3.14159, 'my_task_option_id': {}},\n",
" 'resources': ['/var/folders/_c/8_r02r4s7vl3kx1pt92sbcmr0000gn/T/tmpsHhKQo.file'],\n",
" 'task_id': 'pbsmrtpipe.tasks.task_id4',\n",
" 'task_type': 'pbsmrtpipe.constants.distributed_task'}\n"
]
},
{
"metadata": {},
"output_type": "display_data",
"svg": [
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
"<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
" \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
"<!-- Generated by graphviz version 2.38.0 (20140413.2041)\n",
" -->\n",
"<!-- Title: %3 Pages: 1 -->\n",
"<svg width=\"358pt\" height=\"332pt\"\n",
" viewBox=\"0.00 0.00 358.35 332.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
"<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 328)\">\n",
"<title>%3</title>\n",
"<polygon fill=\"white\" stroke=\"none\" points=\"-4,4 -4,-328 354.348,-328 354.348,4 -4,4\"/>\n",
"<!-- to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"node1\" class=\"node\"><title>to_nproc&#45;&#45;_my_nproc</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"178.626,-108 39.915,-108 39.915,-72 178.626,-72 178.626,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"109.271\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_nproc&#45;&#45;_my_nproc</text>\n",
"</g>\n",
"<!-- $nproc -->\n",
"<g id=\"node9\" class=\"node\"><title>$nproc</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"109.271\" cy=\"-18\" rx=\"34.8285\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"109.271\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$nproc</text>\n",
"</g>\n",
"<!-- to_nproc&#45;&#45;_my_nproc&#45;&gt;$nproc -->\n",
"<g id=\"edge7\" class=\"edge\"><title>to_nproc&#45;&#45;_my_nproc&#45;&gt;$nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M109.271,-71.6966C109.271,-63.9827 109.271,-54.7125 109.271,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"112.771,-46.1043 109.271,-36.1043 105.771,-46.1044 112.771,-46.1043\"/>\n",
"</g>\n",
"<!-- $max_nproc -->\n",
"<g id=\"node2\" class=\"node\"><title>$max_nproc</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"79.2705\" cy=\"-162\" rx=\"54.123\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"79.2705\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$max_nproc</text>\n",
"</g>\n",
"<!-- $max_nproc&#45;&gt;to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"edge1\" class=\"edge\"><title>$max_nproc&#45;&gt;to_nproc&#45;&#45;_my_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M86.5327,-144.055C89.9456,-136.091 94.0945,-126.411 97.9092,-117.51\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"101.145,-118.845 101.867,-108.275 94.7106,-116.088 101.145,-118.845\"/>\n",
"</g>\n",
"<!-- to_ropts&#45;&#45;dyn_opts_func -->\n",
"<g id=\"node3\" class=\"node\"><title>to_ropts&#45;&#45;dyn_opts_func</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"233.699,-252 78.8422,-252 78.8422,-216 233.699,-216 233.699,-252\"/>\n",
"<text text-anchor=\"middle\" x=\"156.271\" y=\"-229.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_ropts&#45;&#45;dyn_opts_func</text>\n",
"</g>\n",
"<!-- $ropts -->\n",
"<g id=\"node4\" class=\"node\"><title>$ropts</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"184.271\" cy=\"-162\" rx=\"32.4324\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"184.271\" y=\"-157.8\" font-family=\"Times,serif\" font-size=\"14.00\">$ropts</text>\n",
"</g>\n",
"<!-- to_ropts&#45;&#45;dyn_opts_func&#45;&gt;$ropts -->\n",
"<g id=\"edge2\" class=\"edge\"><title>to_ropts&#45;&#45;dyn_opts_func&#45;&gt;$ropts</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M163.192,-215.697C166.392,-207.696 170.263,-198.02 173.809,-189.155\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"177.157,-190.207 177.622,-179.622 170.658,-187.607 177.157,-190.207\"/>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_nproc&#45;&#45;_my_nproc -->\n",
"<g id=\"edge10\" class=\"edge\"><title>$ropts&#45;&gt;to_nproc&#45;&#45;_my_nproc</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M168.365,-146.155C158.653,-137.09 146.008,-125.289 134.91,-114.93\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"137.226,-112.305 127.528,-108.04 132.45,-117.422 137.226,-112.305\"/>\n",
"</g>\n",
"<!-- to_task_type -->\n",
"<g id=\"node6\" class=\"node\"><title>to_task_type</title>\n",
"<polygon fill=\"none\" stroke=\"black\" points=\"298.817,-108 211.724,-108 211.724,-72 298.817,-72 298.817,-108\"/>\n",
"<text text-anchor=\"middle\" x=\"255.271\" y=\"-85.8\" font-family=\"Times,serif\" font-size=\"14.00\">to_task_type</text>\n",
"</g>\n",
"<!-- $ropts&#45;&gt;to_task_type -->\n",
"<g id=\"edge9\" class=\"edge\"><title>$ropts&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M199.676,-145.811C208.734,-136.881 220.395,-125.384 230.696,-115.229\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"233.235,-117.64 237.899,-108.127 228.321,-112.655 233.235,-117.64\"/>\n",
"</g>\n",
"<!-- $opts -->\n",
"<g id=\"node5\" class=\"node\"><title>$opts</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"29.2705\" cy=\"-306\" rx=\"29.0429\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"29.2705\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts</text>\n",
"</g>\n",
"<!-- $opts&#45;&gt;to_ropts&#45;&#45;dyn_opts_func -->\n",
"<g id=\"edge3\" class=\"edge\"><title>$opts&#45;&gt;to_ropts&#45;&#45;dyn_opts_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M50.242,-293.441C68.1991,-283.543 94.5537,-269.017 116.495,-256.924\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"118.211,-259.974 125.279,-252.082 114.832,-253.844 118.211,-259.974\"/>\n",
"</g>\n",
"<!-- $task_type -->\n",
"<g id=\"node7\" class=\"node\"><title>$task_type</title>\n",
"<ellipse fill=\"aquamarine\" stroke=\"black\" cx=\"255.271\" cy=\"-18\" rx=\"48.3427\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"255.271\" y=\"-13.8\" font-family=\"Times,serif\" font-size=\"14.00\">$task_type</text>\n",
"</g>\n",
"<!-- to_task_type&#45;&gt;$task_type -->\n",
"<g id=\"edge4\" class=\"edge\"><title>to_task_type&#45;&gt;$task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M255.271,-71.6966C255.271,-63.9827 255.271,-54.7125 255.271,-46.1124\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"258.771,-46.1043 255.271,-36.1043 251.771,-46.1044 258.771,-46.1043\"/>\n",
"</g>\n",
"<!-- $inputs.0.attr_id -->\n",
"<g id=\"node8\" class=\"node\"><title>$inputs.0.attr_id</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"282.271\" cy=\"-306\" rx=\"68.1547\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"282.271\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$inputs.0.attr_id</text>\n",
"</g>\n",
"<!-- $inputs.0.attr_id&#45;&gt;to_ropts&#45;&#45;dyn_opts_func -->\n",
"<g id=\"edge5\" class=\"edge\"><title>$inputs.0.attr_id&#45;&gt;to_ropts&#45;&#45;dyn_opts_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M254.307,-289.465C237.235,-279.98 215.131,-267.7 196.233,-257.201\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"197.691,-254.008 187.25,-252.211 194.292,-260.127 197.691,-254.008\"/>\n",
"</g>\n",
"<!-- $inputs.0.attr_id&#45;&gt;to_task_type -->\n",
"<g id=\"edge6\" class=\"edge\"><title>$inputs.0.attr_id&#45;&gt;to_task_type</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M280.107,-287.849C275.436,-250.832 264.377,-163.181 258.726,-118.386\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"262.169,-117.716 257.445,-108.232 255.224,-118.592 262.169,-117.716\"/>\n",
"</g>\n",
"<!-- $opts_schema -->\n",
"<g id=\"node10\" class=\"node\"><title>$opts_schema</title>\n",
"<ellipse fill=\"none\" stroke=\"black\" cx=\"136.271\" cy=\"-306\" rx=\"60.4202\" ry=\"18\"/>\n",
"<text text-anchor=\"middle\" x=\"136.271\" y=\"-301.8\" font-family=\"Times,serif\" font-size=\"14.00\">$opts_schema</text>\n",
"</g>\n",
"<!-- $opts_schema&#45;&gt;to_ropts&#45;&#45;dyn_opts_func -->\n",
"<g id=\"edge8\" class=\"edge\"><title>$opts_schema&#45;&gt;to_ropts&#45;&#45;dyn_opts_func</title>\n",
"<path fill=\"none\" stroke=\"black\" d=\"M141.112,-288.055C143.339,-280.261 146.036,-270.822 148.534,-262.079\"/>\n",
"<polygon fill=\"black\" stroke=\"black\" points=\"151.953,-262.852 151.335,-252.275 145.222,-260.929 151.953,-262.852\"/>\n",
"</g>\n",
"</g>\n",
"</svg>\n"
]
}
],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}
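
In the notebook output above, several MetaTask fields are angularjs-style injection lists: dependency ids followed by a function, e.g. `'nproc': ['$max_nproc', '$ropts', <function _my_nproc>]`. The Task dump shows the resolved value (`'nproc': 5`), and the graphs show the dependency edges (`$opts` and `$opts_schema` feed `$ropts`, which in turn feeds `$nproc` and `$nchunks`). The sketch below is one minimal way such lists could be resolved; it is not pbsmrtpipe's implementation. `resolve`, `is_injectable`, the provider functions and all values are hypothetical, and the options schema is simplified to a plain dict of defaults.

```python
def is_injectable(spec):
    """True for angularjs-style specs: dependency ids followed by a callable."""
    return isinstance(spec, list) and len(spec) > 0 and callable(spec[-1])


def resolve(dep_id, known, providers, _resolving=None):
    """Return the value of dep_id, resolving its dependencies first.

    known     -- dict of ids that already have concrete values; resolved
                 values are cached here, so each provider runs at most once.
    providers -- dict mapping an id to either an injection list
                 ['$dep1', '$dep2', func] or a plain constant.
    """
    if dep_id in known:
        return known[dep_id]
    if dep_id not in providers:
        raise KeyError("No value or provider for {d}".format(d=dep_id))

    resolving = set() if _resolving is None else _resolving
    if dep_id in resolving:
        raise ValueError("Cyclic dependency on {d}".format(d=dep_id))
    resolving.add(dep_id)

    spec = providers[dep_id]
    if is_injectable(spec):
        dep_ids, func = spec[:-1], spec[-1]
        # Depth-first: every dependency is resolved before func is called
        args = [resolve(d, known, providers, resolving) for d in dep_ids]
        value = func(*args)
    else:
        value = spec  # plain constant, e.g. a task type id

    resolving.discard(dep_id)
    known[dep_id] = value
    return value


# Hypothetical providers mirroring the graph in the notebook output
def to_ropts(opts_schema, opts):
    ropts = dict(opts_schema)  # start from the defaults
    ropts.update(opts)         # user-supplied values win
    return ropts


def my_nproc(max_nproc, ropts):
    return min(max_nproc, ropts["nproc"])


def nchunks_func(ropts, attr_id, nproc):
    return min(attr_id, nproc * ropts["chunks_per_proc"])


known = {
    "$max_nproc": 16,
    "$opts": {"nproc": 5},
    "$opts_schema": {"nproc": 1, "chunks_per_proc": 2},
    "$inputs.0.attr_id": 24,
}
providers = {
    "$ropts": ["$opts_schema", "$opts", to_ropts],
    "$nproc": ["$max_nproc", "$ropts", my_nproc],
    "$nchunks": ["$ropts", "$inputs.0.attr_id", "$nproc", nchunks_func],
    "$task_type": "pbsmrtpipe.constants.distributed_task",
}

print(resolve("$nchunks", known, providers))  # 10
print(known["$nproc"])                        # 5
```

Caching results in `known` gives the same values as evaluating the dependency graph in topological order, and the `_resolving` set turns a cycle into a `ValueError` rather than unbounded recursion.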
import os
import sys
import time
import logging
import functools
import operator
import itertools
import tempfile
import socket
import subprocess

import networkx as nx
from IPython.display import display_svg

log = logging.getLogger(__name__)

_SUPPORTED_IMAGE_TYPES = 'png svg eps'.split()
DOT_EXE = 'dot'


def which(exe_str):
    """Walk the directories in PATH looking for exe_str.

    If the executable is found, its full path is returned. Otherwise None
    is returned.
    """
    paths = os.environ.get('PATH', None)
    state = None
    if paths is None:
        msg = "PATH env var is not defined."
        log.error(msg)
        return state

    for path in paths.split(os.pathsep):
        exe_path = os.path.join(path, exe_str)
        if os.path.exists(exe_path):
            state = exe_path
            break

    return state


def backticks(cmd, merge_stderr=True):
    """Run cmd through the shell.

    Returns (rcode, output, error_message, run_time). On success, output is
    the list of stdout lines and error_message is ''. On failure, output is
    [] and error_message holds the joined stdout lines. If merge_stderr is
    False, stderr is captured separately and discarded.
    """
    if merge_stderr:
        _stderr = subprocess.STDOUT
    else:
        _stderr = subprocess.PIPE

    started_at = time.time()
    # Setting shell=True is really bad form; however, many of the tasks
    # generate general shell code (which needs to be removed).
    p = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=_stderr,
                         close_fds=True)

    nodeId = socket.getfqdn()
    log.debug("Running on {s} with cmd '{c}'".format(s=nodeId, c=cmd))

    # communicate() drains both pipes and waits for the process to
    # terminate, so an unread stderr pipe cannot deadlock the child
    stdout_data, _ = p.communicate()
    out = stdout_data.splitlines()

    run_time = time.time() - started_at
    errCode = p.returncode

    if errCode == 0:
        errorMessage = ''
        output = out
        log.debug("Successful output (Return code = 0) in {s:.2f} sec "
                  "({m:.2f} min) of {c}".format(c=cmd, s=run_time,
                                                 m=run_time / 60.0))
    else:
        errorMessage = os.linesep.join(out)
        output = []
        msg = "Return code {r} {e} of cmd {c}".format(r=errCode, e=errorMessage, c=cmd)
        log.error(msg)
        sys.stderr.write(msg + os.linesep)

    return errCode, output, errorMessage, run_time


def _dot_to_image(image_type, dot_file, image_file):
    assert image_type.lower() in _SUPPORTED_IMAGE_TYPES
    assert os.path.exists(dot_file)
    assert which(DOT_EXE) is not None

    cmd_str = "{e} -T{t} {i} -o {o}"
    d = dict(e=DOT_EXE, t=image_type, i=dot_file, o=image_file)
    cmd = cmd_str.format(**d)
    rcode, stdout, stderr, run_time = backticks(cmd)
    return rcode == 0


dot_file_to_png = functools.partial(_dot_to_image, 'png')
dot_file_to_svg = functools.partial(_dot_to_image, 'svg')
dot_file_to_eps = functools.partial(_dot_to_image, 'eps')


def display_dot(dot_file):
    def _to_temp(suffix):
        t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        t.close()
        return t.name

    svg_file = _to_temp('.svg')
    dot_file_to_svg(dot_file, svg_file)
    with open(svg_file, 'r') as f:
        s = f.read()
    display_svg(s, raw=True)


def display_networkx_graph(g):
    def _to_temp(suffix):
        t = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
        t.close()
        return t.name

    dot_file = _to_temp('.dot')
    nx.write_dot(g, dot_file)
    display_dot(dot_file)


def pretty_print_task_klass_options(task_klasses):
    outs = []
    all_options = []
    option_klasses = []
    for t in task_klasses:
        for name, o in t._OPTIONS.iteritems():
            all_options.append(o)
            option_klasses.append(o.__class__.__name__)

    max_option_name = max(len(o.name) for o in all_options)
    max_klass_name = max(len(k) for k in option_klasses)
    pad = 2

    # groupby only merges *consecutive* items, so sort by module first
    by_module = operator.attrgetter('__module__')
    for pb_name, ts in itertools.groupby(sorted(task_klasses, key=by_module),
                                         key=by_module):
        # A PB module can have the same options in different Tasks
        options = {}
        for task_klass in ts:
            for name, option in task_klass._OPTIONS.iteritems():
                options[name] = option

        header = "PB module {m} noptions: {n}".format(m=pb_name, n=len(options))
        outs.append("-" * len(header))
        outs.append(header)
        outs.append("-" * len(header))
        for o in options.values():
            outs.append(" ".join([o.name.ljust(max_option_name + pad),
                                  o.__class__.__name__.ljust(max_klass_name + pad),
                                  str(o.optional).ljust(5),
                                  str(o.default_value)]))
        outs.append("")

    return "\n".join(outs)
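
`backticks` runs commands with `shell=True`, which its own comment calls bad form, and `which` re-implements the PATH lookup. A minimal sketch of the same two steps for Python 3.5+ (the module above is Python 2), assuming commands are passed as argument lists so no shell is involved. `run_cmd`, `dot_to_image` and `SUPPORTED_IMAGE_TYPES` are hypothetical names, not part of this gist or of pbsmrtpipe; `-T` and `-o` are the same Graphviz `dot` flags the original command string uses.

```python
import shutil
import subprocess
import sys
import time

SUPPORTED_IMAGE_TYPES = ("png", "svg", "eps")


def run_cmd(args):
    """Run args (a list, no shell) and return
    (returncode, stdout_lines, stderr_text, run_time_sec)."""
    started_at = time.time()
    p = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                       universal_newlines=True)
    run_time = time.time() - started_at
    return p.returncode, p.stdout.splitlines(), p.stderr, run_time


def dot_to_image(image_type, dot_file, image_file, dot_exe="dot"):
    """Render dot_file to image_file with Graphviz; True on success."""
    if image_type not in SUPPORTED_IMAGE_TYPES:
        raise ValueError("Unsupported image type {t}".format(t=image_type))
    exe = shutil.which(dot_exe)  # stdlib replacement for the `which` helper
    if exe is None:
        raise OSError("Unable to find {e} in PATH".format(e=dot_exe))
    # Each argument is passed verbatim, so paths with spaces need no quoting
    rcode, _, _, _ = run_cmd([exe, "-T" + image_type, dot_file, "-o", image_file])
    return rcode == 0


rcode, out, err, run_time = run_cmd([sys.executable, "-c", "print('hello')"])
print(rcode, out)  # 0 ['hello']
```

Dropping the shell also removes the need to quote file paths, which `_dot_to_image` currently interpolates unquoted into a shell string.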