Created October 25, 2018 15:14
Pegasus 4.8.4 Python 3
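This gist carries the Pegasus DAX3 Python API module, patched to also run under Python 3 (when sys.version_info >= (3, 0) it pulls basestring/unicode shims from the past module, which ships with the future package on PyPI). A minimal usage sketch, assuming the file is saved as DAX3.py next to your workflow script (the import name is an assumption, not stated in the gist):

from DAX3 import ADAG, Job
dax = ADAG("example")
dax.addJob(Job(name="hello"))
dax.writeXMLFile("example.dax")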
# Copyright 2010 University Of Southern California | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, | |
# software distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
"""API for generating Pegasus DAXes | |
The classes in this module can be used to generate DAXes that can be | |
read by Pegasus. | |
The official DAX schema is here: http://pegasus.isi.edu/schema/ | |
Here is an example showing how to create the diamond DAX using this API: | |
# Create a DAX | |
diamond = ADAG("diamond") | |
# Add some metadata | |
diamond.metadata("name", "diamond") | |
diamond.metadata("createdby", "Gideon Juve") | |
# Add input file to the DAX-level replica catalog | |
a = File("f.a") | |
a.addPFN(PFN("gsiftp://site.com/inputs/f.a","site")) | |
a.metadata("size", "1024") | |
diamond.addFile(a) | |
# Add executables to the DAX-level replica catalog | |
e_preprocess = Executable(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64") | |
e_preprocess.metadata("size", "2048") | |
e_preprocess.addPFN(PFN("gsiftp://site.com/bin/preprocess","site")) | |
diamond.addExecutable(e_preprocess) | |
e_findrange = Executable(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64") | |
e_findrange.addPFN(PFN("gsiftp://site.com/bin/findrange","site")) | |
diamond.addExecutable(e_findrange) | |
e_analyze = Executable(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64") | |
e_analyze.addPFN(PFN("gsiftp://site.com/bin/analyze","site")) | |
diamond.addExecutable(e_analyze) | |
# Add a preprocess job | |
preprocess = Job(e_preprocess) | |
preprocess.metadata("time", "60") | |
b1 = File("f.b1") | |
b2 = File("f.b2") | |
preprocess.addArguments("-a preprocess","-T60","-i",a,"-o",b1,b2) | |
preprocess.uses(a, link=Link.INPUT) | |
preprocess.uses(b1, link=Link.OUTPUT, transfer=True) | |
preprocess.uses(b2, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(preprocess) | |
# Add left Findrange job | |
frl = Job(e_findrange) | |
frl.metadata("time", "60") | |
c1 = File("f.c1") | |
frl.addArguments("-a findrange","-T60","-i",b1,"-o",c1) | |
frl.uses(b1, link=Link.INPUT) | |
frl.uses(c1, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(frl) | |
# Add right Findrange job | |
frr = Job(e_findrange) | |
frr.metadata("time", "60") | |
c2 = File("f.c2") | |
frr.addArguments("-a findrange","-T60","-i",b2,"-o",c2) | |
frr.uses(b2, link=Link.INPUT) | |
frr.uses(c2, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(frr) | |
# Add Analyze job | |
analyze = Job(e_analyze) | |
analyze.metadata("time", "60") | |
d = File("f.d") | |
analyze.addArguments("-a analyze","-T60","-i",c1,c2,"-o",d) | |
analyze.uses(c1, link=Link.INPUT) | |
analyze.uses(c2, link=Link.INPUT) | |
analyze.uses(d, link=Link.OUTPUT, transfer=True, register=True) | |
diamond.addJob(analyze) | |
# Add dependencies | |
diamond.depends(parent=preprocess, child=frl) | |
diamond.depends(parent=preprocess, child=frr) | |
diamond.depends(parent=frl, child=analyze) | |
diamond.depends(parent=frr, child=analyze) | |
# Write the DAX to stdout | |
import sys | |
diamond.writeXML(sys.stdout) | |
# Write the DAX to a file | |
f = open("diamond.dax","w") | |
diamond.writeXML(f) | |
f.close() | |
""" | |
__author__ = "Gideon Juve <[email protected]>, Rafael Ferreira da Silva <[email protected]>" | |
__version__ = "3.6" | |
__all__ = [ | |
"DAX3Error", | |
"DuplicateError", | |
"NotFoundError", | |
"FormatError", | |
"ParseError", | |
"Element", | |
"Namespace", | |
"ContainerType", | |
"Arch", | |
"Link", | |
"Transfer", | |
"OS", | |
"When", | |
"Invoke", | |
"InvokeMixin", | |
"ProfileMixin", | |
"MetadataMixin", | |
"PFNMixin", | |
"CatalogType", | |
"File", | |
"Executable", | |
"Container", | |
"Metadata", | |
"PFN", | |
"Profile", | |
"Use", | |
"UseMixin", | |
"Transformation", | |
"AbstractJob", | |
"Job", | |
"DAX", | |
"DAG", | |
"Dependency", | |
"ADAG", | |
"parseString", | |
"parse" | |
] | |
import datetime, os, sys
import codecs
import shlex
import warnings
if sys.version_info >= (3, 0):
    # compatibility with Python 3
    from past.builtins import basestring, unicode
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
SCHEMA_NAMESPACE = "http://pegasus.isi.edu/schema/DAX" | |
SCHEMA_LOCATION = "http://pegasus.isi.edu/schema/dax-3.6.xsd" | |
SCHEMA_VERSION = "3.6" | |
class DAX3Error(Exception): pass | |
class DuplicateError(DAX3Error): pass | |
class NotFoundError(DAX3Error): pass | |
class FormatError(DAX3Error): pass | |
class ParseError(DAX3Error): pass | |
class Element: | |
"""Representation of an XML element for formatting output""" | |
def __init__(self, name, attrs=[]): | |
self.name = name | |
self.attrs = [] | |
for attr, value in attrs: | |
if value is not None: | |
if isinstance(value, bool): | |
value = str(value).lower() | |
elif not isinstance(value, basestring): | |
value = repr(value) | |
attr = attr.replace('__', ':') | |
self.attrs.append((attr, value)) | |
self.children = [] | |
self.flat = False | |
    def _escape(self, text):
        """Escape special characters in XML"""
        o = []
        for c in text:
            if c == '"':
                o.append("&quot;")
            elif c == "'":
                o.append("&apos;")
            elif c == "<":
                o.append("&lt;")
            elif c == ">":
                o.append("&gt;")
            elif c == "&":
                o.append("&amp;")
            else:
                o.append(c)
        return ''.join(o)
def element(self, element): | |
self.children.append(element) | |
return element | |
def text(self, value): | |
if not isinstance(value, basestring): | |
value = str(value) | |
self.children.append(self._escape(value)) | |
return self | |
def comment(self, message): | |
self.children.append("<!-- %s -->" % self._escape(message)) | |
def flatten(self): | |
self.flat = True | |
return self | |
def __str__(self): | |
s = StringIO() | |
self.write(s) | |
x = s.getvalue() | |
s.close() | |
return unicode(x) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def write(self, stream=sys.stdout, level=0, flatten=False): | |
flat = self.flat or flatten | |
stream.write('<%s' % self.name) | |
for attr, value in self.attrs: | |
value = self._escape(value) | |
stream.write(' %s="%s"' % (attr, value)) | |
if len(self.children) == 0: | |
stream.write('/>') | |
else: | |
stream.write('>') | |
if not flat: | |
stream.write('\n') | |
for child in self.children: | |
if not flat: | |
stream.write('\t' * (level + 1)) | |
if isinstance(child, basestring): | |
stream.write(child) | |
else: | |
child.write(stream, level + 1, flat) | |
if not flat: | |
stream.write('\n') | |
if not flat: | |
stream.write('\t' * level) | |
stream.write('</%s>' % self.name) | |
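# A rough sketch (not part of the original module) of how Element is used to
# build the DAX output; the tag and attribute names below are made up:
#
#   e = Element('demo', [('name', 'x'), ('count', 2)])
#   e.element(Element('child').text('hello').flatten())
#   e.write(sys.stdout)   # nested, tab-indented XML
#   xml = str(e)          # same XML as a string, via StringIO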
class Namespace: | |
""" | |
Namespace values recognized by Pegasus. See Executable, | |
Transformation, and Job. | |
""" | |
PEGASUS = 'pegasus' | |
CONDOR = 'condor' | |
DAGMAN = 'dagman' | |
ENV = 'env' | |
HINTS = 'hints' | |
GLOBUS = 'globus' | |
SELECTOR = 'selector' | |
STAT = 'stat' | |
class Arch: | |
""" | |
Architecture types. See Executable. | |
""" | |
X86 = 'x86' | |
X86_64 = 'x86_64' | |
PPC = 'ppc' | |
PPC_64 = 'ppc_64' | |
IA64 = 'ia64' | |
SPARCV7 = 'sparcv7' | |
SPARCV9 = 'sparcv9' | |
AMD64 = 'amd64' | |
class Link: | |
""" | |
Linkage attributes. See File, Executable and uses(). | |
""" | |
NONE = 'none' | |
INPUT = 'input' | |
OUTPUT = 'output' | |
INOUT = 'inout' | |
CHECKPOINT = 'checkpoint' | |
class Transfer: | |
""" | |
Transfer types for uses. See Executable, File. | |
""" | |
FALSE = 'false' | |
OPTIONAL = 'optional' | |
TRUE = 'true' | |
class OS: | |
""" | |
OS types. See Executable. | |
""" | |
LINUX = 'linux' | |
SUNOS = 'sunos' | |
AIX = 'aix' | |
MACOS = 'macos' | |
WINDOWS = 'windows' | |
class When: | |
""" | |
Job states for notifications. See Job/DAX/DAG.invoke(). | |
""" | |
NEVER = 'never' | |
START = 'start' | |
ON_ERROR = 'on_error' | |
ON_SUCCESS = 'on_success' | |
AT_END = 'at_end' | |
ALL = 'all' | |
class ContainerType: | |
""" | |
Container types. See Container. | |
""" | |
DOCKER = 'docker' | |
SINGULARITY = 'singularity' | |
class Invoke: | |
def __init__(self, when, what): | |
if not when: | |
raise FormatError("invalid when", when) | |
if not what: | |
raise FormatError("invalid what", what) | |
self.when = when | |
self.what = what | |
def __str__(self): | |
return u"<Invoke %s %s>" % (self.when, self.what) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash((self.when, self.what)) | |
def __eq__(self, other): | |
if isinstance(other, Invoke): | |
return self.when == other.when and self.what == other.what | |
return False | |
def toXML(self): | |
e = Element('invoke', [('when', self.when)]) | |
e.text(self.what) | |
e.flatten() | |
return e | |
class InvokeMixin: | |
def addInvoke(self, invoke): | |
"""Add invoke to this object""" | |
if self.hasInvoke(invoke): | |
raise DuplicateError("Duplicate Invoke %s" % invoke) | |
self.invocations.add(invoke) | |
def hasInvoke(self, invoke): | |
"""Test to see if this object has invoke""" | |
return invoke in self.invocations | |
def removeInvoke(self, invoke): | |
"""Remove invoke from this object""" | |
if not self.hasInvoke(invoke): | |
raise NotFoundError("Invoke not found", invoke) | |
self.invocations.remove(invoke) | |
def clearInvokes(self): | |
"""Remove all Invoke objects""" | |
self.invocations.clear() | |
def invoke(self, when, what): | |
""" | |
Invoke executable 'what' when job reaches status 'when'. The value of | |
'what' should be a command that can be executed on the submit host. | |
The list of valid values for 'when' is: | |
WHEN MEANING | |
========== ======================================================= | |
never never invoke | |
start invoke just before job gets submitted. | |
on_error invoke after job finishes with failure (exitcode != 0). | |
on_success invoke after job finishes with success (exitcode == 0). | |
at_end invoke after job finishes, regardless of exit status. | |
all like start and at_end combined. | |
Examples: | |
obj.invoke('at_end','/usr/bin/mail -s "job done" [email protected]') | |
obj.invoke('on_error','/usr/bin/update_db -failure') | |
""" | |
self.addInvoke(Invoke(when, what)) | |
class ProfileMixin: | |
def addProfile(self, profile): | |
"""Add a profile to this object""" | |
if self.hasProfile(profile): | |
raise DuplicateError("Duplicate profile %s" % profile) | |
self.profiles.add(profile) | |
def hasProfile(self, profile): | |
"""Does this object have profile?""" | |
return profile in self.profiles | |
def removeProfile(self, profile): | |
"""Remove profile from this object""" | |
if not self.hasProfile(profile): | |
raise NotFoundError("Profile not found", profile) | |
self.profiles.remove(profile) | |
def clearProfiles(self): | |
"""Remove all profiles from this object""" | |
self.profiles.clear() | |
def profile(self, namespace, key, value): | |
"""Declarative profile addition""" | |
self.addProfile(Profile(namespace, key, value)) | |
class MetadataMixin: | |
def addMetadata(self, metadata): | |
"""Add metadata to this object""" | |
if self.hasMetadata(metadata): | |
raise DuplicateError("Duplicate Metadata %s" % metadata) | |
self._metadata.add(metadata) | |
def removeMetadata(self, metadata): | |
"""Remove meta from this object""" | |
if not self.hasMetadata(metadata): | |
raise NotFoundError("Metadata not found", metadata) | |
self._metadata.remove(metadata) | |
def hasMetadata(self, metadata): | |
"""Does this object have metadata?""" | |
return metadata in self._metadata | |
def clearMetadata(self): | |
"""Remove all metadata from this object""" | |
self._metadata.clear() | |
def metadata(self, key, value): | |
"""Declarative metadata addition""" | |
self.addMetadata(Metadata(key, value)) | |
class PFNMixin: | |
def addPFN(self, pfn): | |
"""Add a PFN to this object""" | |
if self.hasPFN(pfn): | |
raise DuplicateError("Duplicate PFN %s" % pfn) | |
self.pfns.add(pfn) | |
def removePFN(self, pfn): | |
"""Remove PFN from this object""" | |
if not self.hasPFN(pfn): | |
raise NotFoundError("PFN not found", pfn) | |
self.pfns.remove(pfn) | |
def hasPFN(self, pfn): | |
"""Does this object have pfn?""" | |
return pfn in self.pfns | |
def clearPFNs(self): | |
"""Remove all PFNs from this object""" | |
self.pfns.clear() | |
def PFN(self, url, site=None): | |
"""Declarative PFN addition""" | |
self.addPFN(PFN(url, site)) | |
class CatalogType(ProfileMixin, MetadataMixin, PFNMixin): | |
"""Base class for File and Executable""" | |
def __init__(self, name): | |
""" | |
All arguments specify the workflow-level behavior of this File. Job-level | |
behavior can be defined when adding the File to a Job's uses. If the | |
properties are not overridden at the job-level, then the workflow-level | |
values are used as defaults. | |
If this LFN is to be used as a job's stdin/stdout/stderr then the value | |
of link is ignored when generating the <std*> tags. | |
Arguments: | |
name: The name of the file (required) | |
""" | |
if not name: | |
raise FormatError('name required') | |
self.name = name | |
self.profiles = set() | |
self._metadata = set() | |
self.pfns = set() | |
def innerXML(self, parent): | |
for p in self.profiles: | |
parent.element(p.toXML()) | |
for m in self._metadata: | |
parent.element(m.toXML()) | |
for p in self.pfns: | |
parent.element(p.toXML()) | |
class File(CatalogType): | |
"""File(name) | |
A file entry for the DAX-level replica catalog, or a reference to a logical file | |
used by the workflow. | |
Examples: | |
input = File('input.txt') | |
Example use in job: | |
input = File('input.txt') | |
output = File('output.txt') | |
job = Job(name="compute") | |
job.uses(input, link=Link.INPUT, transfer=True) | |
job.uses(output, link=Link.OUTPUT, transfer=True, register=True) | |
""" | |
def __init__(self, name): | |
""" | |
All arguments specify the workflow-level behavior of this File. Job-level | |
behavior can be defined when adding the File to a Job's uses. If the | |
properties are not overridden at the job-level, then the workflow-level | |
values are used as defaults. | |
If this LFN is to be used as a job's stdin/stdout/stderr then the value | |
of link is ignored when generating the <std*> tags. | |
Arguments: | |
name: The name of the file (required) | |
""" | |
CatalogType.__init__(self, name) | |
def __str__(self): | |
return u"<File %s>" % self.name | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash(self.name) | |
def __eq__(self, other): | |
return isinstance(other, File) and self.name == other.name | |
def toArgumentXML(self): | |
"""Returns an XML representation of this File with no inner elements""" | |
return Element('file', [('name', self.name)]) | |
def toStdioXML(self, tag): | |
"""Returns an XML representation of this file as a stdin/out/err tag""" | |
        if tag == 'stdin':
            link = "input"  # stdin is always input
elif tag in ['stdout', 'stderr']: | |
link = "output" # stdout/stderr are always output | |
else: | |
raise FormatError("invalid tag", tag, "should be one of stdin, stdout, stderr") | |
return Element(tag, [ | |
('name', self.name), | |
('link', link) | |
]) | |
def toXML(self): | |
"""Return the XML representation of this File with inner elements""" | |
e = self.toArgumentXML() | |
self.innerXML(e) | |
return e | |
class Executable(CatalogType, InvokeMixin): | |
"""Executable(name[,namespace][,version][,arch][,os][,osrelease][,osversion][,glibc][,installed]) | |
An entry for an executable in the DAX-level transformation catalog. | |
Examples: | |
grep = Executable("grep") | |
grep = Executable(namespace="os",name="grep",version="2.3") | |
grep = Executable(namespace="os",name="grep",version="2.3",arch=Arch.X86) | |
grep = Executable(namespace="os",name="grep",version="2.3",arch=Arch.X86,os=OS.LINUX) | |
""" | |
def __init__(self, name, namespace=None, version=None, arch=None, os=None, | |
osrelease=None, osversion=None, glibc=None, installed=None, | |
container=None): | |
""" | |
Arguments: | |
name: Logical name of executable | |
namespace: Executable namespace | |
version: Executable version | |
arch: Architecture that this exe was compiled for | |
os: Name of os that this exe was compiled for | |
osrelease: Release of os that this exe was compiled for | |
osversion: Version of os that this exe was compiled for | |
glibc: Version of glibc this exe was compiled against | |
installed: Is the executable installed (true), or stageable (false) | |
container: Optional attribute to specify the container to use | |
""" | |
CatalogType.__init__(self, name) | |
self.namespace = namespace | |
self.version = version | |
self.arch = arch | |
self.os = os | |
self.osrelease = osrelease | |
self.osversion = osversion | |
self.glibc = glibc | |
self.installed = installed | |
self.container = container | |
self.invocations = set() | |
def __str__(self): | |
return u"<Executable %s::%s:%s>" % (self.namespace, self.name, self.version) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash((self.name, | |
self.namespace, | |
self.version, | |
self.arch, | |
self.os, | |
self.osrelease, | |
self.osversion, | |
self.glibc, | |
self.installed, | |
self.container)) | |
def __eq__(self, other): | |
if isinstance(other, Executable): | |
return self.name == other.name and \ | |
self.namespace == other.namespace and \ | |
self.version == other.version and \ | |
self.arch == other.arch and \ | |
self.os == other.os and \ | |
self.osrelease == other.osrelease and \ | |
self.osversion == other.osversion and \ | |
self.glibc == other.glibc and \ | |
self.installed == other.installed and \ | |
self.container == other.container | |
return False | |
def toXML(self): | |
"""Returns an XML representation of this file as a filename tag""" | |
e = Element('executable', [ | |
('name', self.name), | |
('namespace', self.namespace), | |
('version', self.version), | |
('arch', self.arch), | |
('os', self.os), | |
('osrelease', self.osrelease), | |
('osversion', self.osversion), | |
('glibc', self.glibc), | |
('installed', self.installed) | |
            # containers are not supported by the DAX3 schema
]) | |
self.innerXML(e) | |
if self.container: | |
warnings.warn('The DAX API extensions do not support references for containers.') | |
# Invocations | |
for inv in self.invocations: | |
e.element(inv.toXML()) | |
return e | |
class Container(ProfileMixin): | |
"""Container(name,type,image[,image_site]) | |
An entry for a container in the DAX-level transformation catalog. | |
Examples: | |
mycontainer = Container("myapp", type="docker", image="docker:///rynge/montage:latest") | |
""" | |
def __init__(self, name, type, image, imagesite=None): | |
""" | |
Arguments: | |
name: Container name | |
type: Container type (see ContainerType) | |
image: URL to image in a container hub OR URL to an existing container image | |
            imagesite: Optional site attribute telling Pegasus on which site the container tar file exists
""" | |
if not name: | |
raise FormatError("Invalid name", name) | |
if not type: | |
raise FormatError("Invalid container type", type) | |
if not image: | |
raise FormatError("Invalid image", image) | |
self.name = name | |
self.type = type | |
self.image = image | |
self.imagesite = imagesite | |
self.profiles = set() | |
def __str__(self): | |
return u"<Container %s:%s>" % (self.name, self.type) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash((self.name, | |
self.type, | |
self.image, | |
self.imagesite)) | |
def __eq__(self, other): | |
if isinstance(other, Container): | |
return self.name == other.name and \ | |
self.type == other.type and \ | |
self.image == other.image and \ | |
self.imagesite == other.imagesite | |
return False | |
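# Sketch (not in the original gist): a Container is declared on its own and
# then referenced from an Executable through its 'container' argument. Note
# that Executable.toXML() above only emits a warning for it, since the DAX3
# schema does not record the container reference.
#
#   centos = Container("centos-base", ContainerType.DOCKER,
#                      "docker:///rynge/montage:latest")
#   exe = Executable("mDiff", container=centos)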
class Metadata: | |
"""Metadata(key,value) | |
A way to add metadata to File and Executable objects. This is | |
useful if you want to annotate the DAX with things like file | |
sizes, application-specific attributes, etc. | |
There is currently no restriction on the type. | |
Examples: | |
s = Metadata('size','12') | |
a = Metadata('algorithm','plav') | |
""" | |
def __init__(self, key, value): | |
""" | |
Arguments: | |
key: The key name of the item | |
value: The value of the item | |
""" | |
if not key: | |
raise FormatError("Invalid key", key) | |
if not value: | |
raise FormatError("Invalid value", value) | |
self.key = key | |
self.value = value | |
def __str__(self): | |
return u"<Metadata %s = %s>" % (self.key, self.value) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash(self.key) | |
def __eq__(self, other): | |
return isinstance(other, Metadata) and self.key == other.key | |
def toXML(self): | |
m = Element('metadata', [ | |
('key', self.key) | |
]) | |
m.text(self.value).flatten() | |
return m | |
class PFN(ProfileMixin): | |
"""PFN(url[,site]) | |
A physical file name. Used to provide URLs for files and executables | |
in the DAX-level replica catalog. | |
PFNs can be added to File and Executable. | |
Examples: | |
PFN('http://site.com/path/to/file.txt','site') | |
PFN('http://site.com/path/to/file.txt',site='site') | |
PFN('http://site.com/path/to/file.txt') | |
""" | |
    def __init__(self, url, site="local"):
""" | |
Arguments: | |
url: The url of the file. | |
site: The name of the site. [default: local] | |
""" | |
if not url: | |
raise FormatError("Invalid url", url) | |
if not site: | |
raise FormatError("Invalid site", site) | |
self.url = url | |
self.site = site | |
self.profiles = set() | |
def __str__(self): | |
return u"<PFN %s %s>" % (self.site, self.url) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash((self.url, self.site)) | |
def __eq__(self, other): | |
return isinstance(other, PFN) and \ | |
self.url == other.url and \ | |
self.site == other.site | |
def toXML(self): | |
pfn = Element('pfn', [ | |
('url', self.url), | |
('site', self.site) | |
]) | |
for p in self.profiles: | |
pfn.element(p.toXML()) | |
return pfn | |
class Profile: | |
"""Profile(namespace,key,value) | |
A Profile captures scheduler-, system-, and environment-specific | |
parameters in a uniform fashion. Each profile declaration assigns a value | |
to a key within a namespace. | |
Profiles can be added to Job, DAX, DAG, File, Executable, and PFN. | |
Examples: | |
path = Profile(Namespace.ENV,'PATH','/bin') | |
vanilla = Profile(Namespace.CONDOR,'universe','vanilla') | |
path = Profile(namespace='env',key='PATH',value='/bin') | |
path = Profile('env','PATH','/bin') | |
""" | |
def __init__(self, namespace, key, value): | |
""" | |
Arguments: | |
namespace: The namespace of the profile (see Namespace) | |
key: The key name. Can be anything that responds to str(). | |
value: The value for the profile. Can be anything that responds to str(). | |
""" | |
self.namespace = namespace | |
self.key = key | |
self.value = value | |
def __str__(self): | |
return u"<Profile %s::%s = %s>" % (self.namespace, self.key, self.value) | |
def __bytes__(self): | |
return unicode(self).encode('utf-8') | |
def __hash__(self): | |
return hash((self.namespace, self.key)) | |
def __eq__(self, other): | |
return isinstance(other, Profile) and \ | |
self.namespace == other.namespace and \ | |
self.key == other.key | |
def toXML(self): | |
"""Return an XML element for this profile""" | |
p = Element("profile", [ | |
('namespace', self.namespace), | |
('key', self.key) | |
]) | |
p.text(self.value).flatten() | |
return p | |
class Use(MetadataMixin): | |
"""Use(file[,link][,register][,transfer][,optional] | |
[,namespace][,version][,executable][,size]) | |
Use of a logical file name. Used for referencing files in the DAX. | |
Attributes: | |
file: A string, File or Executable representing the logical file | |
link: Is this file a job input, output or both (See LFN) (optional) | |
register: Should this file be registered in RLS? (True/False) (optional) | |
transfer: Should this file be transferred? (True/False or See LFN) (optional) | |
optional: Is this file optional, or should its absence be an error? (optional) | |
namespace: Namespace of executable (optional) | |
version: version of executable (optional) | |
executable: Is file an executable? (True/False) (optional) | |
size: The size of the file (optional) | |
For Use objects that are added to Transformations, the attributes 'link', 'register', | |
'transfer', 'optional' and 'size' are ignored. | |
If a File object is passed in as 'file', then the default value for executable | |
is 'false'. Similarly, if an Executable object is passed in, then the default | |
value for executable is 'true'. | |
""" | |
def __init__(self, name, link=None, register=None, transfer=None, | |
optional=None, namespace=None, version=None, executable=None, | |
size=None): | |
if not name: | |
raise FormatError('Invalid name', name) | |
self.name = name | |
self.link = link | |
self.optional = optional | |
self.register = register | |
self.transfer = transfer | |
self.namespace = namespace | |
self.version = version | |
self.executable = executable | |
self.size = size | |
self._metadata = set() | |
def __str__(self): | |
return u"<Use %s::%s:%s>" % (self.namespace, self.name, self.version) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def __hash__(self): | |
return hash((self.namespace, self.name, self.version)) | |
    def __eq__(self, other):
        if isinstance(other, Use):
            return self.namespace == other.namespace and \
                   self.name == other.name and \
                   self.version == other.version
        return False
def toTransformationXML(self): | |
e = Element('uses', [ | |
('namespace', self.namespace), | |
('name', self.name), | |
('version', self.version), | |
('executable', self.executable) | |
]) | |
for m in self._metadata: | |
e.element(m.toXML()) | |
return e | |
def toJobXML(self): | |
e = Element('uses', [ | |
('namespace', self.namespace), | |
('name', self.name), | |
('version', self.version), | |
('link', self.link), | |
('register', self.register), | |
('transfer', self.transfer), | |
('optional', self.optional), | |
('executable', self.executable), | |
('size', self.size) | |
]) | |
for m in self._metadata: | |
e.element(m.toXML()) | |
return e | |
class UseMixin: | |
def addUse(self, use): | |
"""Add Use to this object""" | |
if self.hasUse(use): | |
raise DuplicateError("Duplicate Use %s" % use) | |
self.used.add(use) | |
def removeUse(self, use): | |
"""Remove use from this object""" | |
if not self.hasUse(use): | |
raise NotFoundError("No such Use", use) | |
self.used.remove(use) | |
def hasUse(self, use): | |
"""Test to see if this object has use""" | |
return use in self.used | |
def clearUses(self): | |
"""Remove all uses from this object""" | |
self.used.clear() | |
def uses(self, arg, link=None, register=None, transfer=None, | |
optional=None, namespace=None, version=None, executable=None, | |
size=None): | |
if isinstance(arg, CatalogType): | |
_name = arg.name | |
else: | |
_name = arg | |
_namespace = None | |
_version = None | |
_executable = None | |
if isinstance(arg, Executable): | |
_namespace = arg.namespace | |
_version = arg.version | |
# We only need to set this for jobs | |
# the default is True for Transformations | |
if isinstance(self, AbstractJob): | |
_executable = True | |
if isinstance(arg, File): | |
# We only need to set this for transformations | |
# The default is False for Jobs | |
if isinstance(self, Transformation): | |
_executable = False | |
if namespace is not None: | |
_namespace = namespace | |
if version is not None: | |
_version = str(version) | |
if executable is not None: | |
_executable = executable | |
use = Use(_name, link, register, transfer, optional, _namespace, | |
_version, _executable, size) | |
# Copy metadata from File or Executable | |
# XXX Maybe we only want this if link!=input | |
if isinstance(arg, CatalogType): | |
for m in arg._metadata: | |
use.addMetadata(m) | |
self.addUse(use) | |
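# Sketch (comments only, not part of the original module) of how uses() fills
# in defaults: passing an Executable to a Job marks the use as executable=True
# and copies its namespace/version, while passing a File to a Transformation
# marks it executable=False; explicit keyword arguments override either default.
#
#   exe = Executable(namespace="demo", name="tool", version="1.0")
#   data = File("data.txt")
#   job = Job(exe)
#   job.uses(exe)                     # executable defaults to True here
#   job.uses(data, link=Link.INPUT)   # plain file use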
class Transformation(UseMixin, InvokeMixin, MetadataMixin): | |
"""Transformation((name|executable)[,namespace][,version]) | |
A logical transformation. This is basically defining one or more | |
entries in the transformation catalog. You can think of it like a macro | |
for adding <uses> to your jobs. You can define a transformation that | |
uses several files and/or executables, and refer to it when creating | |
a job. If you do, then all of the uses defined for that transformation | |
will be copied to the job during planning. | |
This code: | |
in = File("input.txt") | |
exe = Executable("exe") | |
t = Transformation(namespace="foo", name="bar", version="baz") | |
t.uses(in) | |
t.uses(exe) | |
j = Job(t) | |
is equivalent to: | |
in = File("input.txt") | |
exe = Executable("exe") | |
j = Job(namespace="foo", name="bar", version="baz") | |
j.uses(in) | |
j.uses(exe) | |
Examples: | |
Transformation(name='mDiff') | |
Transformation(namespace='montage',name='mDiff') | |
Transformation(namespace='montage',name='mDiff',version='3.0') | |
Using one executable: | |
mProjectPP = Executable(namespace="montage",name="mProjectPP",version="3.0") | |
x_mProjectPP = Transformation(mProjectPP) | |
Using several executables: | |
mDiff = Executable(namespace="montage",name="mProjectPP",version="3.0") | |
mFitplane = Executable(namespace="montage",name="mFitplane",version="3.0") | |
mDiffFit = Executable(namespace="montage",name="mDiffFit",version="3.0") | |
x_mDiffFit = Transformation(mDiffFit) | |
x_mDiffFit.uses(mDiff) | |
x_mDiffFit.uses(mFitplane) | |
Config files too: | |
conf = File("jbsim.conf") | |
jbsim = Executable(namespace="scec",name="jbsim") | |
x_jbsim = Transformation(jbsim) | |
x_jbsim.uses(conf) | |
""" | |
def __init__(self, name, namespace=None, version=None): | |
""" | |
The name argument can be either a string or an Executable object. | |
If it is an Executable object, then the Transformation inherits | |
its name, namespace and version from the Executable, and the | |
Transformation is set to use the Executable with link=input, | |
transfer=true, and register=False. | |
Arguments: | |
name: The name of the transformation | |
namespace: The namespace of the xform (optional) | |
version: The version of the xform (optional) | |
""" | |
self.name = None | |
self.namespace = None | |
self.version = None | |
self.used = set() | |
self.invocations = set() | |
self._metadata = set() | |
if isinstance(name, Executable): | |
self.name = name.name | |
self.namespace = name.namespace | |
self.version = name.version | |
else: | |
self.name = name | |
if namespace: self.namespace = namespace | |
if version: self.version = version | |
def __str__(self): | |
return u"<Transformation %s::%s:%s>" % (self.namespace, self.name, self.version) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def __hash__(self): | |
return hash((self.namespace, self.name, self.version)) | |
    def __eq__(self, other):
        if isinstance(other, Transformation):
            return self.namespace == other.namespace and \
                   self.name == other.name and \
                   self.version == other.version
        return False
def toXML(self): | |
"""Return an XML representation of this transformation""" | |
e = Element('transformation', [ | |
('namespace', self.namespace), | |
('name', self.name), | |
('version', self.version) | |
]) | |
# Metadata | |
for m in self._metadata: | |
e.element(m.toXML()) | |
# Uses | |
def getlink(a): | |
return a.link | |
used = list(self.used) | |
used.sort(key=getlink) | |
for u in used: | |
e.element(u.toTransformationXML()) | |
# Invocations | |
for inv in self.invocations: | |
e.element(inv.toXML()) | |
return e | |
class AbstractJob(ProfileMixin, UseMixin, InvokeMixin, MetadataMixin): | |
"""The base class for Job, DAX, and DAG""" | |
def __init__(self, id=None, node_label=None): | |
self.id = id | |
self.node_label = node_label | |
self.arguments = [] | |
self.profiles = set() | |
self.used = set() | |
self.invocations = set() | |
self._metadata = set() | |
self.stdout = None | |
self.stderr = None | |
self.stdin = None | |
def addArguments(self, *arguments): | |
"""Add one or more arguments to the job (this will add whitespace)""" | |
for arg in arguments: | |
if not isinstance(arg, (File, basestring)): | |
raise FormatError("Invalid argument", arg) | |
for arg in arguments: | |
if len(self.arguments) > 0: | |
self.arguments.append(' ') | |
self.arguments.append(arg) | |
def addRawArguments(self, *arguments): | |
"""Add one or more arguments to the job (whitespace will NOT be added)""" | |
for arg in arguments: | |
if not isinstance(arg, (File, basestring)): | |
raise FormatError("Invalid argument", arg) | |
self.arguments.extend(arguments) | |
def clearArguments(self): | |
"""Remove all arguments from this job""" | |
self.arguments = [] | |
def getArguments(self): | |
"""Get the arguments of this job""" | |
args = [] | |
for a in self.arguments: | |
if isinstance(a, File): | |
args.append(unicode(a.toArgumentXML())) | |
else: | |
args.append(a) | |
return ''.join(args) | |
def setStdout(self, filename): | |
"""Redirect stdout to a file""" | |
if isinstance(filename, File): | |
self.stdout = filename | |
else: | |
self.stdout = File(filename) | |
def clearStdout(self): | |
"""Remove stdout file""" | |
self.stdout = None | |
def setStderr(self, filename): | |
"""Redirect stderr to a file""" | |
if isinstance(filename, File): | |
self.stderr = filename | |
else: | |
self.stderr = File(filename) | |
def clearStderr(self): | |
"""Remove stderr file""" | |
self.stderr = None | |
def setStdin(self, filename): | |
"""Redirect stdin from a file""" | |
if isinstance(filename, File): | |
self.stdin = filename | |
else: | |
self.stdin = File(filename) | |
def clearStdin(self): | |
"""Remove stdin file""" | |
self.stdin = None | |
def innerXML(self, element): | |
"""Return an XML representation of this job""" | |
# Arguments | |
if len(self.arguments) > 0: | |
args = Element('argument').flatten() | |
for x in self.arguments: | |
if isinstance(x, File): | |
args.element(x.toArgumentXML()) | |
else: | |
args.text(x) | |
element.element(args) | |
# Metadata | |
for m in self._metadata: | |
element.element(m.toXML()) | |
# Profiles | |
for pro in self.profiles: | |
element.element(pro.toXML()) | |
        # Stdin/out/err
if self.stdin is not None: | |
element.element(self.stdin.toStdioXML('stdin')) | |
if self.stdout is not None: | |
element.element(self.stdout.toStdioXML('stdout')) | |
if self.stderr is not None: | |
element.element(self.stderr.toStdioXML('stderr')) | |
# Uses | |
def getlink(a): | |
return a.link | |
used = list(self.used) | |
used.sort(key=getlink) | |
for use in used: | |
element.element(use.toJobXML()) | |
# Invocations | |
for inv in self.invocations: | |
element.element(inv.toXML()) | |
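# Sketch (not from the original gist) contrasting addArguments() and
# addRawArguments(): the former inserts a space between consecutive arguments,
# the latter appends them exactly as given, and File objects are rendered as
# <file name="..."/> inside the <argument> element.
#
#   out = File("out.txt")
#   j = Job("wc")
#   j.addArguments("-l", out)        # renders as: -l <file name="out.txt"/>
#   j.addRawArguments("--quiet")     # appended with no separating space
#   j.setStdout(File("wc.log"))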
class Job(AbstractJob): | |
"""Job((name|Executable|Transformation)[,id][,namespace][,version][,node_label]) | |
This class defines the specifics of a job to run in an abstract manner. | |
    All filename references still refer to logical files. All references to
transformations also refer to logical transformations, though | |
physical location hints can be passed through profiles. | |
Examples: | |
sleep = Job(id="ID0001",name="sleep") | |
jbsim = Job(id="ID0002",name="jbsim",namespace="cybershake",version="2.1") | |
merge = Job("jbsim") | |
You can create a Job based on a Transformation: | |
mDiff_xform = Transformation("mDiff", ...) | |
mDiff_job = Job(mDiff_xform) | |
Or an Executable: | |
mDiff_exe = Executable("mDiff", ...) | |
mDiff_job = Job(mDiff_exe) | |
Several arguments can be added at the same time: | |
input = File(...) | |
output = File(...) | |
job.addArguments("-i",input,"-o",output) | |
Profiles are added similarly: | |
job.addProfile(Profile(Namespace.ENV, key='PATH', value='/bin')) | |
job.profile(Namespace.ENV, "PATH", "/bin") | |
Adding file uses is simple, and you can override global File attributes: | |
job.uses(input, Link.INPUT) | |
job.uses(output, Link.OUTPUT, transfer=True, register=True) | |
""" | |
def __init__(self, name, id=None, namespace=None, version=None, node_label=None): | |
"""The ID for each job should be unique in the DAX. If it is None, then | |
it will be automatically generated when the job is added to the DAX. | |
The name, namespace, and version should match what you have in your | |
transformation catalog. For example, if namespace="foo" name="bar" | |
and version="1.0", then the transformation catalog should have an | |
entry for "foo::bar:1.0". | |
The name argument can be either a string, or a Transformation object. If | |
it is a Transformation object, then the job will inherit the name, namespace, | |
and version from the Transformation. | |
Arguments: | |
name: The transformation name or Transformation object (required) | |
id: A unique identifier for the job (optional) | |
namespace: The namespace of the transformation (optional) | |
version: The transformation version (optional) | |
node_label: The label for this job to use in graphing (optional) | |
""" | |
self.namespace = None | |
self.version = None | |
if isinstance(name, (Transformation, Executable)): | |
self.name = name.name | |
self.namespace = name.namespace | |
self.version = name.version | |
elif isinstance(name, basestring): | |
self.name = name | |
else: | |
raise FormatError("Name must be a string, Transformation or Executable") | |
if not self.name: | |
raise FormatError("Invalid name", self.name) | |
AbstractJob.__init__(self, id=id, node_label=node_label) | |
if namespace: self.namespace = namespace | |
if version: self.version = version | |
def __str__(self): | |
return u"<Job %s %s::%s:%s>" % (self.id, self.namespace, self.name, self.version) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def toXML(self): | |
e = Element('job', [ | |
('id', self.id), | |
('namespace', self.namespace), | |
('name', self.name), | |
('version', self.version), | |
('node-label', self.node_label) | |
]) | |
self.innerXML(e) | |
return e | |
class DAX(AbstractJob): | |
"""DAX(file[,id][,node_label]) | |
This job represents a sub-DAX that will be planned and executed by | |
the workflow. | |
Examples: | |
daxjob1 = DAX("foo.dax") | |
daxfile = File("foo.dax") | |
daxjob2 = DAX(daxfile) | |
""" | |
def __init__(self, file, id=None, node_label=None): | |
""" | |
The name argument can be either a string, or a File object. If | |
it is a File object, then this job will inherit its name from the | |
File and the File will be added in a <uses> with transfer=True, | |
register=False, and link=input. | |
Arguments: | |
file: The logical name of the DAX file or the DAX File object | |
id: The id of the DAX job [default: autogenerated] | |
node_label: The label for this job to use in graphing | |
""" | |
if isinstance(file, File): | |
self.file = file | |
elif isinstance(file, str) or isinstance(file, unicode): | |
self.file = File(name=file) | |
else: | |
raise FormatError("invalid file", file) | |
AbstractJob.__init__(self, id=id, node_label=node_label) | |
def __str__(self): | |
return u"<DAX %s %s>" % (self.id, self.file.name) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def toXML(self): | |
"""Return an XML representation of this job""" | |
e = Element('dax', [ | |
('id', self.id), | |
('file', self.file.name), | |
('node-label', self.node_label) | |
]) | |
self.innerXML(e) | |
return e | |
class DAG(AbstractJob): | |
"""DAG(file[,id][,node_label]) | |
This job represents a sub-DAG that will be executed by this | |
workflow. | |
Examples: | |
dagjob1 = DAG(file="foo.dag") | |
dagfile = File("foo.dag") | |
dagjob2 = DAG(dagfile) | |
""" | |
def __init__(self, file, id=None, node_label=None): | |
""" | |
The name argument can be either a string, or a File object. If | |
it is a File object, then this job will inherit its name from the | |
File and the File will be added in a <uses> with transfer=True, | |
register=False, and link=input. | |
Arguments: | |
file: The logical name of the DAG file, or the DAG File object | |
id: The ID of the DAG job [default: autogenerated] | |
node_label: The label for this job to use in graphing | |
""" | |
if isinstance(file, File): | |
self.file = file | |
elif isinstance(file, str) or isinstance(file, unicode): | |
self.file = File(name=file) | |
else: | |
raise FormatError("Invalid file", file) | |
AbstractJob.__init__(self, id=id, node_label=node_label) | |
def __str__(self): | |
return u"<DAG %s %s>" % (self.id, self.file.name) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def toXML(self): | |
"""Return an XML representation of this DAG""" | |
e = Element('dag', [ | |
('id', self.id), | |
('file', self.file.name), | |
('node-label', self.node_label) | |
]) | |
self.innerXML(e) | |
return e | |
class Dependency: | |
"""A dependency between two nodes in the ADAG""" | |
def __init__(self, parent, child, edge_label=None): | |
if isinstance(parent, AbstractJob): | |
if not parent.id: | |
raise FormatError("Parent job has no id", parent) | |
self.parent = parent.id | |
elif parent: | |
self.parent = parent | |
else: | |
raise FormatError("Invalid parent", parent) | |
if isinstance(child, AbstractJob): | |
if not child.id: | |
raise FormatError("Child job has no id", child) | |
self.child = child.id | |
elif child: | |
self.child = child | |
else: | |
raise FormatError("Invalid child", child) | |
if self.parent == self.child: | |
raise FormatError("No self edges allowed", (self.parent, self.child)) | |
self.edge_label = edge_label | |
def __str__(self): | |
return "<Dependency %s -> %s>" % (self.parent, self.child) | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def __hash__(self): | |
return hash((self.parent, self.child)) | |
def __eq__(self, other): | |
"""Equal dependencies have the same parent and child""" | |
if isinstance(other, Dependency): | |
return self.parent == other.parent and self.child == other.child | |
return False | |
class ADAG(InvokeMixin, MetadataMixin): | |
"""ADAG(name[,count][,index]) | |
Representation of a directed acyclic graph in XML (DAX). | |
Examples: | |
dax = ADAG('diamond') | |
or, if you want to use the old style count/index partitioning stuff: | |
part5 = ADAG('partition_5',count=10,index=5) | |
Adding jobs: | |
a = Job(...) | |
dax.addJob(a) | |
Adding parent-child control-flow dependency: | |
dax.addDependency(Dependency(parent=a,child=b)) | |
dax.addDependency(Dependency(parent=a,child=c)) | |
dax.addDependency(Dependency(parent=b,child=d)) | |
dax.addDependency(Dependency(parent=c,child=d)) | |
or: | |
dax.depends(child=b, parent=a) | |
Adding Files (not required if you have a replica catalog): | |
input = File(...) | |
dax.addFile(input) | |
Adding Executables (not required if you have a transformation catalog): | |
exe = Executable(...) | |
dax.addExecutable(exe) | |
Adding Transformations (not required if you have a transformation catalog): | |
xform = Transformation(...) | |
dax.addTransformation(xform) | |
Writing a DAX out to a file: | |
f = open('diamond.dax','w') | |
dax.writeXML(f) | |
f.close() | |
""" | |
def __init__(self, name, count=None, index=None, auto=False): | |
""" | |
Arguments: | |
name: The name of the workflow | |
count: Total number of DAXes that will be created | |
index: Zero-based index of this DAX | |
""" | |
if not name: | |
raise FormatError("Invalid ADAG name", name) | |
self.name = name | |
if count: count = int(count) | |
if index: index = int(index) | |
self.count = count | |
self.index = index | |
self._auto = auto if auto is True else False | |
# This is used to generate unique ID numbers | |
self.sequence = 1 | |
self.jobs = {} | |
self.files = set() | |
self.executables = set() | |
self.dependencies = set() | |
self.transformations = set() | |
self.invocations = set() | |
self._metadata = set() | |
def __str__(self): | |
return u"<ADAG %s>" % self.name | |
def __bytes__(self): | |
return unicode(self).encode("utf-8") | |
def nextJobID(self): | |
"""Get an autogenerated ID for the next job""" | |
next = None | |
while not next or next in self.jobs: | |
next = "ID%07d" % self.sequence | |
self.sequence += 1 | |
return next | |
def getJob(self, jobid): | |
"""Get a Job/DAG/DAX""" | |
if not jobid in self.jobs: | |
raise NotFoundError("Job not found", jobid) | |
return self.jobs[jobid] | |
def addJob(self, job): | |
"""Add a job to this ADAG""" | |
# Add an auto-generated ID if the job doesn't have one | |
if job.id is None: | |
job.id = self.nextJobID() | |
if self.hasJob(job): | |
raise DuplicateError("Duplicate job %s" % job) | |
self.jobs[job.id] = job | |
def hasJob(self, job): | |
"""Test to see if job is in this ADAG | |
The job parameter can be an object or a job ID | |
""" | |
if isinstance(job, AbstractJob): | |
return job.id in self.jobs | |
else: | |
return job in self.jobs | |
def removeJob(self, job): | |
"""Remove job from this ADAG""" | |
if not self.hasJob(job): | |
raise NotFoundError("Job not found", job) | |
if isinstance(job, AbstractJob): | |
del self.jobs[job.id] | |
else: | |
del self.jobs[job] | |
def clearJobs(self): | |
"""Remove all jobs""" | |
self.jobs = {} | |
def addDAX(self, dax): | |
"""Add a sub-DAX (synonym for addJob)""" | |
if not isinstance(dax, DAX): | |
raise FormatError("Not a DAX", dax) | |
self.addJob(dax) | |
def addDAG(self, dag): | |
"""Add a sub-DAG (synonym for addJob)""" | |
if not isinstance(dag, DAG): | |
raise FormatError("Not a DAG", dag) | |
self.addJob(dag) | |
def addFile(self, file): | |
"""Add a file to the DAX""" | |
if not isinstance(file, File): | |
raise FormatError("Invalid File", file) | |
if self.hasFile(file): | |
raise DuplicateError("Duplicate file %s" % file) | |
self.files.add(file) | |
def hasFile(self, file): | |
"""Check to see if file is in this ADAG""" | |
return file in self.files | |
def removeFile(self, file): | |
"""Remove file from this ADAG""" | |
if not self.hasFile(file): | |
raise NotFoundError("File not found", file) | |
self.files.remove(file) | |
def clearFiles(self): | |
"""Remove all files""" | |
self.files.clear() | |
def addExecutable(self, executable): | |
"""Add an executable to this ADAG""" | |
if self.hasExecutable(executable): | |
raise DuplicateError("Duplicate executable %s" % executable) | |
self.executables.add(executable) | |
def hasExecutable(self, executable): | |
"""Check if executable is in this ADAG""" | |
return executable in self.executables | |
def removeExecutable(self, executable): | |
"""Remove executable from this ADAG""" | |
if not self.hasExecutable(executable): | |
raise NotFoundError("Executable not found %s" % executable) | |
self.executables.remove(executable) | |
def clearExecutables(self): | |
"""Remove all executables""" | |
self.executables.clear() | |
def addTransformation(self, transformation): | |
"""Add a transformation to this ADAG""" | |
if self.hasTransformation(transformation): | |
raise DuplicateError("Duplicate tranformation %s" % transformation) | |
self.transformations.add(transformation) | |
def hasTransformation(self, transformation): | |
"""Check to see if transformation is in this ADAG""" | |
return transformation in self.transformations | |
def removeTransformation(self, transformation): | |
"""Remove transformation from this ADAG""" | |
if not self.hasTransformation(transformation): | |
raise NotFoundError("Transformation not found %s" % transformation) | |
self.transformations.remove(transformation) | |
def clearTransformations(self): | |
"""Remove all transformations""" | |
self.transformations.clear() | |
def depends(self, child, parent, edge_label=None): | |
"""Add a dependency to the workflow | |
Arguments: | |
child: The child job/dax/dag or id | |
parent: The parent job/dax/dag or id | |
edge_label: A label for the edge (optional) | |
""" | |
d = Dependency(parent, child, edge_label) | |
self.addDependency(d) | |
def addDependency(self, dep): | |
"""Add a dependency to the workflow | |
The old way to call this method is no longer valid. Please change: | |
adag.addDependency(parent="ID01", child="ID02", edge_label="E01") | |
to be: | |
adag.addDependency(Dependency(parent="ID01", child="ID02", edge_label="E01")) | |
or: | |
adag.depends(parent="ID01", child="ID02", edge_label="E01") | |
""" | |
if self.hasDependency(dep): | |
raise DuplicateError("Duplicate dependency %s" % dep) | |
# Check the jobs | |
if dep.parent not in self.jobs: | |
raise NotFoundError("Parent not found", dep.parent) | |
if dep.child not in self.jobs: | |
raise NotFoundError("Child not found", dep.child) | |
self.dependencies.add(dep) | |
def hasDependency(self, dep): | |
"""Check to see if dependency exists""" | |
return dep in self.dependencies | |
def removeDependency(self, dep): | |
"""Remove dependency from workflow""" | |
if not self.hasDependency(dep): | |
raise NotFoundError("Dependency not found", dep) | |
self.dependencies.remove(dep) | |
def clearDependencies(self): | |
"""Remove all dependencies""" | |
self.dependencies.clear() | |
def toXML(self): | |
"""Get the XML string for this ADAG | |
This is primarily intended for testing. If you have a large ADAG | |
you should use writeXML instead. | |
""" | |
s = StringIO() | |
self.writeXML(s) | |
xml = s.getvalue() | |
s.close() | |
return xml | |
def writeXMLFile(self, filename): | |
"""Write the ADAG to an XML file""" | |
file = codecs.open(filename, "w", "utf-8") | |
self.writeXML(file) | |
file.close() | |
def _autoDependencies(self): | |
"""Automatically compute job dependencies based on input/output files used by a job""" | |
if self._auto is False: | |
return | |
mapping = {} | |
def addOutput(job, file_obj): | |
if file_obj: | |
file_obj = file_obj.name | |
if file_obj not in mapping: | |
mapping[file_obj] = (set(), set()) | |
mapping[file_obj][1].add(job) | |
# Automatically determine dependencies | |
# Traverse each job | |
        for job_id, job in self.jobs.items():
file_used = job.used | |
# If job produces to stdout, identify it as an output file | |
addOutput(job, job.stdout) | |
# If job produces to stderr, identify it as an output file | |
addOutput(job, job.stderr) | |
# If job consumes from stdin, identify it as an input file | |
if job.stdin: | |
if job.stdin.name not in mapping: | |
mapping[job.stdin.name] = (set(), set()) | |
mapping[job.stdin.name][0].add(job) | |
for f in file_used: | |
if f.name not in mapping: | |
mapping[f.name] = (set(), set()) | |
if f.link == Link.INPUT: | |
mapping[f.name][0].add(job) | |
else: | |
mapping[f.name][1].add(job) | |
        for file_name, io in mapping.items():
# Go through the mapping and for each file add dependencies between the | |
# job producing a file and the jobs consuming the file | |
inputs = io[0] | |
if len(io[1]) > 0: | |
output = io[1].pop() | |
for _input in inputs: | |
try: | |
self.depends(parent=output, child=_input) | |
except DuplicateError: | |
pass | |
def writeXML(self, out): | |
"""Write the ADAG as XML to a stream""" | |
self._autoDependencies() | |
# Preamble | |
out.write('<?xml version="1.0" encoding="UTF-8"?>\n') | |
out.write('<!-- generated: %s -->\n' % datetime.datetime.now()) | |
if os.name == 'posix': | |
import pwd | |
username = pwd.getpwuid(os.getuid())[0] | |
elif os.name == 'nt': | |
username = os.getenv("USERNAME", "N/A") | |
else: | |
username = "N/A" | |
out.write('<!-- generated by: %s -->\n' % username) | |
out.write('<!-- generator: python -->\n') | |
# Open tag | |
out.write('<adag xmlns="%s" ' % SCHEMA_NAMESPACE) | |
out.write('xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ') | |
out.write('xsi:schemaLocation="%s %s" ' % (SCHEMA_NAMESPACE, SCHEMA_LOCATION)) | |
out.write('version="%s" ' % SCHEMA_VERSION) | |
out.write('name="%s"' % self.name) | |
if self.count: out.write(' count="%d"' % self.count) | |
if self.index: out.write(' index="%d"' % self.index) | |
out.write('>\n') | |
# Metadata | |
for m in self._metadata: | |
out.write('\t') | |
m.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Invocations | |
for i in self.invocations: | |
out.write('\t') | |
i.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Files | |
for f in self.files: | |
out.write('\t') | |
f.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Executables | |
for e in self.executables: | |
out.write('\t') | |
e.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Transformations | |
for t in self.transformations: | |
out.write('\t') | |
t.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Jobs | |
keys = self.jobs.keys() | |
keys = sorted(keys) | |
for job_id in keys: | |
job = self.jobs[job_id] | |
out.write('\t') | |
job.toXML().write(stream=out, level=1) | |
out.write('\n') | |
# Dependencies | |
# Since we store dependencies as tuples, but we need to print them as nested elements | |
# we first build a map of all the children that maps child -> [(parent,label),...] | |
children = {} | |
for dep in self.dependencies: | |
if not dep.child in children: | |
children[dep.child] = [] | |
children[dep.child].append((dep.parent, dep.edge_label)) | |
# Now output all the xml in sorted order by child, then parent | |
keys = children.keys() | |
keys = sorted(keys) | |
for child in keys: | |
out.write('\t') | |
c = Element("child", [("ref", child)]) | |
parents = children[child] | |
parents = sorted(parents) | |
for parent, edge_label in parents: | |
p = Element("parent", [ | |
("ref", parent), | |
("edge-label", edge_label) | |
]) | |
c.element(p) | |
c.write(stream=out, level=1) | |
out.write('\n') | |
# Close tag | |
out.write('</adag>\n') | |
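# Sketch (not part of the original gist): the 'auto' flag on the ADAG
# constructor, not listed in its docstring, enables _autoDependencies(), which
# runs at writeXML() time, scans each job's uses (plus stdin/stdout/stderr),
# and adds a dependency from the job producing a file to every job consuming it.
#
#   dax = ADAG("pipeline", auto=True)
#   raw = File("raw.txt")
#   a = Job("make"); a.uses(raw, link=Link.OUTPUT); dax.addJob(a)
#   b = Job("use");  b.uses(raw, link=Link.INPUT);  dax.addJob(b)
#   dax.writeXML(sys.stdout)   # also emits the inferred edge: parent a -> child b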
def parseString(string): | |
s = StringIO(string) | |
return parse(s) | |
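# Round-trip sketch (illustrative, not part of the original gist): a DAX
# produced by ADAG.toXML() is meant to be read back with parseString(), which
# wraps the string in a StringIO and hands it to parse() below.
#
#   original = ADAG("diamond")
#   original.addJob(Job("preprocess"))
#   restored = parseString(original.toXML())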
def parse(infile): | |
try: | |
import xml.etree.cElementTree as etree | |
except: | |
try: | |
import xml.etree.ElementTree as etree | |
except: | |
try: | |
import elementtree.ElementTree as etree | |
except: | |
raise Exception("Please install elementtree") | |
NS = "{http://pegasus.isi.edu/schema/DAX}" | |
def QN(tag): | |
return NS + tag | |
def badattr(e, exc): | |
return ParseError("Attribute '%s' is required for element %s" % (exc.args[0], e.tag)) | |
def parse_invoke(e): | |
try: | |
return Invoke(when=e.attrib["when"], what=e.text) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
def parse_adag(e): | |
try: | |
name = e.attrib['name'] | |
count = e.get("count", None) | |
index = e.get("index", None) | |
return ADAG(name=name, count=count, index=index) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
def parse_profile(e): | |
try: | |
return Profile( | |
namespace=e.attrib["namespace"], | |
key=e.attrib["key"], | |
value=e.text) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
def parse_metadata(e): | |
try: | |
return Metadata( | |
key=e.attrib['key'], | |
value=e.text) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
def parse_pfn(e): | |
try: | |
p = PFN( | |
url=e.attrib['url'], | |
site=e.get("site", None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
for pr in e.findall(QN("profile")): | |
p.addProfile(parse_profile(pr)) | |
return p | |
def parse_catalog(e, f): | |
for p in e.findall(QN("profile")): | |
f.addProfile(parse_profile(p)) | |
for m in e.findall(QN("metadata")): | |
f.addMetadata(parse_metadata(m)) | |
for p in e.findall(QN("pfn")): | |
f.addPFN(parse_pfn(p)) | |
return f | |
def parse_file(e): | |
try: | |
f = File(e.attrib['name']) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
return parse_catalog(e, f) | |
def parse_executable(e): | |
try: | |
exe = Executable( | |
name=e.attrib['name'], | |
namespace=e.get("namespace", None), | |
version=e.get("version", None), | |
arch=e.get("arch", None), | |
os=e.get("os", None), | |
osrelease=e.get("osrelease", None), | |
osversion=e.get("osversion", None), | |
glibc=e.get("glibc", None), | |
installed=e.get("installed", None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
parse_catalog(e, exe) | |
for i in e.findall(QN("invoke")): | |
exe.addInvoke(parse_invoke(i)) | |
return exe | |
def parse_uses(e): | |
try: | |
u = Use( | |
e.attrib['name'], | |
namespace=e.get('namespace', None), | |
version=e.get('version', None), | |
link=e.get('link', None), | |
register=e.get('register', None), | |
transfer=e.get('transfer', None), | |
optional=e.get('optional', None), | |
executable=e.get('executable', None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
for m in e.findall(QN("metadata")): | |
u.addMetadata(parse_metadata(m)) | |
return u | |
def parse_transformation(e): | |
try: | |
t = Transformation( | |
namespace=e.get("namespace", None), | |
name=e.attrib['name'], | |
version=e.get("version", None)) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
for u in e.findall(QN("uses")): | |
t.addUse(parse_uses(u)) | |
for i in e.findall(QN("invoke")): | |
t.addInvoke(parse_invoke(i)) | |
for m in e.findall(QN("metadata")): | |
t.addMetadata(parse_metadata(m)) | |
return t | |
def iterelem(e): | |
if e.text: | |
yield e.text | |
for f in e: | |
if f.text: | |
yield f.text | |
yield f | |
if f.tail: | |
yield f.tail | |
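# For a mixed-content <argument> element such as (sketch):
#   <argument>-i <file name="f.a"/> -o <file name="f.b"/></argument>
# iterelem yields the leading text "-i ", then each <file> element, then the
# text trailing it (" -o ", ...), preserving the original argument order.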
def parse_absjob(e, j): | |
args = e.find(QN("argument")) | |
if args is not None: | |
for i in iterelem(args): | |
if isinstance(i, str):  # plain text chunk (basestring does not exist in Python 3)
j.addRawArguments(i) | |
else: | |
j.addRawArguments(File(i.attrib['name'])) | |
try: | |
s = e.find(QN("stdin")) | |
if s is not None: | |
j.setStdin(s.attrib['name']) | |
s = e.find(QN("stdout")) | |
if s is not None: | |
j.setStdout(s.attrib['name']) | |
s = e.find(QN("stderr")) | |
if s is not None: | |
j.setStderr(s.attrib['name']) | |
except KeyError as ke: | |
raise badattr(s, ke) | |
for p in e.findall(QN("profile")): | |
j.addProfile(parse_profile(p)) | |
for u in e.findall(QN("uses")): | |
j.addUse(parse_uses(u)) | |
for i in e.findall(QN("invoke")): | |
j.addInvoke(parse_invoke(i)) | |
for m in e.findall(QN("metadata")): | |
j.addMetadata(parse_metadata(m)) | |
return j | |
def parse_job(e): | |
try: | |
j = Job( | |
name=e.attrib["name"], | |
id=e.attrib["id"], | |
namespace=e.get("namespace", None), | |
version=e.get("version", None), | |
node_label=e.get("node-label", None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
return parse_absjob(e, j) | |
def parse_dax(e): | |
try: | |
d = DAX( | |
file=e.attrib["file"], | |
id=e.attrib["id"], | |
node_label=e.get("node-label", None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
return parse_absjob(e, d) | |
def parse_dag(e): | |
try: | |
d = DAG( | |
file=e.attrib["file"], | |
id=e.attrib["id"], | |
node_label=e.get("node-label", None) | |
) | |
except KeyError as ke: | |
raise badattr(e, ke) | |
return parse_absjob(e, d) | |
def parse_dependencies(e): | |
try: | |
child = e.attrib["ref"] | |
except KeyError as ke: | |
raise badattr(e, ke) | |
for p in e.findall(QN("parent")): | |
try: | |
parent = p.attrib["ref"] | |
label = p.attrib.get("edge-label", None) | |
yield Dependency(parent, child, label) | |
except KeyError as ke: | |
raise badattr(p, ke) | |
# We use iterparse so that we don't have to read the entire
# document into memory at once
iterator = etree.iterparse(infile, events=("start", "end")) | |
iterator = iter(iterator) | |
# Get the document element (should be <adag>) | |
event, root = next(iterator)
adag = parse_adag(root) | |
# This function reads all the children of "node" | |
def expand(node): | |
event, elem = next(iterator)
while elem != node:
event, elem = next(iterator)
# We clear the document element to prevent | |
# the memory usage from growing | |
root.clear() | |
for ev, elem in iterator: | |
if ev == "end": | |
continue | |
# Read in the entire element and children | |
expand(elem) | |
if elem.tag == QN("job"): | |
j = parse_job(elem) | |
adag.addJob(j) | |
elif elem.tag == QN("child"): | |
for d in parse_dependencies(elem): | |
adag.addDependency(d) | |
elif elem.tag == QN("file"): | |
f = parse_file(elem) | |
adag.addFile(f) | |
elif elem.tag == QN("executable"): | |
e = parse_executable(elem) | |
adag.addExecutable(e) | |
elif elem.tag == QN("transformation"): | |
t = parse_transformation(elem) | |
adag.addTransformation(t) | |
elif elem.tag == QN("dag"): | |
d = parse_dag(elem) | |
adag.addJob(d) | |
elif elem.tag == QN("dax"): | |
d = parse_dax(elem) | |
adag.addJob(d) | |
elif elem.tag == QN("invoke"): | |
adag.addInvoke(parse_invoke(elem)) | |
elif elem.tag == QN("metadata"): | |
adag.addMetadata(parse_metadata(elem)) | |
else: | |
raise ParseError("Unknown tag", elem.tag) | |
return adag | |
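# Minimal usage sketch (assumes "diamond.xml" is a DAX previously written
# with writeXML):
#   with open("diamond.xml") as f:
#       adag = parse(f)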
def main(): | |
"""Simple smoke test""" | |
# Create a DAX | |
diamond = ADAG("diamond") | |
# Add some metadata | |
diamond.metadata("name", "diamond") | |
diamond.metadata("createdby", "Gideon Juve") | |
# Add an invoke notification to run if the workflow fails
diamond.invoke('on_error', '/usr/bin/update_db -failure') | |
# Add input file to the DAX-level replica catalog | |
a = File("f.a") | |
a.addPFN(PFN("gsiftp://site.com/inputs/f.a", "site")) | |
a.metadata("size", "1024") | |
diamond.addFile(a) | |
# Add executables to the DAX-level replica catalog | |
e_preprocess = Executable(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64") | |
e_preprocess.metadata("size", "2048") | |
e_preprocess.addPFN(PFN("gsiftp://site.com/bin/preprocess", "site")) | |
diamond.addExecutable(e_preprocess) | |
e_findrange = Executable(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64") | |
e_findrange.addPFN(PFN("gsiftp://site.com/bin/findrange", "site")) | |
diamond.addExecutable(e_findrange) | |
e_analyze = Executable(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64") | |
e_analyze.addPFN(PFN("gsiftp://site.com/bin/analyze", "site")) | |
e_analyze.addProfile(Profile(namespace="env", key="APP_HOME", value="/app")) | |
diamond.addExecutable(e_analyze) | |
# Add a preprocess job | |
preprocess = Job(e_preprocess) | |
preprocess.metadata("time", "60") | |
b1 = File("f.b1") | |
b2 = File("f.b2") | |
preprocess.addArguments("-a preprocess", "-T60", "-i", a, "-o", b1, b2) | |
preprocess.uses(a, link=Link.INPUT) | |
preprocess.uses(b1, link=Link.OUTPUT, transfer=True) | |
preprocess.uses(b2, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(preprocess) | |
# Add left Findrange job | |
frl = Job(e_findrange) | |
frl.metadata("time", "60") | |
c1 = File("f.c1") | |
frl.addArguments("-a findrange", "-T60", "-i", b1, "-o", c1) | |
frl.uses(b1, link=Link.INPUT) | |
frl.uses(c1, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(frl) | |
# Add right Findrange job | |
frr = Job(e_findrange) | |
frr.metadata("time", "60") | |
c2 = File("f.c2") | |
frr.addArguments("-a findrange", "-T60", "-i", b2, "-o", c2) | |
frr.uses(b2, link=Link.INPUT) | |
frr.uses(c2, link=Link.OUTPUT, transfer=True) | |
diamond.addJob(frr) | |
# Add Analyze job | |
analyze = Job(e_analyze) | |
analyze.metadata("time", "60") | |
d = File("f.d") | |
analyze.addArguments("-a analyze", "-T60", "-i", c1, c2, "-o", d) | |
analyze.uses(c1, link=Link.INPUT) | |
analyze.uses(c2, link=Link.INPUT) | |
analyze.uses(d, link=Link.OUTPUT, transfer=True, register=True) | |
diamond.addJob(analyze) | |
# Add dependencies | |
diamond.depends(parent=preprocess, child=frl) | |
diamond.depends(parent=preprocess, child=frr) | |
diamond.depends(parent=frl, child=analyze) | |
diamond.depends(parent=frr, child=analyze) | |
# Write the generated diamond DAX to stdout
import sys | |
diamond.writeXML(sys.stdout) | |
if __name__ == '__main__': | |
main() |
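# Running this module directly prints the diamond DAX to stdout; assuming the
# file is saved as DAX3.py, something like
#   python3 DAX3.py > diamond.xml
# writes the example workflow, which can then be read back with parse() above.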