cwltool --print-input-schema examples for MGnify workflows & tools
#
# This file was autogenerated using schema-salad-tool --codegen=python
# The code itself is released under the Apache 2.0 license and the help text is
# subject to the license of the original schema.
import copy
import logging
import os
import pathlib
import re
import tempfile
import uuid as _uuid__  # pylint: disable=unused-import # noqa: F401
import xml.sax  # nosec
from abc import ABC, abstractmethod
from io import StringIO
from typing import (
    Any,
    Dict,
    List,
    MutableMapping,
    MutableSequence,
    Optional,
    Sequence,
    Tuple,
    Type,
    Union,
    cast,
)
from urllib.parse import quote, urldefrag, urlparse, urlsplit, urlunsplit
from urllib.request import pathname2url

from rdflib import Graph
from rdflib.plugins.parsers.notation3 import BadSyntax
from ruamel.yaml.comments import CommentedMap
from schema_salad.exceptions import SchemaSaladException, ValidationException
from schema_salad.fetcher import DefaultFetcher, Fetcher, MemoryCachingFetcher
from schema_salad.sourceline import SourceLine, add_lc_filename
from schema_salad.utils import CacheType, yaml_no_ts  # requires schema-salad v8.2+

_vocab: Dict[str, str] = {}
_rvocab: Dict[str, str] = {}

_logger = logging.getLogger("salad")

IdxType = MutableMapping[str, Tuple[Any, "LoadingOptions"]]
class LoadingOptions: | |
idx: IdxType | |
fileuri: Optional[str] | |
baseuri: str | |
namespaces: MutableMapping[str, str] | |
schemas: MutableSequence[str] | |
original_doc: Optional[Any] | |
addl_metadata: MutableMapping[str, Any] | |
fetcher: Fetcher | |
vocab: Dict[str, str] | |
rvocab: Dict[str, str] | |
cache: CacheType | |
def __init__( | |
self, | |
fetcher: Optional[Fetcher] = None, | |
namespaces: Optional[Dict[str, str]] = None, | |
schemas: Optional[List[str]] = None, | |
fileuri: Optional[str] = None, | |
copyfrom: Optional["LoadingOptions"] = None, | |
original_doc: Optional[Any] = None, | |
addl_metadata: Optional[Dict[str, str]] = None, | |
baseuri: Optional[str] = None, | |
idx: Optional[IdxType] = None, | |
) -> None: | |
"""Create a LoadingOptions object.""" | |
self.original_doc = original_doc | |
if idx is not None: | |
self.idx = idx | |
else: | |
self.idx = copyfrom.idx if copyfrom is not None else {} | |
if fileuri is not None: | |
self.fileuri = fileuri | |
else: | |
self.fileuri = copyfrom.fileuri if copyfrom is not None else None | |
if baseuri is not None: | |
self.baseuri = baseuri | |
else: | |
self.baseuri = copyfrom.baseuri if copyfrom is not None else "" | |
if namespaces is not None: | |
self.namespaces = namespaces | |
else: | |
self.namespaces = copyfrom.namespaces if copyfrom is not None else {} | |
if schemas is not None: | |
self.schemas = schemas | |
else: | |
self.schemas = copyfrom.schemas if copyfrom is not None else [] | |
if addl_metadata is not None: | |
self.addl_metadata = addl_metadata | |
else: | |
self.addl_metadata = copyfrom.addl_metadata if copyfrom is not None else {} | |
if fetcher is not None: | |
self.fetcher = fetcher | |
elif copyfrom is not None: | |
self.fetcher = copyfrom.fetcher | |
else: | |
import requests | |
from cachecontrol.caches import FileCache | |
from cachecontrol.wrapper import CacheControl | |
root = pathlib.Path(os.environ.get("HOME", tempfile.gettempdir())) | |
session = CacheControl( | |
requests.Session(), | |
cache=FileCache(root / ".cache" / "salad"), | |
) | |
self.fetcher: Fetcher = DefaultFetcher({}, session) | |
self.cache = ( | |
self.fetcher.cache if isinstance(self.fetcher, MemoryCachingFetcher) else {} | |
) | |
self.vocab = _vocab | |
self.rvocab = _rvocab | |
if namespaces is not None: | |
self.vocab = self.vocab.copy() | |
self.rvocab = self.rvocab.copy() | |
for k, v in namespaces.items(): | |
self.vocab[k] = v | |
self.rvocab[v] = k | |
@property | |
def graph(self) -> Graph: | |
"""Generate a merged rdflib.Graph from all entries in self.schemas.""" | |
graph = Graph() | |
if not self.schemas: | |
return graph | |
key = str(hash(tuple(self.schemas))) | |
if key in self.cache: | |
return cast(Graph, self.cache[key]) | |
for schema in self.schemas: | |
fetchurl = ( | |
self.fetcher.urljoin(self.fileuri, schema) | |
if self.fileuri is not None | |
else pathlib.Path(schema).resolve().as_uri() | |
) | |
try: | |
if fetchurl not in self.cache or self.cache[fetchurl] is True: | |
_logger.debug("Getting external schema %s", fetchurl) | |
content = self.fetcher.fetch_text(fetchurl) | |
self.cache[fetchurl] = newGraph = Graph() | |
for fmt in ["xml", "turtle"]: | |
try: | |
newGraph.parse( | |
data=content, format=fmt, publicID=str(fetchurl) | |
) | |
break | |
except (xml.sax.SAXParseException, TypeError, BadSyntax): | |
pass | |
graph += self.cache[fetchurl] | |
except Exception as e: | |
_logger.warning( | |
"Could not load extension schema %s: %s", fetchurl, str(e) | |
) | |
self.cache[key] = graph | |
return graph | |
class Saveable(ABC):
    """Mark classes that have a save() and fromDoc() function."""
@classmethod | |
@abstractmethod | |
def fromDoc( | |
cls, | |
_doc: Any, | |
baseuri: str, | |
loadingOptions: LoadingOptions, | |
docRoot: Optional[str] = None, | |
) -> "Saveable": | |
"""Construct this object from the result of yaml.load().""" | |
@abstractmethod | |
def save( | |
self, top: bool = False, base_url: str = "", relative_uris: bool = True | |
) -> Dict[str, Any]: | |
"""Convert this object to a JSON/YAML friendly dictionary.""" | |
def load_field(val, fieldtype, baseuri, loadingOptions): | |
# type: (Union[str, Dict[str, str]], _Loader, str, LoadingOptions) -> Any | |
if isinstance(val, MutableMapping): | |
if "$import" in val: | |
if loadingOptions.fileuri is None: | |
raise SchemaSaladException("Cannot load $import without fileuri") | |
result, metadata = _document_load_by_url( | |
fieldtype, | |
loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$import"]), | |
loadingOptions, | |
) | |
return result | |
elif "$include" in val: | |
if loadingOptions.fileuri is None: | |
raise SchemaSaladException("Cannot load $import without fileuri") | |
val = loadingOptions.fetcher.fetch_text( | |
loadingOptions.fetcher.urljoin(loadingOptions.fileuri, val["$include"]) | |
) | |
return fieldtype.load(val, baseuri, loadingOptions) | |
save_type = Optional[ | |
Union[MutableMapping[str, Any], MutableSequence[Any], int, float, bool, str] | |
] | |
def save( | |
val: Any, | |
top: bool = True, | |
base_url: str = "", | |
relative_uris: bool = True, | |
) -> save_type: | |
if isinstance(val, Saveable): | |
return val.save(top=top, base_url=base_url, relative_uris=relative_uris) | |
if isinstance(val, MutableSequence): | |
return [ | |
save(v, top=False, base_url=base_url, relative_uris=relative_uris) | |
for v in val | |
] | |
if isinstance(val, MutableMapping): | |
newdict = {} | |
for key in val: | |
newdict[key] = save( | |
val[key], top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
return newdict | |
if val is None or isinstance(val, (int, float, bool, str)): | |
return val | |
raise Exception("Not Saveable: %s" % type(val)) | |
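
# Illustrative doctest-style sketch of save() on plain values (Saveable
# instances instead delegate to their own save() methods):
#
#     >>> save(["a", 1, {"k": True}])
#     ['a', 1, {'k': True}]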
def save_with_metadata( | |
val: Any, | |
valLoadingOpts: LoadingOptions, | |
top: bool = True, | |
base_url: str = "", | |
relative_uris: bool = True, | |
) -> save_type: | |
"""Save and set $namespaces, $schemas, $base and any other metadata fields at the top level.""" | |
saved_val = save(val, top, base_url, relative_uris) | |
newdict: MutableMapping[str, Any] = {} | |
if isinstance(saved_val, MutableSequence): | |
newdict = {"$graph": saved_val} | |
elif isinstance(saved_val, MutableMapping): | |
newdict = saved_val | |
if valLoadingOpts.namespaces: | |
newdict["$namespaces"] = valLoadingOpts.namespaces | |
if valLoadingOpts.schemas: | |
newdict["$schemas"] = valLoadingOpts.schemas | |
if valLoadingOpts.baseuri: | |
newdict["$base"] = valLoadingOpts.baseuri | |
for k, v in valLoadingOpts.addl_metadata.items(): | |
if k not in newdict: | |
newdict[k] = v | |
return newdict | |
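
# For example, saving a list of objects whose LoadingOptions carry namespaces
# yields a "$graph" document of roughly this shape (illustrative, not a real run):
#
#     {"$graph": [...], "$namespaces": {"edam": "http://edamontology.org/"}}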
def expand_url( | |
url, # type: str | |
base_url, # type: str | |
loadingOptions, # type: LoadingOptions | |
scoped_id=False, # type: bool | |
vocab_term=False, # type: bool | |
scoped_ref=None, # type: Optional[int] | |
): | |
# type: (...) -> str | |
if url in ("@id", "@type"): | |
return url | |
if vocab_term and url in loadingOptions.vocab: | |
return url | |
if bool(loadingOptions.vocab) and ":" in url: | |
prefix = url.split(":")[0] | |
if prefix in loadingOptions.vocab: | |
url = loadingOptions.vocab[prefix] + url[len(prefix) + 1 :] | |
split = urlsplit(url) | |
if ( | |
(bool(split.scheme) and split.scheme in ["http", "https", "file"]) | |
or url.startswith("$(") | |
or url.startswith("${") | |
): | |
pass | |
elif scoped_id and not bool(split.fragment): | |
splitbase = urlsplit(base_url) | |
frg = "" | |
if bool(splitbase.fragment): | |
frg = splitbase.fragment + "/" + split.path | |
else: | |
frg = split.path | |
pt = splitbase.path if splitbase.path != "" else "/" | |
url = urlunsplit((splitbase.scheme, splitbase.netloc, pt, splitbase.query, frg)) | |
elif scoped_ref is not None and not bool(split.fragment): | |
splitbase = urlsplit(base_url) | |
sp = splitbase.fragment.split("/") | |
n = scoped_ref | |
while n > 0 and len(sp) > 0: | |
sp.pop() | |
n -= 1 | |
sp.append(url) | |
url = urlunsplit( | |
( | |
splitbase.scheme, | |
splitbase.netloc, | |
splitbase.path, | |
splitbase.query, | |
"/".join(sp), | |
) | |
) | |
else: | |
url = loadingOptions.fetcher.urljoin(base_url, url) | |
if vocab_term: | |
split = urlsplit(url) | |
if bool(split.scheme): | |
if url in loadingOptions.rvocab: | |
return loadingOptions.rvocab[url] | |
else: | |
raise ValidationException(f"Term '{url}' not in vocabulary") | |
return url | |
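
# Illustrative behaviour (assuming an empty vocabulary): with scoped_id=True,
# a fragmentless relative identifier is scoped under the base URL's fragment:
#
#     expand_url("step1", "file:///wf.cwl#main", opts, scoped_id=True)
#     -> "file:///wf.cwl#main/step1"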
class _Loader: | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
pass | |
class _AnyLoader(_Loader): | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if doc is not None: | |
return doc | |
raise ValidationException("Expected non-null") | |
class _PrimitiveLoader(_Loader): | |
def __init__(self, tp): | |
# type: (Union[type, Tuple[Type[str], Type[str]]]) -> None | |
self.tp = tp | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
        if not isinstance(doc, self.tp):
            # report the name(s) of the expected type(s), not "type"/"tuple"
            expected = (
                " or ".join(t.__name__ for t in self.tp)
                if isinstance(self.tp, tuple)
                else self.tp.__name__
            )
            raise ValidationException(
                "Expected a {} but got {}".format(expected, doc.__class__.__name__)
            )
return doc | |
def __repr__(self): # type: () -> str | |
return str(self.tp) | |
class _ArrayLoader(_Loader): | |
def __init__(self, items): | |
# type: (_Loader) -> None | |
self.items = items | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if not isinstance(doc, MutableSequence): | |
raise ValidationException("Expected a list, was {}".format(type(doc))) | |
r = [] # type: List[Any] | |
errors = [] # type: List[SchemaSaladException] | |
for i in range(0, len(doc)): | |
try: | |
lf = load_field( | |
doc[i], _UnionLoader((self, self.items)), baseuri, loadingOptions | |
) | |
if isinstance(lf, MutableSequence): | |
r.extend(lf) | |
else: | |
r.append(lf) | |
except ValidationException as e: | |
errors.append(e.with_sourceline(SourceLine(doc, i, str))) | |
if errors: | |
raise ValidationException("", None, errors) | |
return r | |
def __repr__(self): # type: () -> str | |
return f"array<{self.items}>" | |
class _EnumLoader(_Loader): | |
def __init__(self, symbols, name): | |
# type: (Sequence[str], str) -> None | |
self.symbols = symbols | |
self.name = name | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if doc in self.symbols: | |
return doc | |
else: | |
raise ValidationException(f"Expected one of {self.symbols}") | |
def __repr__(self): # type: () -> str | |
return self.name | |
class _SecondaryDSLLoader(_Loader): | |
def __init__(self, inner): | |
# type: (_Loader) -> None | |
self.inner = inner | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
r: List[Dict[str, Any]] = [] | |
if isinstance(doc, MutableSequence): | |
for d in doc: | |
if isinstance(d, str): | |
if d.endswith("?"): | |
r.append({"pattern": d[:-1], "required": False}) | |
else: | |
r.append({"pattern": d}) | |
elif isinstance(d, dict): | |
new_dict: Dict[str, Any] = {} | |
if "pattern" in d: | |
new_dict["pattern"] = d.pop("pattern") | |
else: | |
raise ValidationException( | |
"Missing pattern in secondaryFiles specification entry: {}".format( | |
d | |
) | |
) | |
new_dict["required"] = ( | |
d.pop("required") if "required" in d else None | |
) | |
if len(d): | |
raise ValidationException( | |
"Unallowed values in secondaryFiles specification entry: {}".format( | |
d | |
) | |
) | |
r.append(new_dict) | |
                else:
                    raise ValidationException(
                        "Expected each secondaryFiles entry to be a string or a mapping."
                    )
elif isinstance(doc, MutableMapping): | |
new_dict = {} | |
if "pattern" in doc: | |
new_dict["pattern"] = doc.pop("pattern") | |
else: | |
raise ValidationException( | |
"Missing pattern in secondaryFiles specification entry: {}".format( | |
doc | |
) | |
) | |
new_dict["required"] = doc.pop("required") if "required" in doc else None | |
if len(doc): | |
raise ValidationException( | |
"Unallowed values in secondaryFiles specification entry: {}".format( | |
doc | |
) | |
) | |
r.append(new_dict) | |
elif isinstance(doc, str): | |
if doc.endswith("?"): | |
r.append({"pattern": doc[:-1], "required": False}) | |
else: | |
r.append({"pattern": doc}) | |
        else:
            raise ValidationException(
                "Expected a string, mapping, or sequence of (strings or mappings)."
            )
return self.inner.load(r, baseuri, loadingOptions, docRoot) | |
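
# The secondaryFiles DSL normalizes shorthand entries before the inner loader
# runs; a trailing "?" marks a pattern as optional. Illustrative expansions:
#
#     ".bai"    -> {"pattern": ".bai"}
#     ".crai?"  -> {"pattern": ".crai", "required": False}
#     [".bai", {"pattern": ".fai", "required": True}]
#               -> [{"pattern": ".bai"}, {"pattern": ".fai", "required": True}]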
class _RecordLoader(_Loader): | |
def __init__(self, classtype): | |
# type: (Type[Saveable]) -> None | |
self.classtype = classtype | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if not isinstance(doc, MutableMapping): | |
raise ValidationException("Expected a dict, was {}".format(type(doc))) | |
return self.classtype.fromDoc(doc, baseuri, loadingOptions, docRoot=docRoot) | |
def __repr__(self): # type: () -> str | |
return str(self.classtype.__name__) | |
class _ExpressionLoader(_Loader): | |
def __init__(self, items: Type[str]) -> None: | |
self.items = items | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if not isinstance(doc, str): | |
raise ValidationException("Expected a str, was {}".format(type(doc))) | |
return doc | |
class _UnionLoader(_Loader): | |
def __init__(self, alternates): | |
# type: (Sequence[_Loader]) -> None | |
self.alternates = alternates | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
errors = [] | |
for t in self.alternates: | |
try: | |
return t.load(doc, baseuri, loadingOptions, docRoot=docRoot) | |
except ValidationException as e: | |
errors.append(ValidationException(f"tried {t} but", None, [e])) | |
raise ValidationException("", None, errors, "-") | |
def __repr__(self): # type: () -> str | |
return " | ".join(str(a) for a in self.alternates) | |
class _URILoader(_Loader): | |
def __init__(self, inner, scoped_id, vocab_term, scoped_ref): | |
# type: (_Loader, bool, bool, Union[int, None]) -> None | |
self.inner = inner | |
self.scoped_id = scoped_id | |
self.vocab_term = vocab_term | |
self.scoped_ref = scoped_ref | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if isinstance(doc, MutableSequence): | |
newdoc = [] | |
for i in doc: | |
if isinstance(i, str): | |
newdoc.append( | |
expand_url( | |
i, | |
baseuri, | |
loadingOptions, | |
self.scoped_id, | |
self.vocab_term, | |
self.scoped_ref, | |
) | |
) | |
else: | |
newdoc.append(i) | |
doc = newdoc | |
elif isinstance(doc, str): | |
doc = expand_url( | |
doc, | |
baseuri, | |
loadingOptions, | |
self.scoped_id, | |
self.vocab_term, | |
self.scoped_ref, | |
) | |
return self.inner.load(doc, baseuri, loadingOptions) | |
class _TypeDSLLoader(_Loader): | |
typeDSLregex = re.compile(r"^([^[?]+)(\[\])?(\?)?$") | |
def __init__(self, inner, refScope): | |
# type: (_Loader, Union[int, None]) -> None | |
self.inner = inner | |
self.refScope = refScope | |
def resolve( | |
self, | |
doc, # type: str | |
baseuri, # type: str | |
loadingOptions, # type: LoadingOptions | |
): | |
# type: (...) -> Union[List[Union[Dict[str, str], str]], Dict[str, str], str] | |
m = self.typeDSLregex.match(doc) | |
if m: | |
group1 = m.group(1) | |
assert group1 is not None # nosec | |
first = expand_url( | |
group1, baseuri, loadingOptions, False, True, self.refScope | |
) | |
second = third = None | |
if bool(m.group(2)): | |
second = {"type": "array", "items": first} | |
# second = CommentedMap((("type", "array"), | |
# ("items", first))) | |
# second.lc.add_kv_line_col("type", lc) | |
# second.lc.add_kv_line_col("items", lc) | |
# second.lc.filename = filename | |
if bool(m.group(3)): | |
third = ["null", second or first] | |
# third = CommentedSeq(["null", second or first]) | |
# third.lc.add_kv_line_col(0, lc) | |
# third.lc.add_kv_line_col(1, lc) | |
# third.lc.filename = filename | |
return third or second or first | |
return doc | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if isinstance(doc, MutableSequence): | |
r = [] # type: List[Any] | |
for d in doc: | |
if isinstance(d, str): | |
resolved = self.resolve(d, baseuri, loadingOptions) | |
if isinstance(resolved, MutableSequence): | |
for i in resolved: | |
if i not in r: | |
r.append(i) | |
else: | |
if resolved not in r: | |
r.append(resolved) | |
else: | |
r.append(d) | |
doc = r | |
elif isinstance(doc, str): | |
doc = self.resolve(doc, baseuri, loadingOptions) | |
return self.inner.load(doc, baseuri, loadingOptions) | |
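
# The type DSL expands the "[]" (array) and "?" (optional) suffixes before
# loading; e.g. "string[]?" resolves (after URI expansion of "string" against
# the vocabulary) to:
#
#     ["null", {"type": "array", "items": "string"}]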
class _IdMapLoader(_Loader): | |
def __init__(self, inner, mapSubject, mapPredicate): | |
# type: (_Loader, str, Union[str, None]) -> None | |
self.inner = inner | |
self.mapSubject = mapSubject | |
self.mapPredicate = mapPredicate | |
def load(self, doc, baseuri, loadingOptions, docRoot=None): | |
# type: (Any, str, LoadingOptions, Optional[str]) -> Any | |
if isinstance(doc, MutableMapping): | |
r = [] # type: List[Any] | |
for k in sorted(doc.keys()): | |
val = doc[k] | |
if isinstance(val, CommentedMap): | |
v = copy.copy(val) | |
v.lc.data = val.lc.data | |
v.lc.filename = val.lc.filename | |
v[self.mapSubject] = k | |
r.append(v) | |
elif isinstance(val, MutableMapping): | |
v2 = copy.copy(val) | |
v2[self.mapSubject] = k | |
r.append(v2) | |
else: | |
if self.mapPredicate: | |
v3 = {self.mapPredicate: val} | |
v3[self.mapSubject] = k | |
r.append(v3) | |
else: | |
raise ValidationException("No mapPredicate") | |
doc = r | |
return self.inner.load(doc, baseuri, loadingOptions) | |
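
# The idmap transformation turns a mapping keyed by identifier into a list of
# records; e.g. with mapSubject="id" and mapPredicate="type" (hypothetical
# field names for illustration):
#
#     {"in1": {"doc": "x"}, "in2": "string"}
#     -> [{"doc": "x", "id": "in1"}, {"type": "string", "id": "in2"}]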
def _document_load( | |
loader: _Loader, | |
doc: Union[str, MutableMapping[str, Any], MutableSequence[Any]], | |
baseuri: str, | |
loadingOptions: LoadingOptions, | |
addl_metadata_fields: Optional[MutableSequence[str]] = None, | |
) -> Tuple[Any, LoadingOptions]: | |
if isinstance(doc, str): | |
return _document_load_by_url( | |
loader, | |
loadingOptions.fetcher.urljoin(baseuri, doc), | |
loadingOptions, | |
addl_metadata_fields=addl_metadata_fields, | |
) | |
if isinstance(doc, MutableMapping): | |
addl_metadata = {} | |
if addl_metadata_fields is not None: | |
for mf in addl_metadata_fields: | |
if mf in doc: | |
addl_metadata[mf] = doc[mf] | |
docuri = baseuri | |
if "$base" in doc: | |
baseuri = doc["$base"] | |
loadingOptions = LoadingOptions( | |
copyfrom=loadingOptions, | |
namespaces=doc.get("$namespaces", None), | |
schemas=doc.get("$schemas", None), | |
baseuri=doc.get("$base", None), | |
addl_metadata=addl_metadata, | |
) | |
doc = { | |
k: v | |
for k, v in doc.items() | |
if k not in ("$namespaces", "$schemas", "$base") | |
} | |
if "$graph" in doc: | |
loadingOptions.idx[baseuri] = ( | |
loader.load(doc["$graph"], baseuri, loadingOptions), | |
loadingOptions, | |
) | |
else: | |
loadingOptions.idx[baseuri] = ( | |
loader.load(doc, baseuri, loadingOptions, docRoot=baseuri), | |
loadingOptions, | |
) | |
if docuri != baseuri: | |
loadingOptions.idx[docuri] = loadingOptions.idx[baseuri] | |
return loadingOptions.idx[baseuri] | |
if isinstance(doc, MutableSequence): | |
loadingOptions.idx[baseuri] = ( | |
loader.load(doc, baseuri, loadingOptions), | |
loadingOptions, | |
) | |
return loadingOptions.idx[baseuri] | |
raise ValidationException( | |
"Expected URI string, MutableMapping or MutableSequence, got %s" % type(doc) | |
) | |
def _document_load_by_url( | |
loader: _Loader, | |
url: str, | |
loadingOptions: LoadingOptions, | |
addl_metadata_fields: Optional[MutableSequence[str]] = None, | |
) -> Tuple[Any, LoadingOptions]: | |
if url in loadingOptions.idx: | |
return loadingOptions.idx[url] | |
doc_url, frg = urldefrag(url) | |
text = loadingOptions.fetcher.fetch_text(doc_url) | |
if isinstance(text, bytes): | |
textIO = StringIO(text.decode("utf-8")) | |
else: | |
textIO = StringIO(text) | |
textIO.name = str(doc_url) | |
yaml = yaml_no_ts() | |
result = yaml.load(textIO) | |
add_lc_filename(result, doc_url) | |
loadingOptions = LoadingOptions(copyfrom=loadingOptions, fileuri=doc_url) | |
_document_load( | |
loader, | |
result, | |
doc_url, | |
loadingOptions, | |
addl_metadata_fields=addl_metadata_fields, | |
) | |
return loadingOptions.idx[url] | |
def file_uri(path, split_frag=False): # type: (str, bool) -> str | |
if path.startswith("file://"): | |
return path | |
if split_frag: | |
pathsp = path.split("#", 2) | |
frag = "#" + quote(str(pathsp[1])) if len(pathsp) == 2 else "" | |
urlpath = pathname2url(str(pathsp[0])) | |
else: | |
urlpath = pathname2url(path) | |
frag = "" | |
if urlpath.startswith("//"): | |
return f"file:{urlpath}{frag}" | |
else: | |
return f"file://{urlpath}{frag}" | |
def prefix_url(url: str, namespaces: Dict[str, str]) -> str:
    """Compact a full URL into its prefixed short form using the given namespace dictionary."""
for k, v in namespaces.items(): | |
if url.startswith(v): | |
return k + ":" + url[len(v) :] | |
return url | |
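
# Illustrative example:
#
#     >>> prefix_url("http://edamontology.org/format_1930",
#     ...            {"edam": "http://edamontology.org/"})
#     'edam:format_1930'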
def save_relative_uri( | |
uri: Any, | |
base_url: str, | |
scoped_id: bool, | |
ref_scope: Optional[int], | |
relative_uris: bool, | |
) -> Any: | |
"""Convert any URI to a relative one, obeying the scoping rules.""" | |
if isinstance(uri, MutableSequence): | |
return [ | |
save_relative_uri(u, base_url, scoped_id, ref_scope, relative_uris) | |
for u in uri | |
] | |
elif isinstance(uri, str): | |
if not relative_uris or uri == base_url: | |
return uri | |
urisplit = urlsplit(uri) | |
basesplit = urlsplit(base_url) | |
if urisplit.scheme == basesplit.scheme and urisplit.netloc == basesplit.netloc: | |
if urisplit.path != basesplit.path: | |
p = os.path.relpath(urisplit.path, os.path.dirname(basesplit.path)) | |
if urisplit.fragment: | |
p = p + "#" + urisplit.fragment | |
return p | |
basefrag = basesplit.fragment + "/" | |
if ref_scope: | |
sp = basefrag.split("/") | |
i = 0 | |
while i < ref_scope: | |
sp.pop() | |
i += 1 | |
basefrag = "/".join(sp) | |
if urisplit.fragment.startswith(basefrag): | |
return urisplit.fragment[len(basefrag) :] | |
else: | |
return urisplit.fragment | |
return uri | |
else: | |
return save(uri, top=False, base_url=base_url, relative_uris=relative_uris) | |
def shortname(inputid: str) -> str: | |
""" | |
Compute the shortname of a fully qualified identifier. | |
See https://w3id.org/cwl/v1.2/SchemaSalad.html#Short_names. | |
""" | |
parsed_id = urlparse(inputid) | |
if parsed_id.fragment: | |
return parsed_id.fragment.split("/")[-1] | |
return parsed_id.path.split("/")[-1] | |
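
# Illustrative examples:
#
#     >>> shortname("file:///tmp/wf.cwl#step1/out")
#     'out'
#     >>> shortname("http://example.com/path/name")
#     'name'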
def parser_info() -> str: | |
return "uk.ac.ebi.metagenomics.mgnify.pipeline-v5.workflows.raw-reads-wf--v.5-cond.inputs" | |
class File(Saveable): | |
def __init__( | |
self, | |
location: Any, | |
path: Any, | |
basename: Any, | |
dirname: Any, | |
nameroot: Any, | |
nameext: Any, | |
checksum: Any, | |
size: Any, | |
secondaryFiles: Any, | |
format: Any, | |
contents: Any, | |
extension_fields: Optional[Dict[str, Any]] = None, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> None: | |
if extension_fields: | |
self.extension_fields = extension_fields | |
else: | |
self.extension_fields = CommentedMap() | |
if loadingOptions: | |
self.loadingOptions = loadingOptions | |
else: | |
self.loadingOptions = LoadingOptions() | |
self.class_ = "File" | |
self.location = location | |
self.path = path | |
self.basename = basename | |
self.dirname = dirname | |
self.nameroot = nameroot | |
self.nameext = nameext | |
self.checksum = checksum | |
self.size = size | |
self.secondaryFiles = secondaryFiles | |
self.format = format | |
self.contents = contents | |
def __eq__(self, other: Any) -> bool: | |
if isinstance(other, File): | |
return bool( | |
self.class_ == other.class_ | |
and self.location == other.location | |
and self.path == other.path | |
and self.basename == other.basename | |
and self.dirname == other.dirname | |
and self.nameroot == other.nameroot | |
and self.nameext == other.nameext | |
and self.checksum == other.checksum | |
and self.size == other.size | |
and self.secondaryFiles == other.secondaryFiles | |
and self.format == other.format | |
and self.contents == other.contents | |
) | |
return False | |
def __hash__(self) -> int: | |
return hash( | |
( | |
self.class_, | |
self.location, | |
self.path, | |
self.basename, | |
self.dirname, | |
self.nameroot, | |
self.nameext, | |
self.checksum, | |
self.size, | |
self.secondaryFiles, | |
self.format, | |
self.contents, | |
) | |
) | |
@classmethod | |
def fromDoc( | |
cls, | |
doc: Any, | |
baseuri: str, | |
loadingOptions: LoadingOptions, | |
docRoot: Optional[str] = None, | |
) -> "File": | |
_doc = copy.copy(doc) | |
if hasattr(doc, "lc"): | |
_doc.lc.data = doc.lc.data | |
_doc.lc.filename = doc.lc.filename | |
_errors__ = [] | |
if _doc.get("class") != "File": | |
raise ValidationException("Not a File") | |
try: | |
location = load_field( | |
_doc.get("location"), | |
uri_union_of_None_type_or_strtype_False_True_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `location` field is not valid because:", | |
SourceLine(_doc, "location", str), | |
[e], | |
) | |
) | |
try: | |
path = load_field( | |
_doc.get("path"), | |
uri_union_of_None_type_or_strtype_False_False_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `path` field is not valid because:", | |
SourceLine(_doc, "path", str), | |
[e], | |
) | |
) | |
try: | |
basename = load_field( | |
_doc.get("basename"), | |
uri_union_of_None_type_or_strtype_False_False_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `basename` field is not valid because:", | |
SourceLine(_doc, "basename", str), | |
[e], | |
) | |
) | |
try: | |
dirname = load_field( | |
_doc.get("dirname"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `dirname` field is not valid because:", | |
SourceLine(_doc, "dirname", str), | |
[e], | |
) | |
) | |
try: | |
nameroot = load_field( | |
_doc.get("nameroot"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `nameroot` field is not valid because:", | |
SourceLine(_doc, "nameroot", str), | |
[e], | |
) | |
) | |
try: | |
nameext = load_field( | |
_doc.get("nameext"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `nameext` field is not valid because:", | |
SourceLine(_doc, "nameext", str), | |
[e], | |
) | |
) | |
try: | |
checksum = load_field( | |
_doc.get("checksum"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `checksum` field is not valid because:", | |
SourceLine(_doc, "checksum", str), | |
[e], | |
) | |
) | |
try: | |
size = load_field( | |
_doc.get("size"), | |
union_of_None_type_or_inttype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `size` field is not valid because:", | |
SourceLine(_doc, "size", str), | |
[e], | |
) | |
) | |
try: | |
secondaryFiles = load_field( | |
_doc.get("secondaryFiles"), | |
secondaryfilesdsl_union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `secondaryFiles` field is not valid because:", | |
SourceLine(_doc, "secondaryFiles", str), | |
[e], | |
) | |
) | |
try: | |
format = load_field( | |
_doc.get("format"), | |
uri_union_of_None_type_or_strtype_True_False_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `format` field is not valid because:", | |
SourceLine(_doc, "format", str), | |
[e], | |
) | |
) | |
try: | |
contents = load_field( | |
_doc.get("contents"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `contents` field is not valid because:", | |
SourceLine(_doc, "contents", str), | |
[e], | |
) | |
) | |
extension_fields: Dict[str, Any] = {} | |
for k in _doc.keys(): | |
if k not in cls.attrs: | |
if ":" in k: | |
ex = expand_url( | |
k, "", loadingOptions, scoped_id=False, vocab_term=False | |
) | |
extension_fields[ex] = _doc[k] | |
else: | |
_errors__.append( | |
ValidationException( | |
"invalid field `{}`, expected one of: `class`, `location`, `path`, `basename`, `dirname`, `nameroot`, `nameext`, `checksum`, `size`, `secondaryFiles`, `format`, `contents`".format( | |
k | |
), | |
SourceLine(_doc, k, str), | |
) | |
) | |
break | |
if _errors__: | |
raise ValidationException("Trying 'File'", None, _errors__) | |
_constructed = cls( | |
location=location, | |
path=path, | |
basename=basename, | |
dirname=dirname, | |
nameroot=nameroot, | |
nameext=nameext, | |
checksum=checksum, | |
size=size, | |
secondaryFiles=secondaryFiles, | |
format=format, | |
contents=contents, | |
extension_fields=extension_fields, | |
loadingOptions=loadingOptions, | |
) | |
return _constructed | |
def save( | |
self, top: bool = False, base_url: str = "", relative_uris: bool = True | |
) -> Dict[str, Any]: | |
r: Dict[str, Any] = {} | |
if relative_uris: | |
for ef in self.extension_fields: | |
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] | |
else: | |
for ef in self.extension_fields: | |
r[ef] = self.extension_fields[ef] | |
r["class"] = "File" | |
if self.location is not None: | |
u = save_relative_uri(self.location, base_url, False, None, relative_uris) | |
r["location"] = u | |
if self.path is not None: | |
u = save_relative_uri(self.path, base_url, False, None, relative_uris) | |
r["path"] = u | |
if self.basename is not None: | |
u = save_relative_uri(self.basename, base_url, False, None, relative_uris) | |
r["basename"] = u | |
if self.dirname is not None: | |
r["dirname"] = save( | |
self.dirname, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.nameroot is not None: | |
r["nameroot"] = save( | |
self.nameroot, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.nameext is not None: | |
r["nameext"] = save( | |
self.nameext, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.checksum is not None: | |
r["checksum"] = save( | |
self.checksum, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.size is not None: | |
r["size"] = save( | |
self.size, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.secondaryFiles is not None: | |
r["secondaryFiles"] = save( | |
self.secondaryFiles, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.format is not None: | |
u = save_relative_uri(self.format, base_url, True, None, relative_uris) | |
r["format"] = u | |
if self.contents is not None: | |
r["contents"] = save( | |
self.contents, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
        # when saving the top-level document, also emit $namespaces and $schemas
if top: | |
if self.loadingOptions.namespaces: | |
r["$namespaces"] = self.loadingOptions.namespaces | |
if self.loadingOptions.schemas: | |
r["$schemas"] = self.loadingOptions.schemas | |
return r | |
attrs = frozenset( | |
[ | |
"class", | |
"location", | |
"path", | |
"basename", | |
"dirname", | |
"nameroot", | |
"nameext", | |
"checksum", | |
"size", | |
"secondaryFiles", | |
"format", | |
"contents", | |
] | |
) | |
class Directory(Saveable): | |
def __init__( | |
self, | |
location: Any, | |
path: Any, | |
basename: Any, | |
listing: Any, | |
extension_fields: Optional[Dict[str, Any]] = None, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> None: | |
if extension_fields: | |
self.extension_fields = extension_fields | |
else: | |
self.extension_fields = CommentedMap() | |
if loadingOptions: | |
self.loadingOptions = loadingOptions | |
else: | |
self.loadingOptions = LoadingOptions() | |
self.class_ = "Directory" | |
self.location = location | |
self.path = path | |
self.basename = basename | |
self.listing = listing | |
def __eq__(self, other: Any) -> bool: | |
if isinstance(other, Directory): | |
return bool( | |
self.class_ == other.class_ | |
and self.location == other.location | |
and self.path == other.path | |
and self.basename == other.basename | |
and self.listing == other.listing | |
) | |
return False | |
def __hash__(self) -> int: | |
return hash( | |
(self.class_, self.location, self.path, self.basename, self.listing) | |
) | |
@classmethod | |
def fromDoc( | |
cls, | |
doc: Any, | |
baseuri: str, | |
loadingOptions: LoadingOptions, | |
docRoot: Optional[str] = None, | |
) -> "Directory": | |
_doc = copy.copy(doc) | |
if hasattr(doc, "lc"): | |
_doc.lc.data = doc.lc.data | |
_doc.lc.filename = doc.lc.filename | |
_errors__ = [] | |
if _doc.get("class") != "Directory": | |
raise ValidationException("Not a Directory") | |
try: | |
location = load_field( | |
_doc.get("location"), | |
uri_union_of_None_type_or_strtype_False_True_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `location` field is not valid because:", | |
SourceLine(_doc, "location", str), | |
[e], | |
) | |
) | |
try: | |
path = load_field( | |
_doc.get("path"), | |
uri_union_of_None_type_or_strtype_False_False_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `path` field is not valid because:", | |
SourceLine(_doc, "path", str), | |
[e], | |
) | |
) | |
try: | |
basename = load_field( | |
_doc.get("basename"), | |
uri_union_of_None_type_or_strtype_False_False_None, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `basename` field is not valid because:", | |
SourceLine(_doc, "basename", str), | |
[e], | |
) | |
) | |
try: | |
listing = load_field( | |
_doc.get("listing"), | |
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `listing` field is not valid because:", | |
SourceLine(_doc, "listing", str), | |
[e], | |
) | |
) | |
extension_fields: Dict[str, Any] = {} | |
for k in _doc.keys(): | |
if k not in cls.attrs: | |
if ":" in k: | |
ex = expand_url( | |
k, "", loadingOptions, scoped_id=False, vocab_term=False | |
) | |
extension_fields[ex] = _doc[k] | |
else: | |
_errors__.append( | |
ValidationException( | |
"invalid field `{}`, expected one of: `class`, `location`, `path`, `basename`, `listing`".format( | |
k | |
), | |
SourceLine(_doc, k, str), | |
) | |
) | |
break | |
if _errors__: | |
raise ValidationException("Trying 'Directory'", None, _errors__) | |
_constructed = cls( | |
location=location, | |
path=path, | |
basename=basename, | |
listing=listing, | |
extension_fields=extension_fields, | |
loadingOptions=loadingOptions, | |
) | |
return _constructed | |
def save( | |
self, top: bool = False, base_url: str = "", relative_uris: bool = True | |
) -> Dict[str, Any]: | |
r: Dict[str, Any] = {} | |
if relative_uris: | |
for ef in self.extension_fields: | |
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] | |
else: | |
for ef in self.extension_fields: | |
r[ef] = self.extension_fields[ef] | |
r["class"] = "Directory" | |
if self.location is not None: | |
u = save_relative_uri(self.location, base_url, False, None, relative_uris) | |
r["location"] = u | |
if self.path is not None: | |
u = save_relative_uri(self.path, base_url, False, None, relative_uris) | |
r["path"] = u | |
if self.basename is not None: | |
u = save_relative_uri(self.basename, base_url, False, None, relative_uris) | |
r["basename"] = u | |
if self.listing is not None: | |
r["listing"] = save( | |
self.listing, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
        # when saving the top-level document, also emit $namespaces and $schemas
if top: | |
if self.loadingOptions.namespaces: | |
r["$namespaces"] = self.loadingOptions.namespaces | |
if self.loadingOptions.schemas: | |
r["$schemas"] = self.loadingOptions.schemas | |
return r | |
attrs = frozenset(["class", "location", "path", "basename", "listing"]) | |
class inputs_record_schema(Saveable): | |
def __init__( | |
self, | |
_5_8s_pattern: Any, | |
_5s_pattern: Any, | |
CGC_config: Any, | |
CGC_postfixes: Any, | |
EggNOG_data_dir: Any, | |
EggNOG_db: Any, | |
EggNOG_diamond_db: Any, | |
HMM_gathering_bit_score: Any, | |
HMM_name_database: Any, | |
HMM_omit_alignment: Any, | |
InterProScan_applications: Any, | |
InterProScan_databases: Any, | |
InterProScan_outputFormat: Any, | |
cgc_chunk_size: Any, | |
forward_reads: Any, | |
func_ann_names_hmmer: Any, | |
func_ann_names_ips: Any, | |
go_config: Any, | |
hmmsearch_header: Any, | |
ips_header: Any, | |
ko_file: Any, | |
lsu_db: Any, | |
lsu_label: Any, | |
lsu_otus: Any, | |
lsu_tax: Any, | |
other_ncRNA_models: Any, | |
protein_chunk_size_IPS: Any, | |
protein_chunk_size_hmm: Any, | |
qc_min_length: Any, | |
reverse_reads: Any, | |
rfam_model_clans: Any, | |
rfam_models: Any, | |
single_reads: Any, | |
ssu_db: Any, | |
ssu_label: Any, | |
ssu_otus: Any, | |
ssu_tax: Any, | |
extension_fields: Optional[Dict[str, Any]] = None, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> None: | |
if extension_fields: | |
self.extension_fields = extension_fields | |
else: | |
self.extension_fields = CommentedMap() | |
if loadingOptions: | |
self.loadingOptions = loadingOptions | |
else: | |
self.loadingOptions = LoadingOptions() | |
self._5_8s_pattern = _5_8s_pattern | |
self._5s_pattern = _5s_pattern | |
self.CGC_config = CGC_config | |
self.CGC_postfixes = CGC_postfixes | |
self.EggNOG_data_dir = EggNOG_data_dir | |
self.EggNOG_db = EggNOG_db | |
self.EggNOG_diamond_db = EggNOG_diamond_db | |
self.HMM_gathering_bit_score = HMM_gathering_bit_score | |
self.HMM_name_database = HMM_name_database | |
self.HMM_omit_alignment = HMM_omit_alignment | |
self.InterProScan_applications = InterProScan_applications | |
self.InterProScan_databases = InterProScan_databases | |
self.InterProScan_outputFormat = InterProScan_outputFormat | |
self.cgc_chunk_size = cgc_chunk_size | |
self.forward_reads = forward_reads | |
self.func_ann_names_hmmer = func_ann_names_hmmer | |
self.func_ann_names_ips = func_ann_names_ips | |
self.go_config = go_config | |
self.hmmsearch_header = hmmsearch_header | |
self.ips_header = ips_header | |
self.ko_file = ko_file | |
self.lsu_db = lsu_db | |
self.lsu_label = lsu_label | |
self.lsu_otus = lsu_otus | |
self.lsu_tax = lsu_tax | |
self.other_ncRNA_models = other_ncRNA_models | |
self.protein_chunk_size_IPS = protein_chunk_size_IPS | |
self.protein_chunk_size_hmm = protein_chunk_size_hmm | |
self.qc_min_length = qc_min_length | |
self.reverse_reads = reverse_reads | |
self.rfam_model_clans = rfam_model_clans | |
self.rfam_models = rfam_models | |
self.single_reads = single_reads | |
self.ssu_db = ssu_db | |
self.ssu_label = ssu_label | |
self.ssu_otus = ssu_otus | |
self.ssu_tax = ssu_tax | |
def __eq__(self, other: Any) -> bool: | |
if isinstance(other, inputs_record_schema): | |
return bool( | |
self._5_8s_pattern == other._5_8s_pattern | |
and self._5s_pattern == other._5s_pattern | |
and self.CGC_config == other.CGC_config | |
and self.CGC_postfixes == other.CGC_postfixes | |
and self.EggNOG_data_dir == other.EggNOG_data_dir | |
and self.EggNOG_db == other.EggNOG_db | |
and self.EggNOG_diamond_db == other.EggNOG_diamond_db | |
and self.HMM_gathering_bit_score == other.HMM_gathering_bit_score | |
and self.HMM_name_database == other.HMM_name_database | |
and self.HMM_omit_alignment == other.HMM_omit_alignment | |
and self.InterProScan_applications == other.InterProScan_applications | |
and self.InterProScan_databases == other.InterProScan_databases | |
and self.InterProScan_outputFormat == other.InterProScan_outputFormat | |
and self.cgc_chunk_size == other.cgc_chunk_size | |
and self.forward_reads == other.forward_reads | |
and self.func_ann_names_hmmer == other.func_ann_names_hmmer | |
and self.func_ann_names_ips == other.func_ann_names_ips | |
and self.go_config == other.go_config | |
and self.hmmsearch_header == other.hmmsearch_header | |
and self.ips_header == other.ips_header | |
and self.ko_file == other.ko_file | |
and self.lsu_db == other.lsu_db | |
and self.lsu_label == other.lsu_label | |
and self.lsu_otus == other.lsu_otus | |
and self.lsu_tax == other.lsu_tax | |
and self.other_ncRNA_models == other.other_ncRNA_models | |
and self.protein_chunk_size_IPS == other.protein_chunk_size_IPS | |
and self.protein_chunk_size_hmm == other.protein_chunk_size_hmm | |
and self.qc_min_length == other.qc_min_length | |
and self.reverse_reads == other.reverse_reads | |
and self.rfam_model_clans == other.rfam_model_clans | |
and self.rfam_models == other.rfam_models | |
and self.single_reads == other.single_reads | |
and self.ssu_db == other.ssu_db | |
and self.ssu_label == other.ssu_label | |
and self.ssu_otus == other.ssu_otus | |
and self.ssu_tax == other.ssu_tax | |
) | |
return False | |
def __hash__(self) -> int: | |
return hash( | |
( | |
self._5_8s_pattern, | |
self._5s_pattern, | |
self.CGC_config, | |
self.CGC_postfixes, | |
self.EggNOG_data_dir, | |
self.EggNOG_db, | |
self.EggNOG_diamond_db, | |
self.HMM_gathering_bit_score, | |
self.HMM_name_database, | |
self.HMM_omit_alignment, | |
self.InterProScan_applications, | |
self.InterProScan_databases, | |
self.InterProScan_outputFormat, | |
self.cgc_chunk_size, | |
self.forward_reads, | |
self.func_ann_names_hmmer, | |
self.func_ann_names_ips, | |
self.go_config, | |
self.hmmsearch_header, | |
self.ips_header, | |
self.ko_file, | |
self.lsu_db, | |
self.lsu_label, | |
self.lsu_otus, | |
self.lsu_tax, | |
self.other_ncRNA_models, | |
self.protein_chunk_size_IPS, | |
self.protein_chunk_size_hmm, | |
self.qc_min_length, | |
self.reverse_reads, | |
self.rfam_model_clans, | |
self.rfam_models, | |
self.single_reads, | |
self.ssu_db, | |
self.ssu_label, | |
self.ssu_otus, | |
self.ssu_tax, | |
) | |
) | |
@classmethod | |
def fromDoc( | |
cls, | |
doc: Any, | |
baseuri: str, | |
loadingOptions: LoadingOptions, | |
docRoot: Optional[str] = None, | |
) -> "inputs_record_schema": | |
_doc = copy.copy(doc) | |
if hasattr(doc, "lc"): | |
_doc.lc.data = doc.lc.data | |
_doc.lc.filename = doc.lc.filename | |
_errors__ = [] | |
try: | |
_5_8s_pattern = load_field( | |
_doc.get("5.8s_pattern"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `5.8s_pattern` field is not valid because:", | |
SourceLine(_doc, "5.8s_pattern", str), | |
[e], | |
) | |
) | |
try: | |
_5s_pattern = load_field( | |
_doc.get("5s_pattern"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `5s_pattern` field is not valid because:", | |
SourceLine(_doc, "5s_pattern", str), | |
[e], | |
) | |
) | |
try: | |
CGC_config = load_field( | |
_doc.get("CGC_config"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `CGC_config` field is not valid because:", | |
SourceLine(_doc, "CGC_config", str), | |
[e], | |
) | |
) | |
try: | |
CGC_postfixes = load_field( | |
_doc.get("CGC_postfixes"), | |
array_of_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `CGC_postfixes` field is not valid because:", | |
SourceLine(_doc, "CGC_postfixes", str), | |
[e], | |
) | |
) | |
try: | |
EggNOG_data_dir = load_field( | |
_doc.get("EggNOG_data_dir"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `EggNOG_data_dir` field is not valid because:", | |
SourceLine(_doc, "EggNOG_data_dir", str), | |
[e], | |
) | |
) | |
try: | |
EggNOG_db = load_field( | |
_doc.get("EggNOG_db"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `EggNOG_db` field is not valid because:", | |
SourceLine(_doc, "EggNOG_db", str), | |
[e], | |
) | |
) | |
try: | |
EggNOG_diamond_db = load_field( | |
_doc.get("EggNOG_diamond_db"), | |
union_of_None_type_or_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `EggNOG_diamond_db` field is not valid because:", | |
SourceLine(_doc, "EggNOG_diamond_db", str), | |
[e], | |
) | |
) | |
try: | |
HMM_gathering_bit_score = load_field( | |
_doc.get("HMM_gathering_bit_score"), | |
booltype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `HMM_gathering_bit_score` field is not valid because:", | |
SourceLine(_doc, "HMM_gathering_bit_score", str), | |
[e], | |
) | |
) | |
try: | |
HMM_name_database = load_field( | |
_doc.get("HMM_name_database"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `HMM_name_database` field is not valid because:", | |
SourceLine(_doc, "HMM_name_database", str), | |
[e], | |
) | |
) | |
try: | |
HMM_omit_alignment = load_field( | |
_doc.get("HMM_omit_alignment"), | |
booltype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `HMM_omit_alignment` field is not valid because:", | |
SourceLine(_doc, "HMM_omit_alignment", str), | |
[e], | |
) | |
) | |
try: | |
InterProScan_applications = load_field( | |
_doc.get("InterProScan_applications"), | |
array_of_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `InterProScan_applications` field is not valid because:", | |
SourceLine(_doc, "InterProScan_applications", str), | |
[e], | |
) | |
) | |
try: | |
InterProScan_databases = load_field( | |
_doc.get("InterProScan_databases"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `InterProScan_databases` field is not valid because:", | |
SourceLine(_doc, "InterProScan_databases", str), | |
[e], | |
) | |
) | |
try: | |
InterProScan_outputFormat = load_field( | |
_doc.get("InterProScan_outputFormat"), | |
array_of_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `InterProScan_outputFormat` field is not valid because:", | |
SourceLine(_doc, "InterProScan_outputFormat", str), | |
[e], | |
) | |
) | |
try: | |
cgc_chunk_size = load_field( | |
_doc.get("cgc_chunk_size"), | |
inttype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `cgc_chunk_size` field is not valid because:", | |
SourceLine(_doc, "cgc_chunk_size", str), | |
[e], | |
) | |
) | |
try: | |
forward_reads = load_field( | |
_doc.get("forward_reads"), | |
union_of_None_type_or_FileLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `forward_reads` field is not valid because:", | |
SourceLine(_doc, "forward_reads", str), | |
[e], | |
) | |
) | |
try: | |
func_ann_names_hmmer = load_field( | |
_doc.get("func_ann_names_hmmer"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `func_ann_names_hmmer` field is not valid because:", | |
SourceLine(_doc, "func_ann_names_hmmer", str), | |
[e], | |
) | |
) | |
try: | |
func_ann_names_ips = load_field( | |
_doc.get("func_ann_names_ips"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `func_ann_names_ips` field is not valid because:", | |
SourceLine(_doc, "func_ann_names_ips", str), | |
[e], | |
) | |
) | |
try: | |
go_config = load_field( | |
_doc.get("go_config"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `go_config` field is not valid because:", | |
SourceLine(_doc, "go_config", str), | |
[e], | |
) | |
) | |
try: | |
hmmsearch_header = load_field( | |
_doc.get("hmmsearch_header"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `hmmsearch_header` field is not valid because:", | |
SourceLine(_doc, "hmmsearch_header", str), | |
[e], | |
) | |
) | |
try: | |
ips_header = load_field( | |
_doc.get("ips_header"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ips_header` field is not valid because:", | |
SourceLine(_doc, "ips_header", str), | |
[e], | |
) | |
) | |
try: | |
ko_file = load_field( | |
_doc.get("ko_file"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ko_file` field is not valid because:", | |
SourceLine(_doc, "ko_file", str), | |
[e], | |
) | |
) | |
try: | |
lsu_db = load_field( | |
_doc.get("lsu_db"), | |
FileLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `lsu_db` field is not valid because:", | |
SourceLine(_doc, "lsu_db", str), | |
[e], | |
) | |
) | |
try: | |
lsu_label = load_field( | |
_doc.get("lsu_label"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `lsu_label` field is not valid because:", | |
SourceLine(_doc, "lsu_label", str), | |
[e], | |
) | |
) | |
try: | |
lsu_otus = load_field( | |
_doc.get("lsu_otus"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `lsu_otus` field is not valid because:", | |
SourceLine(_doc, "lsu_otus", str), | |
[e], | |
) | |
) | |
try: | |
lsu_tax = load_field( | |
_doc.get("lsu_tax"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `lsu_tax` field is not valid because:", | |
SourceLine(_doc, "lsu_tax", str), | |
[e], | |
) | |
) | |
try: | |
other_ncRNA_models = load_field( | |
_doc.get("other_ncRNA_models"), | |
array_of_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `other_ncRNA_models` field is not valid because:", | |
SourceLine(_doc, "other_ncRNA_models", str), | |
[e], | |
) | |
) | |
try: | |
protein_chunk_size_IPS = load_field( | |
_doc.get("protein_chunk_size_IPS"), | |
inttype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `protein_chunk_size_IPS` field is not valid because:", | |
SourceLine(_doc, "protein_chunk_size_IPS", str), | |
[e], | |
) | |
) | |
try: | |
protein_chunk_size_hmm = load_field( | |
_doc.get("protein_chunk_size_hmm"), | |
inttype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `protein_chunk_size_hmm` field is not valid because:", | |
SourceLine(_doc, "protein_chunk_size_hmm", str), | |
[e], | |
) | |
) | |
try: | |
qc_min_length = load_field( | |
_doc.get("qc_min_length"), | |
inttype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `qc_min_length` field is not valid because:", | |
SourceLine(_doc, "qc_min_length", str), | |
[e], | |
) | |
) | |
try: | |
reverse_reads = load_field( | |
_doc.get("reverse_reads"), | |
union_of_None_type_or_FileLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `reverse_reads` field is not valid because:", | |
SourceLine(_doc, "reverse_reads", str), | |
[e], | |
) | |
) | |
try: | |
rfam_model_clans = load_field( | |
_doc.get("rfam_model_clans"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `rfam_model_clans` field is not valid because:", | |
SourceLine(_doc, "rfam_model_clans", str), | |
[e], | |
) | |
) | |
try: | |
rfam_models = load_field( | |
_doc.get("rfam_models"), | |
array_of_strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `rfam_models` field is not valid because:", | |
SourceLine(_doc, "rfam_models", str), | |
[e], | |
) | |
) | |
try: | |
single_reads = load_field( | |
_doc.get("single_reads"), | |
union_of_None_type_or_FileLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `single_reads` field is not valid because:", | |
SourceLine(_doc, "single_reads", str), | |
[e], | |
) | |
) | |
try: | |
ssu_db = load_field( | |
_doc.get("ssu_db"), | |
FileLoader, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ssu_db` field is not valid because:", | |
SourceLine(_doc, "ssu_db", str), | |
[e], | |
) | |
) | |
try: | |
ssu_label = load_field( | |
_doc.get("ssu_label"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ssu_label` field is not valid because:", | |
SourceLine(_doc, "ssu_label", str), | |
[e], | |
) | |
) | |
try: | |
ssu_otus = load_field( | |
_doc.get("ssu_otus"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ssu_otus` field is not valid because:", | |
SourceLine(_doc, "ssu_otus", str), | |
[e], | |
) | |
) | |
try: | |
ssu_tax = load_field( | |
_doc.get("ssu_tax"), | |
strtype, | |
baseuri, | |
loadingOptions, | |
) | |
except ValidationException as e: | |
_errors__.append( | |
ValidationException( | |
"the `ssu_tax` field is not valid because:", | |
SourceLine(_doc, "ssu_tax", str), | |
[e], | |
) | |
) | |
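# keys outside cls.attrs are kept as extension fields when they are | |
# namespaced (contain ":"); any other unknown key is a validation error | |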
extension_fields: Dict[str, Any] = {} | |
for k in _doc.keys(): | |
if k not in cls.attrs: | |
if ":" in k: | |
ex = expand_url( | |
k, "", loadingOptions, scoped_id=False, vocab_term=False | |
) | |
extension_fields[ex] = _doc[k] | |
else: | |
_errors__.append( | |
ValidationException( | |
"invalid field `{}`, expected one of: `5.8s_pattern`, `5s_pattern`, `CGC_config`, `CGC_postfixes`, `EggNOG_data_dir`, `EggNOG_db`, `EggNOG_diamond_db`, `HMM_gathering_bit_score`, `HMM_name_database`, `HMM_omit_alignment`, `InterProScan_applications`, `InterProScan_databases`, `InterProScan_outputFormat`, `cgc_chunk_size`, `forward_reads`, `func_ann_names_hmmer`, `func_ann_names_ips`, `go_config`, `hmmsearch_header`, `ips_header`, `ko_file`, `lsu_db`, `lsu_label`, `lsu_otus`, `lsu_tax`, `other_ncRNA_models`, `protein_chunk_size_IPS`, `protein_chunk_size_hmm`, `qc_min_length`, `reverse_reads`, `rfam_model_clans`, `rfam_models`, `single_reads`, `ssu_db`, `ssu_label`, `ssu_otus`, `ssu_tax`".format( | |
k | |
), | |
SourceLine(_doc, k, str), | |
) | |
) | |
break | |
if _errors__: | |
raise ValidationException("Trying 'inputs_record_schema'", None, _errors__) | |
_constructed = cls( | |
_5_8s_pattern=_5_8s_pattern, | |
_5s_pattern=_5s_pattern, | |
CGC_config=CGC_config, | |
CGC_postfixes=CGC_postfixes, | |
EggNOG_data_dir=EggNOG_data_dir, | |
EggNOG_db=EggNOG_db, | |
EggNOG_diamond_db=EggNOG_diamond_db, | |
HMM_gathering_bit_score=HMM_gathering_bit_score, | |
HMM_name_database=HMM_name_database, | |
HMM_omit_alignment=HMM_omit_alignment, | |
InterProScan_applications=InterProScan_applications, | |
InterProScan_databases=InterProScan_databases, | |
InterProScan_outputFormat=InterProScan_outputFormat, | |
cgc_chunk_size=cgc_chunk_size, | |
forward_reads=forward_reads, | |
func_ann_names_hmmer=func_ann_names_hmmer, | |
func_ann_names_ips=func_ann_names_ips, | |
go_config=go_config, | |
hmmsearch_header=hmmsearch_header, | |
ips_header=ips_header, | |
ko_file=ko_file, | |
lsu_db=lsu_db, | |
lsu_label=lsu_label, | |
lsu_otus=lsu_otus, | |
lsu_tax=lsu_tax, | |
other_ncRNA_models=other_ncRNA_models, | |
protein_chunk_size_IPS=protein_chunk_size_IPS, | |
protein_chunk_size_hmm=protein_chunk_size_hmm, | |
qc_min_length=qc_min_length, | |
reverse_reads=reverse_reads, | |
rfam_model_clans=rfam_model_clans, | |
rfam_models=rfam_models, | |
single_reads=single_reads, | |
ssu_db=ssu_db, | |
ssu_label=ssu_label, | |
ssu_otus=ssu_otus, | |
ssu_tax=ssu_tax, | |
extension_fields=extension_fields, | |
loadingOptions=loadingOptions, | |
) | |
return _constructed | |
def save( | |
self, top: bool = False, base_url: str = "", relative_uris: bool = True | |
) -> Dict[str, Any]: | |
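"""Serialize this record to a plain dict; with relative_uris, URIs and extension keys are shortened via the vocabulary.""" | |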
r: Dict[str, Any] = {} | |
if relative_uris: | |
for ef in self.extension_fields: | |
r[prefix_url(ef, self.loadingOptions.vocab)] = self.extension_fields[ef] | |
else: | |
for ef in self.extension_fields: | |
r[ef] = self.extension_fields[ef] | |
if self._5_8s_pattern is not None: | |
r["5.8s_pattern"] = save( | |
self._5_8s_pattern, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self._5s_pattern is not None: | |
r["5s_pattern"] = save( | |
self._5s_pattern, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.CGC_config is not None: | |
r["CGC_config"] = save( | |
self.CGC_config, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.CGC_postfixes is not None: | |
r["CGC_postfixes"] = save( | |
self.CGC_postfixes, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.EggNOG_data_dir is not None: | |
r["EggNOG_data_dir"] = save( | |
self.EggNOG_data_dir, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.EggNOG_db is not None: | |
r["EggNOG_db"] = save( | |
self.EggNOG_db, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.EggNOG_diamond_db is not None: | |
r["EggNOG_diamond_db"] = save( | |
self.EggNOG_diamond_db, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.HMM_gathering_bit_score is not None: | |
r["HMM_gathering_bit_score"] = save( | |
self.HMM_gathering_bit_score, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.HMM_name_database is not None: | |
r["HMM_name_database"] = save( | |
self.HMM_name_database, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.HMM_omit_alignment is not None: | |
r["HMM_omit_alignment"] = save( | |
self.HMM_omit_alignment, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.InterProScan_applications is not None: | |
r["InterProScan_applications"] = save( | |
self.InterProScan_applications, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.InterProScan_databases is not None: | |
r["InterProScan_databases"] = save( | |
self.InterProScan_databases, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.InterProScan_outputFormat is not None: | |
r["InterProScan_outputFormat"] = save( | |
self.InterProScan_outputFormat, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.cgc_chunk_size is not None: | |
r["cgc_chunk_size"] = save( | |
self.cgc_chunk_size, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.forward_reads is not None: | |
r["forward_reads"] = save( | |
self.forward_reads, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.func_ann_names_hmmer is not None: | |
r["func_ann_names_hmmer"] = save( | |
self.func_ann_names_hmmer, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.func_ann_names_ips is not None: | |
r["func_ann_names_ips"] = save( | |
self.func_ann_names_ips, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.go_config is not None: | |
r["go_config"] = save( | |
self.go_config, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.hmmsearch_header is not None: | |
r["hmmsearch_header"] = save( | |
self.hmmsearch_header, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.ips_header is not None: | |
r["ips_header"] = save( | |
self.ips_header, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.ko_file is not None: | |
r["ko_file"] = save( | |
self.ko_file, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.lsu_db is not None: | |
r["lsu_db"] = save( | |
self.lsu_db, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.lsu_label is not None: | |
r["lsu_label"] = save( | |
self.lsu_label, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.lsu_otus is not None: | |
r["lsu_otus"] = save( | |
self.lsu_otus, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.lsu_tax is not None: | |
r["lsu_tax"] = save( | |
self.lsu_tax, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.other_ncRNA_models is not None: | |
r["other_ncRNA_models"] = save( | |
self.other_ncRNA_models, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.protein_chunk_size_IPS is not None: | |
r["protein_chunk_size_IPS"] = save( | |
self.protein_chunk_size_IPS, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.protein_chunk_size_hmm is not None: | |
r["protein_chunk_size_hmm"] = save( | |
self.protein_chunk_size_hmm, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.qc_min_length is not None: | |
r["qc_min_length"] = save( | |
self.qc_min_length, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.reverse_reads is not None: | |
r["reverse_reads"] = save( | |
self.reverse_reads, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.rfam_model_clans is not None: | |
r["rfam_model_clans"] = save( | |
self.rfam_model_clans, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.rfam_models is not None: | |
r["rfam_models"] = save( | |
self.rfam_models, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.single_reads is not None: | |
r["single_reads"] = save( | |
self.single_reads, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.ssu_db is not None: | |
r["ssu_db"] = save( | |
self.ssu_db, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.ssu_label is not None: | |
r["ssu_label"] = save( | |
self.ssu_label, | |
top=False, | |
base_url=base_url, | |
relative_uris=relative_uris, | |
) | |
if self.ssu_otus is not None: | |
r["ssu_otus"] = save( | |
self.ssu_otus, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
if self.ssu_tax is not None: | |
r["ssu_tax"] = save( | |
self.ssu_tax, top=False, base_url=base_url, relative_uris=relative_uris | |
) | |
# when saving the top-level document, also emit the $namespaces and $schemas blocks | |
if top: | |
if self.loadingOptions.namespaces: | |
r["$namespaces"] = self.loadingOptions.namespaces | |
if self.loadingOptions.schemas: | |
r["$schemas"] = self.loadingOptions.schemas | |
return r | |
attrs = frozenset( | |
[ | |
"5.8s_pattern", | |
"5s_pattern", | |
"CGC_config", | |
"CGC_postfixes", | |
"EggNOG_data_dir", | |
"EggNOG_db", | |
"EggNOG_diamond_db", | |
"HMM_gathering_bit_score", | |
"HMM_name_database", | |
"HMM_omit_alignment", | |
"InterProScan_applications", | |
"InterProScan_databases", | |
"InterProScan_outputFormat", | |
"cgc_chunk_size", | |
"forward_reads", | |
"func_ann_names_hmmer", | |
"func_ann_names_ips", | |
"go_config", | |
"hmmsearch_header", | |
"ips_header", | |
"ko_file", | |
"lsu_db", | |
"lsu_label", | |
"lsu_otus", | |
"lsu_tax", | |
"other_ncRNA_models", | |
"protein_chunk_size_IPS", | |
"protein_chunk_size_hmm", | |
"qc_min_length", | |
"reverse_reads", | |
"rfam_model_clans", | |
"rfam_models", | |
"single_reads", | |
"ssu_db", | |
"ssu_label", | |
"ssu_otus", | |
"ssu_tax", | |
] | |
) | |
_vocab = { | |
"Directory": "https://w3id.org/cwl/cwl#Directory", | |
"File": "https://w3id.org/cwl/cwl#File", | |
"inputs_record_schema": "https://w3id.org/cwl/cwl#inputs_record_schema", | |
} | |
_rvocab = { | |
"https://w3id.org/cwl/cwl#Directory": "Directory", | |
"https://w3id.org/cwl/cwl#File": "File", | |
"https://w3id.org/cwl/cwl#inputs_record_schema": "inputs_record_schema", | |
} | |
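# One validating loader per schema type follows.  Composite loaders are | |
# named after their structure, e.g. union_of_None_type_or_FileLoader | |
# accepts either null or a File record. | |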
strtype = _PrimitiveLoader(str) | |
inttype = _PrimitiveLoader(int) | |
floattype = _PrimitiveLoader(float) | |
booltype = _PrimitiveLoader(bool) | |
None_type = _PrimitiveLoader(type(None)) | |
Any_type = _AnyLoader() | |
FileLoader = _RecordLoader(File) | |
DirectoryLoader = _RecordLoader(Directory) | |
inputs_record_schemaLoader = _RecordLoader(inputs_record_schema) | |
File_classLoader = _EnumLoader(("File",), "File_class") | |
uri_File_classLoader_False_True_None = _URILoader(File_classLoader, False, True, None) | |
union_of_None_type_or_strtype = _UnionLoader( | |
( | |
None_type, | |
strtype, | |
) | |
) | |
uri_union_of_None_type_or_strtype_False_True_None = _URILoader( | |
union_of_None_type_or_strtype, False, True, None | |
) | |
uri_union_of_None_type_or_strtype_False_False_None = _URILoader( | |
union_of_None_type_or_strtype, False, False, None | |
) | |
union_of_None_type_or_inttype = _UnionLoader( | |
( | |
None_type, | |
inttype, | |
) | |
) | |
union_of_FileLoader_or_DirectoryLoader = _UnionLoader( | |
( | |
FileLoader, | |
DirectoryLoader, | |
) | |
) | |
array_of_union_of_FileLoader_or_DirectoryLoader = _ArrayLoader( | |
union_of_FileLoader_or_DirectoryLoader | |
) | |
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader = _UnionLoader( | |
( | |
None_type, | |
array_of_union_of_FileLoader_or_DirectoryLoader, | |
) | |
) | |
secondaryfilesdsl_union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader = _SecondaryDSLLoader( | |
union_of_None_type_or_array_of_union_of_FileLoader_or_DirectoryLoader | |
) | |
uri_union_of_None_type_or_strtype_True_False_None = _URILoader( | |
union_of_None_type_or_strtype, True, False, None | |
) | |
Directory_classLoader = _EnumLoader(("Directory",), "Directory_class") | |
uri_Directory_classLoader_False_True_None = _URILoader( | |
Directory_classLoader, False, True, None | |
) | |
array_of_strtype = _ArrayLoader(strtype) | |
union_of_None_type_or_FileLoader = _UnionLoader( | |
( | |
None_type, | |
FileLoader, | |
) | |
) | |
union_of_inputs_record_schemaLoader = _UnionLoader((inputs_record_schemaLoader,)) | |
array_of_union_of_inputs_record_schemaLoader = _ArrayLoader( | |
union_of_inputs_record_schemaLoader | |
) | |
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader = ( | |
_UnionLoader( | |
( | |
inputs_record_schemaLoader, | |
array_of_union_of_inputs_record_schemaLoader, | |
) | |
) | |
) | |
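# Sketch (assuming the private _Loader API, which is not a public | |
# contract): every loader exposes load(doc, baseuri, loadingOptions), so | |
#     inttype.load(100, "", LoadingOptions())    # returns 100 | |
#     inttype.load("x", "", LoadingOptions())    # raises ValidationException | |
# The _False_True_None suffix of a _URILoader names its | |
# (scoped_id, vocab_term, scoped_ref) constructor arguments. | |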
def load_document( | |
doc: Any, | |
baseuri: Optional[str] = None, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> Any: | |
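"""Load an inputs document (a dict or a list of dicts) into typed Python objects.""" | |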
if baseuri is None: | |
baseuri = file_uri(os.getcwd()) + "/" | |
if loadingOptions is None: | |
loadingOptions = LoadingOptions() | |
result, metadata = _document_load( | |
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader, | |
doc, | |
baseuri, | |
loadingOptions, | |
) | |
return result | |
def load_document_with_metadata( | |
doc: Any, | |
baseuri: Optional[str] = None, | |
loadingOptions: Optional[LoadingOptions] = None, | |
addl_metadata_fields: Optional[MutableSequence[str]] = None, | |
) -> Any: | |
if baseuri is None: | |
baseuri = file_uri(os.getcwd()) + "/" | |
if loadingOptions is None: | |
loadingOptions = LoadingOptions(fileuri=baseuri) | |
return _document_load( | |
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader, | |
doc, | |
baseuri, | |
loadingOptions, | |
addl_metadata_fields=addl_metadata_fields, | |
) | |
def load_document_by_string( | |
string: Any, | |
uri: str, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> Any: | |
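"""Parse a YAML string and load it, attaching source lines for error reporting.""" | |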
yaml = yaml_no_ts() | |
result = yaml.load(string) | |
add_lc_filename(result, uri) | |
if loadingOptions is None: | |
loadingOptions = LoadingOptions(fileuri=uri) | |
result, metadata = _document_load( | |
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader, | |
result, | |
uri, | |
loadingOptions, | |
) | |
return result | |
def load_document_by_yaml( | |
yaml: Any, | |
uri: str, | |
loadingOptions: Optional[LoadingOptions] = None, | |
) -> Any: | |
""" | |
Shortcut to load via a YAML object. | |
yaml: must be from ruamel.yaml.main.YAML.load with preserve_quotes=True | |
""" | |
add_lc_filename(yaml, uri) | |
if loadingOptions is None: | |
loadingOptions = LoadingOptions(fileuri=uri) | |
result, metadata = _document_load( | |
union_of_inputs_record_schemaLoader_or_array_of_union_of_inputs_record_schemaLoader, | |
yaml, | |
uri, | |
loadingOptions, | |
) | |
return result |
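# ---------------------------------------------------------------------- | |
# Usage sketch, not part of the generated module.  The job fragment below | |
# is invented for illustration: because most fields of | |
# inputs_record_schema are required, loading it raises a single | |
# ValidationException that aggregates every missing or mistyped field. | |
if __name__ == "__main__": | |
    _example_job = "5s_pattern: 5S-pattern.hmm\nqc_min_length: 100\n" | |
    try: | |
        load_document_by_string(_example_job, "file:///example/job.yml") | |
    except ValidationException as err: | |
        print(err) | |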
# schema-salad schema for the same inputs record; the Python parser above was generated from this document | |
saladVersion: v1.1 | |
$base: https://w3id.org/cwl/cwl# | |
$namespaces: | |
cwl: https://w3id.org/cwl/cwl# | |
$graph: | |
- name: File | |
type: record | |
fields: | |
- name: class | |
type: | |
type: enum | |
name: File_class | |
symbols: | |
- cwl:File | |
jsonldPredicate: | |
_id: '@type' | |
_type: '@vocab' | |
- name: location | |
type: string? | |
jsonldPredicate: | |
_id: '@type' | |
_type: '@vocab' | |
- name: path | |
type: string? | |
jsonldPredicate: | |
_id: cwl:path | |
_type: '@id' | |
- name: basename | |
type: string? | |
jsonldPredicate: | |
_id: cwl:basename | |
_type: '@id' | |
- name: dirname | |
type: string? | |
- name: nameroot | |
type: string? | |
- name: nameext | |
type: string? | |
- name: checksum | |
type: string? | |
- name: size | |
type: long? | |
- name: secondaryFiles | |
type: | |
- 'null' | |
- type: array | |
items: | |
- File | |
- Directory | |
jsonldPredicate: | |
_id: cwl:secondaryFiles | |
secondaryFilesDSL: true | |
- name: format | |
type: string? | |
jsonldPredicate: | |
_id: cwl:format | |
_type: '@id' | |
identity: true | |
- name: contents | |
type: string? | |
- name: Directory | |
type: record | |
fields: | |
- name: class | |
type: | |
type: enum | |
name: Directory_class | |
symbols: | |
- cwl:Directory | |
jsonldPredicate: | |
_id: '@type' | |
_type: '@vocab' | |
- name: location | |
type: string? | |
jsonldPredicate: | |
_id: '@type' | |
_type: '@vocab' | |
- name: path | |
type: string? | |
jsonldPredicate: | |
_id: cwl:path | |
_type: '@id' | |
- name: basename | |
type: string? | |
jsonldPredicate: | |
_id: cwl:basename | |
_type: '@id' | |
- name: listing | |
type: | |
- 'null' | |
- type: array | |
items: | |
- File | |
- Directory | |
jsonldPredicate: | |
_id: cwl:listing | |
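# document root: enumerates every workflow-level input field handled by | |
# the generated parser above | |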
- name: inputs_record_schema | |
type: record | |
fields: | |
- name: 5.8s_pattern | |
type: string | |
- name: 5s_pattern | |
type: string | |
- name: CGC_config | |
type: string | |
- name: CGC_postfixes | |
type: | |
type: array | |
items: string | |
- name: EggNOG_data_dir | |
type: | |
- 'null' | |
- string | |
- name: EggNOG_db | |
type: | |
- 'null' | |
- string | |
- name: EggNOG_diamond_db | |
type: | |
- 'null' | |
- string | |
- name: HMM_gathering_bit_score | |
type: boolean | |
- name: HMM_name_database | |
type: string | |
- name: HMM_omit_alignment | |
type: boolean | |
- name: InterProScan_applications | |
type: | |
type: array | |
items: string | |
- name: InterProScan_databases | |
type: string | |
- name: InterProScan_outputFormat | |
type: | |
type: array | |
items: string | |
- name: cgc_chunk_size | |
type: int | |
- name: forward_reads | |
type: | |
- 'null' | |
- cwl:File | |
- name: func_ann_names_hmmer | |
type: string | |
- name: func_ann_names_ips | |
type: string | |
- name: go_config | |
type: string | |
- name: hmmsearch_header | |
type: string | |
- name: ips_header | |
type: string | |
- name: ko_file | |
type: string | |
- name: lsu_db | |
type: cwl:File | |
- name: lsu_label | |
type: string | |
- name: lsu_otus | |
type: string | |
- name: lsu_tax | |
type: string | |
- name: other_ncRNA_models | |
type: | |
type: array | |
items: string | |
- name: protein_chunk_size_IPS | |
type: int | |
- name: protein_chunk_size_hmm | |
type: int | |
- name: qc_min_length | |
type: int | |
- name: reverse_reads | |
type: | |
- 'null' | |
- cwl:File | |
- name: rfam_model_clans | |
type: string | |
- name: rfam_models | |
type: | |
type: array | |
items: string | |
- name: single_reads | |
type: | |
- 'null' | |
- cwl:File | |
- name: ssu_db | |
type: cwl:File | |
- name: ssu_label | |
type: string | |
- name: ssu_otus | |
type: string | |
- name: ssu_tax | |
type: string | |
documentRoot: true |
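# To regenerate the Python parser above from this schema (a sketch; the
# filename is hypothetical):
#   schema-salad-tool --codegen=python inputs_record_schema.yml > parser.py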