Elastic Search Importing Tools for SL and PA

Unnamed Project (WIP)

Goals

  1. Centralized "health" monitoring across multiple systems for PA and SA

    • Primary Analysis for all pac-alphaX systems
      • monitor acquisition success/failure
      • monitor transfer job success/failure and runtime
    • Secondary Analysis for all SL systems (e.g., smrtlink-beta, smrtlink-release)
      • monitor smrtlink analysis job success/failure and runtime
      • monitor smrtlink analysis task failures
      • central place for all report metrics from all analysis jobs
      • monitor smrtlink import-dataset and merge-dataset jobs for success/failure
      • cluster (SGE) monitoring of job state
  2. Centralized datastore across multiple systems

    • a. PA/SA LIMS to SubreadSet lookup
    • b. LIMS to Analysis jobs (i.e., list of all analysis jobs that use that subreadset)
      • import-dataset metrics, such as loading and filtering, as used in Run QC
      • pbsmrtpipe/analysis core metric results (e.g., number of mapped reads, mean accuracy)
  3. Public user interface for common functionality, leveraging searchkit. Very minimal UI.

    • a. Runcode -> SubreadSet lookup by runcode, experiment-id, project (leveraging 2a)
    • b. Runcode -> Analysis Jobs and Metrics (leveraging 2a + 2b)
    • c. For other data slicing/query, use kibana directly

Design Overview

  • The fundamental data sources are the PA/SA webservices.
  • "core" data stored in ES is pulled from the PA/SA services and stored as documents (i.e., indexes, the equivalent of tables in SQL) in ES. These documents have a deliberately de-normalized structure that allows efficient queries. Join keys, such as job_id or acq_id, can be used to query data in other tables.
  • "derived" document types are built from "core" documents and other sources. These docs have an even more app-specific structure to enable specific applications.
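As a concrete illustration, a de-normalized "core" acquisition document might look roughly like the sketch below. The acqId and paws_version fields match the acquisition importer further down; the other fields are hypothetical placeholders.

```python
# Sketch of a de-normalized "core" acquisition document (illustrative only;
# "acqId" and "paws_version" follow import-alpha-acqs.py below, the other
# fields are hypothetical placeholders).
core_acq_doc = {
    "acqId": "3f6d8c2a-0000-0000-0000-000000000000",  # join key usable from other indices/tables
    "status": "Complete",                             # hypothetical status field from PAWS
    "instrument": "pac-alpha1",                       # hypothetical context added at import time
    "paws_version": "5.0.0",                          # version context added at import time
}
```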

ETL (Extract, Transform, Load) Tools

Tools to import Core Indices/Tables

"Core" tools scrape the webservices, de-normalize the data to add context (where necessary), and import it into ES. All tools have a mode that enables them to be executed periodically via the --periodic-time commandline option (see the sketch after this list). They are currently running in this mode on login14-bifx01.

  • import-alpha-acqs.py Scrapes PAWS on a single pac-alpha{X} (or all systems) for all acquisitions.
  • import-alpha-transfers.py Scrapes the DataTransfer Services on a single pac-alpha{X} (or all systems) for all data transfer jobs.
  • import-smrtlink-analysis-jobs.py Scrapes SMRT Link/Analysis Services for pbsmrtpipe jobs. Also imports all task details and report metrics from job output. This creates several indexes/tables and requires filesystem access (for task details).
  • import-pbi-collections.py Scrapes /pbi/collections for all lims.yml files that have Runcode -> SubreadSet UUID metadata. De-normalizes some of the SubreadSet to enable common queries.
  • import-sge-jobs.py Snapshots of SGE jobs for monitoring purposes
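All of the core importers share the same shape: a run_main function that scrapes a service and indexes the de-normalized docs, wrapped by runner_periodic from the etl_utils module included below so that --periodic-time re-runs it on a fixed interval. A minimal sketch of that pattern, with a placeholder run_main body:

```python
# Minimal sketch of the shared importer pattern (run_main body is a placeholder;
# elastic_system and runner_periodic are defined in etl_utils below).
from etl_utils import elastic_system, runner_periodic


def run_main(es_host, es_port):
    es = elastic_system(es_host, es_port)
    # scrape the webservice, de-normalize, then es.index(...) each doc here
    return 0


# Runs once when --periodic-time is not given; otherwise loops every N seconds.
exit_code = runner_periodic(run_main, "localhost", 9200, periodic_time_sec=300)
```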

Tools to import Derived Indices/Tables

TODO (This is an attempt to potentially replace Allen's siv1/Sequel page)

    1. build-siv.py (need better name) Creates an index/table of SubreadSet UUID (or LIMS Runcode) -> List of Analysis jobs and metrics.
    • TODO Add analyzing Run QC metrics from import-dataset jobs
    2. replace MJ's nibbler? (or does #1 do this?)

Document Versioning Model

Each "core" importing tool creates one or more indices (i.e., tables) and document types within ES.

The versioning model is:

  • doc_type -> {id}_{version}, where id is the base name and version is v{X} with X an int. Example: smrtlink_analysis_job_v2
  • the companion table/index in ES is the pluralized form {id}s_{version}. Example: smrtlink_analysis_jobs_v2
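In code, the convention reduces to two small helpers (a sketch of the convention described above only; these helpers are not part of the importers below):

```python
# Sketch of the naming convention above (helper names are illustrative).
def to_doc_type(id_, version):
    # e.g., to_doc_type("smrtlink_analysis_job", 2) -> "smrtlink_analysis_job_v2"
    return "{i}_v{x}".format(i=id_, x=version)


def to_index(id_, version):
    # the companion index/table is the pluralized doc type, e.g.,
    # to_index("smrtlink_analysis_job", 2) -> "smrtlink_analysis_jobs_v2"
    return "{i}s_v{x}".format(i=id_, x=version)
```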

When the schema changes, the doc version and index should be incremented. Concurrent with the new version of the doc, the old (n - 1) version of the doc should also be imported so as not to break derived collections and UI apps using these collections. This allows a window of time in which the components can be upgraded to use the newer doc model.

TODO. The naming convention isn't consistent in the current dev server

Example: Getting LIMS SubreadSet from ES

Get List of LimsSubreadSet records

http://login14-biofx01:9200/lims_subreadsets_v2/_search?size=1000&from=0

Get Lims SubreadSet by SubreadSet UUID (primary key)

http://login14-biofx01:9200/lims_subreadsets_v2/lims_subreadset_v2/160ee152-e37d-415e-abbe-9980fd575c68

The general form is:

{index}/{doc-type}/{primary-id}
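The same lookups can be done from Python with requests (a sketch; the host, index, doc type, and UUID are the ones from the example URLs above):

```python
# Sketch: fetch LimsSubreadSet docs over the ES REST API with requests
# (host/index/doc-type/UUID taken from the example URLs above).
import requests

BASE = "http://login14-biofx01:9200"
INDEX = "lims_subreadsets_v2"
DOC_TYPE = "lims_subreadset_v2"

# List of LimsSubreadSet records
r = requests.get("{b}/{i}/_search".format(b=BASE, i=INDEX), params={"size": 1000, "from": 0})
hits = r.json()["hits"]["hits"]

# Single record by primary key, following {index}/{doc-type}/{primary-id}
uid = "160ee152-e37d-415e-abbe-9980fd575c68"
doc = requests.get("{b}/{i}/{t}/{u}".format(b=BASE, i=INDEX, t=DOC_TYPE, u=uid)).json()["_source"]
```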

Internal SMRT Link Servers

Hostname (all on Port 8081)

  • smrtlink-alpha

  • smrtlink-alpha-nightly

  • smrtlink-beta (most heavily used)

  • smrtlink-beta-incremental (useful for testing recent build)

  • smrtlink-beta-nightly (useful for testing)

  • smrtlink-bihourly (useful for testing)

  • smrtlink-release (?)

  • smrtlink-siv

  • smrtlink-siv-alpha (?)
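Any of these servers can be checked from Python with the same ServiceAccessLayer client the importers below use (a sketch; smrtlink-beta is just an example host):

```python
# Sketch: query a SMRT Link server's status via pbcommand's ServiceAccessLayer
# (the same client used by the importers below; host chosen as an example).
from pbcommand.services.service_access_layer import ServiceAccessLayer

sal = ServiceAccessLayer("smrtlink-beta", 8081)
print(sal.get_status())
```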

Dev Server

  • up on login14-bifx01, ES on port 9200, kibana on 8099

  • Use ssh tunnel if VPN isn't working

    • ssh -L 8099:localhost:8099 mkocher@login14-biofx01
    • Open browser locally to localhost:8099
  • Using ES 2.3.2 pulled from here. Extract and run bin/elasticsearch (don't run this on the cluster without setting the config to single node)

  • Using Kibana downloaded here (by OS). Extract and run bin/kibana
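Once the tunnel (or VPN) is up, the dev ES instance can be reached with the elastic_system helper from etl_utils (a sketch; host/port as listed above):

```python
# Sketch: connect to the dev ES instance (host/port as listed above;
# elastic_system is defined in etl_utils below).
from etl_utils import elastic_system

es = elastic_system(host="login14-bifx01", port=9200)
print(es.info())  # basic cluster info to confirm connectivity
```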

Somewhat noteworthy changes

  • changed ES config to use single node
  • changed kibana default port to 8099
"""This isn't a proper package, so this will need to be in the same dir as
where the exe's are called"""
import os
import time
import logging
from collections import namedtuple
import uuid
import datetime

from elasticsearch import Elasticsearch

log = logging.getLogger(__name__)

# When importer processes a "batch" of docs to import jobs, a simple summary
# should be added back
ImportSummaryMessage = namedtuple("ImportSummaryMessage", "importer_id message")

_IMPORT_SUMMARY_MSG_DOC_TYPE = "import_summary_msg_v1"
_IMPORT_SUMMARY_MSG_INDEX = "import-summary_msgs_v1"


def modification_date(filename):
    t = os.path.getmtime(filename)
    return datetime.datetime.fromtimestamp(t)


def elastic_system(host="localhost", port=9200):
    cx = dict(host=host, port=port)
    es = Elasticsearch([cx])
    return es


def importer(index, doc_type):
    """
    :type es: Elasticsearch
    """
    def f(es, id_, doc):
        es.index(index=index,
                 doc_type=doc_type,
                 id=id_, body=doc)
        _d = dict(i=index, t=doc_type, d=id_)
        log.info("Inserting doc type {t} index:{i} id:{d}".format(**_d))
        return doc
    return f


def add_import_summary_message(es, import_summary_message):
    """
    Use this when importing a "batch" of jobs

    :type es: Elasticsearch
    :type import_summary_message: ImportSummaryMessage

    :return:
    """
    import_msg = importer(_IMPORT_SUMMARY_MSG_INDEX, _IMPORT_SUMMARY_MSG_DOC_TYPE)
    doc = dict(import_summary_message.__dict__)
    doc['created_at'] = datetime.datetime.now().isoformat()
    import_msg(es, uuid.uuid4(), doc)
    return doc


def add_args_es_common(p):
    f = p.add_argument
    f('--es-host', default="localhost", help="Elastic Search host")
    f('--es-port', default=9200, type=int, help="Elastic Search port")
    return p


def add_arg_periodic(p):
    f = p.add_argument
    f('-p', '--periodic-time', default=None, type=int, help="Enable rerunning every X sec.")
    return p


def runner_periodic(*args, **kwargs):
    periodic_time_sec = kwargs.pop("periodic_time_sec", None)
    # The main func is assumed to be the first positional arg
    runner_func = args[0]
    func_args = args[1:]
    exit_code = None
    if periodic_time_sec is None:
        exit_code = runner_func(*func_args, **kwargs)
    else:
        try:
            while True:
                log.info("Running {f}".format(f=runner_func.__name__))
                _ = runner_func(*func_args, **kwargs)
                time.sleep(periodic_time_sec)
        except KeyboardInterrupt:
            log.info("Shutting down")
            exit_code = 0
    return exit_code
#!/usr/bin/env python
import logging
import pprint
import sys
import os
import datetime
import time

from pbcommand.common_options import add_base_options
from pbcommand.services.service_access_layer import ServiceAccessLayer
# pbcommand hasn't been updated to return some fields from the raw response
# using raw requests calls to fill in the gap (This applies for the auth as well)
from pbcommand.services.service_access_layer import rqget
from pbcommand.cli import get_default_argparser, pacbio_args_runner
from pbcommand.utils import setup_log
from elasticsearch import Elasticsearch

from etl_utils import *

__version__ = "0.1.2"
__doc__ = """Import Acquisitions from pac-alpha Boxes into Elastic Search"""

log = logging.getLogger(__name__)

DEV_LOG_OPTS = dict(level=logging.DEBUG, stream=sys.stdout)
logging.basicConfig(**DEV_LOG_OPTS)


def _to_s(x):
    return "{x}s".format(x=x)


class Constants(object):
    IMPORTER_ID = "import_alpha_acqs"
    # All the Pac alpha systems
    PACS = ["pac-alpha{x}".format(x=x) for x in xrange(1, 13)] + ["pa-poca"]
    PAWS_DEFAULT_PORT = 8091
    # singular for the doc type, plural for the index (i.e., table)
    ACQ_DOC_TYPE = "pa_acq_v0"
    ACQ_INDEX = "pa_acqs_v0"


def to_url(host, port, rest):
    _d = dict(h=host, p=port, r=rest)
    url = "http://{h}:{p}/{r}".format(**_d)
    return url


def to_status_url(host, port):
    return to_url(host, port, "status")


def to_acq_url(host, port):
    def f(acq_id):
        return to_url(host, port, "acquisitions/{a}".format(a=acq_id))
    return f


def to_acqs_url(host, port):
    return to_url(host, port, "acquisitions")


def get_acq_by_id(host, port, acq_id):
    r = rqget(to_acq_url(host, port)(acq_id))
    return r.json()


def get_acqs(host, port):
    r = rqget(to_acqs_url(host, port))
    return r.json()


insert_acq_doc = importer(Constants.ACQ_INDEX, Constants.ACQ_DOC_TYPE)


def modification_date(filename):
    t = os.path.getmtime(filename)
    return datetime.datetime.fromtimestamp(t)


def convert_acq_to_doc(d, paws_version):
    # Remove subreadsetXml and statsXml. These are giant chunks of XML
    # es doesn't like the rtmetricTimeStamp
    keys_to_remove = ("subreadsetXml", "statsXml", "events", "rtmetricTimeStamp")
    d['paws_version'] = paws_version
    for key in keys_to_remove:
        d.pop(key, None)
    return d


def analyze_acqs_on_pac(host, port):
    sal = ServiceAccessLayer(host, port)
    status = sal.get_status()
    log.debug(pprint.pformat(status))
    log.info("analyzing system {b}".format(b=sal.base_url))
    acqd_s = get_acqs(host, port)
    log.debug("Found {n} acqs on {b}".format(n=len(acqd_s), b=sal.base_url))
    return acqd_s


def analyze_acq_on_pac(es, host, port, acq_id, paws_version):
    acq_d = get_acq_by_id(host, port, acq_id)
    acq_doc = convert_acq_to_doc(acq_d, paws_version)
    _ = insert_acq_doc(es, acq_id, acq_doc)
    log.debug(pprint.pformat(acq_doc))
    return acq_doc


def get_paws_version(sal):
    status = sal.get_status()
    return status['version']


def run_main(systems, es_host, es_port):
    es = elastic_system(es_host, es_port)
    started_at = time.time()
    nsuccess = 0
    nfailed = 0
    ntotal = 0

    def summary_msg(systems_):
        run_time_ = time.time() - started_at
        d_ = dict(s=nsuccess, f=nfailed, t=ntotal, r=run_time_, x=systems_)
        return "Completed inserting successful {s} of {t} failed {f} of {t} in {r:.2f} sec on systems {x}".format(**d_)

    for pac_host, pac_port in systems:
        try:
            paws_version = get_paws_version(ServiceAccessLayer(pac_host, pac_port))
            acqs_d = analyze_acqs_on_pac(pac_host, pac_port)
            try:
                for acq_d in acqs_d:
                    ntotal += 1
                    acq_id = acq_d['acqId']
                    _ = analyze_acq_on_pac(es, pac_host, pac_port, acq_id, paws_version)
                    nsuccess += 1
                add_import_summary_message(elastic_system(es_host, es_port), ImportSummaryMessage(Constants.IMPORTER_ID, summary_msg(["{h}:{p}".format(h=pac_host, p=pac_port)])))
            except Exception as e:
                nfailed += 1
                log.exception("Failed to get Acqs from {h}:{p}".format(h=pac_host, p=pac_port))
        except Exception as e:
            log.exception("Failed to get Acqs from {h}:{p}".format(h=pac_host, p=pac_port))

    log.info(summary_msg(systems))
    return 0


def get_parser():
    p = get_default_argparser(__version__, __doc__)
    f = p.add_argument
    f('-s', '--alpha-system', default=None, help="Specific ALPHA system to import. Defaults to ALL")
    f('--alpha-port', default=Constants.PAWS_DEFAULT_PORT, help="Default PAWS Port")
    add_args_es_common(p)
    add_arg_periodic(p)
    add_base_options(p, default_level=logging.INFO)
    return p


def args_runner(args):
    log.debug("Raw args {a}".format(a=args))
    system_names = (args.alpha_system, ) if args.alpha_system is not None else Constants.PACS
    systems = [(x, args.alpha_port) for x in system_names]
    return runner_periodic(run_main, systems, args.es_host, args.es_port, periodic_time_sec=args.periodic_time)


def main(argv):
    return pacbio_args_runner(argv[1:],
                              get_parser(),
                              args_runner,
                              log,
                              setup_log_func=setup_log)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
#!/usr/bin/env python
import logging
import pprint
import sys
import os
import datetime
import time

from pbcommand.common_options import add_base_options
from pbcommand.services.service_access_layer import ServiceAccessLayer
# pbcommand hasn't been updated to return some fields from the raw response
# using raw requests calls to fill in the gap (This applies for the auth as well)
from pbcommand.services.service_access_layer import rqget
from pbcommand.cli import get_default_argparser, pacbio_args_runner
from pbcommand.utils import setup_log
from elasticsearch import Elasticsearch

from etl_utils import *

__version__ = "0.2.1"
__doc__ = """Import Transfer Jobs from pac-alpha Boxes into Elastic Search"""

# there's quite a bit of copy and paste from import-alpha-acqs

log = logging.getLogger(__name__)

DEV_LOG_OPTS = dict(level=logging.DEBUG, stream=sys.stdout)
logging.basicConfig(**DEV_LOG_OPTS)


def _to_s(x):
    return "{x}s".format(x=x)


class Constants(object):
    IMPORTER_ID = "import_pac_alpha_transfers"
    # All the Pac alpha systems
    PACS = ["pac-alpha{x}".format(x=x) for x in xrange(1, 13)] + ["pa-poca"]
    TRANSFER_SERVICE_DEFAULT_PORT = 8090
    # singular for the doc type, plural for the index (i.e., table)
    TRANSFER_JOB_DOC_TYPE = "pa_transfer_jobs_v0"
    TRANSFER_JOB_INDEX = "pa_transfer_jobs_v0"


def to_url(host, port, rest):
    _d = dict(h=host, p=port, r=rest)
    url = "http://{h}:{p}/{r}".format(**_d)
    return url


def to_transfer_job_url(host, port):
    def f(job_id):
        return to_url(host, port, "smrt-transfer/transfers/{a}".format(a=job_id))
    return f


def to_transfer_jobs_url(host, port):
    return to_url(host, port, "smrt-transfer/transfers")


def get_transfer_job_by_id(host, port, acq_id):
    r = rqget(to_transfer_job_url(host, port)(acq_id))
    return r.json()


def get_transfer_jobs(host, port):
    r = rqget(to_transfer_jobs_url(host, port))
    return r.json()


insert_acq_doc = importer(Constants.TRANSFER_JOB_INDEX, Constants.TRANSFER_JOB_DOC_TYPE)


def convert_acq_to_doc(d, transfer_service_version, acq_id):
    d['transfer_service_version'] = transfer_service_version
    # need to get the acq id associated with this id
    # this would require a look up into paws
    d['acq_id'] = acq_id
    return d


def analyze_transfer_jobs_on_pac(host, port):
    sal = ServiceAccessLayer(host, port)
    status = sal.get_status()
    log.debug(pprint.pformat(status))
    log.info("analyzing system {b}".format(b=sal.base_url))
    acqd_s = get_transfer_jobs(host, port)
    log.debug("Found {n} acqs on {b}".format(n=len(acqd_s), b=sal.base_url))
    return acqd_s


def analyze_transfer_job_on_pac(es, host, port, job_id, transfer_service_version, acq_id):
    job_d = get_transfer_job_by_id(host, port, job_id)
    job_doc = convert_acq_to_doc(job_d, transfer_service_version, acq_id)
    _ = insert_acq_doc(es, job_id, job_doc)
    log.debug(pprint.pformat(job_doc))
    return job_doc


def get_paws_version(sal):
    status = sal.get_status()
    return status['version']


def to_acqs_url(host, port):
    return to_url(host, port, "acquisitions")


def get_acqs(host, port):
    r = rqget(to_acqs_url(host, port))
    return r.json()


def get_datastore_files(host, port, acq_id):
    u = to_url(host, port, "acquisitions/{i}/datastore-files".format(i=acq_id))
    r = rqget(u)
    return r.json()


def get_all_datastore_files_on_pac(host, port):
    # This is a bit brutal
    # Need to get all the datastore files on the system to
    # create a map of datastore file (i.e., transfer job id) {uuid -> acqId}
    dx = {}
    for acq_d in get_acqs(host, port):
        acq_id = acq_d['acqId']
        dfiles_d = get_datastore_files(host, port, acq_id)
        for dfile in dfiles_d:
            acq_id = dfile['acqId']
            # this is the same as the transfer job uuid
            file_uuid = dfile['uuid']
            dx[file_uuid] = acq_id
    return dx


def run_main(systems, es_host, es_port):
    es = elastic_system(es_host, es_port)
    started_at = time.time()
    nsuccess = 0
    nfailed = 0
    ntotal = 0

    def summary_msg(systems_):
        run_time_ = time.time() - started_at
        d_ = dict(s=nsuccess, f=nfailed, t=ntotal, r=run_time_, x=systems_)
        return "Completed inserting successful {s} of {t} failed {f} of {t} in {r:.2f} sec on systems {x}".format(**d_)

    for pac_host, pac_port in systems:
        try:
            paws_version = get_paws_version(ServiceAccessLayer(pac_host, pac_port))
            job_ds = analyze_transfer_jobs_on_pac(pac_host, pac_port)
            file_to_acq_id_d = get_all_datastore_files_on_pac(pac_host, pac_port + 1)
            try:
                for job_d in job_ds:
                    ntotal += 1
                    # a user may have deleted the paws db
                    job_uuid = job_d.get('uuid', None)
                    acq_id = file_to_acq_id_d[job_uuid]
                    _ = analyze_transfer_job_on_pac(es, pac_host, pac_port, job_uuid, paws_version, acq_id)
                    nsuccess += 1
                add_import_summary_message(elastic_system(es_host, es_port), ImportSummaryMessage(Constants.IMPORTER_ID, summary_msg(["{h}:{p}".format(h=pac_host, p=pac_port)])))
            except Exception as e:
                nfailed += 1
                log.error("Failed to get Acqs from {h}:{p}".format(h=pac_host, p=pac_port), exc_info=True)
        except Exception as e:
            log.error("Failed to get Acqs from {h}:{p}".format(h=pac_host, p=pac_port), exc_info=True)

    log.info(summary_msg(systems))
    return 0


def get_parser():
    p = get_default_argparser(__version__, __doc__)
    f = p.add_argument
    f('-s', '--alpha-system', default=None, help="Specific ALPHA system (e.g., pac-alpha1) to import. Defaults to ALL")
    f('--alpha-port', default=Constants.TRANSFER_SERVICE_DEFAULT_PORT, help="Default TransferService Port")
    add_args_es_common(p)
    add_arg_periodic(p)
    add_base_options(p, default_level=logging.INFO)
    return p


def args_runner(args):
    log.info("Raw args {a}".format(a=args))
    system_names = (args.alpha_system, ) if args.alpha_system is not None else Constants.PACS
    systems = [(x, args.alpha_port) for x in system_names]
    return runner_periodic(run_main, systems, args.es_host, args.es_port, periodic_time_sec=args.periodic_time)


def main(argv):
    return pacbio_args_runner(argv[1:],
                              get_parser(),
                              args_runner,
                              log,
                              setup_log_func=setup_log)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
#!/usr/bin/env python
"""Import SubreadSet XML files from /pbi/collections

This was taken from the extras in secondary-analysis-internal smrt server
and modified.

Template taken from:
https://github.com/PacificBiosciences/pbcommand/blob/master/pbcommand/cli/examples/template_simple.py
"""
import argparse
import os
import sys
import time
import logging
from collections import namedtuple

from elasticsearch import Elasticsearch
from pbcommand.common_options import add_base_options
import yaml
from pbcommand.utils import setup_log, get_dataset_metadata_or_none, walker
from pbcommand.cli import get_default_argparser_with_base_opts, pacbio_args_runner
from pbcommand.services.service_access_layer import rqpost, rqget
# this is painful. This will require hdf5 just to parse an XML file
from pbcore.io.dataset.DataSetIO import SubreadSet

from etl_utils import *

log = logging.getLogger(__name__)

__version__ = "0.2.1"
__author__ = "M. Kocher"

FIELDS = "runcode exp_id path uuid context created_at ics_version pa_version sample_name inst_name inst_id"


class LimsSubreadSetRecord(namedtuple("LimsSubreadSetRecord", FIELDS)):

    def to_dict(self):
        # namedtuples are ordered dicts
        return dict(self._asdict())


def _to_s(x):
    return "{x}s".format(x=x)


class Constants(object):
    IMPORTER_ID = "import_pbi_collections"
    # singular for the doc type, plural for the index (i.e., table)
    NAME = "lims_subreadset"
    SUBREADSET_VERSION = "_v2"
    SUBREAD_DOC_TYPE = NAME + SUBREADSET_VERSION
    SUBREAD_INDEX = _to_s(NAME) + SUBREADSET_VERSION


insert_lims_subreadset = importer(Constants.SUBREAD_INDEX, Constants.SUBREAD_DOC_TYPE)


def _get_subreadset_xml_from_dir(path):
    for x in os.listdir(path):
        if x.endswith("subreadset.xml"):
            return os.path.join(path, x)
    return None


def _get_subreadset_xml_or_raise(path):
    x = _get_subreadset_xml_from_dir(path)
    if x is None:
        raise IOError("Unable to find subreadset in {x}".format(x=path))
    return x


def load_subreadset_record_from_yaml_file(path):
    """
    Load the yaml file and try to find the subreadset. Will also peek into
    the subreadset to get the UUID of the dataset

    This is messy.
    """
    # look for the subreadset in the dir before doing anything
    root_dir = os.path.dirname(path)
    subreadset_path = _get_subreadset_xml_or_raise(root_dir)

    # strict will allow loading of datasets where the external resources
    # are not found (e.g., can't find the bam files)
    subreadset = SubreadSet(subreadset_path, strict=False)

    with open(path, 'r') as f:
        d = yaml.load(f.read())

    f = subreadset.metadata['Collections']['CollectionMetadata'].attrib
    # Not clear if ES can import this format
    # created_at = f['CreatedAt']
    created_at = modification_date(subreadset_path)

    # I don't quite understand the model here
    x = subreadset.metadata.collections.submetadata[0]
    g = x.attrib

    movie_context = g['Context']
    ics_version = x['InstCtrlVer'].getV("text")
    pa_version = x['SigProcVer'].getV("text")
    sample_name = x['WellSample']['WellName'].getV("text")
    inst_name = g['InstrumentName']
    inst_id = g['InstrumentId']

    args = (d['runcode'],
            int(d['expcode']),
            subreadset_path,
            subreadset.uuid,
            movie_context,
            created_at,
            ics_version,
            pa_version,
            sample_name,
            inst_name,
            inst_id)

    s = LimsSubreadSetRecord(*args)
    return s


def run_import_from_yaml(es, path):
    subreadset_record = load_subreadset_record_from_yaml_file(path)
    result = insert_lims_subreadset(es, subreadset_record.uuid, subreadset_record.to_dict())
    log.info("LimsSubreadSet:")
    log.info(result)
    return 0


def run_import_from_dir(es, root_dir):

    def filter_func(path_):
        return path_.endswith("lims.yml")

    started_at = time.time()
    nsuccess = 0
    nfailed = 0
    total = 0
    results = []

    def summary_msg():
        run_time_ = time.time() - started_at
        d_ = dict(s=nsuccess, f=nfailed, t=total, r=run_time_, x=root_dir)
        return "Completed inserting successful {s} of {t} failed {f} of {t} in {r:.2f} sec from {x}".format(**d_)

    for path in walker(root_dir, filter_func):
        total += 1
        try:
            result = run_import_from_yaml(es, path)
            results.append(result)
            nsuccess += 1
        except Exception as e:
            log.error("Failed to analyze {p}".format(p=path), exc_info=True)
            nfailed += 1

    if not results:
        log.warn("Failed to find any files to import.")

    msg = summary_msg()
    add_import_summary_message(es, ImportSummaryMessage(Constants.IMPORTER_ID, msg))
    log.info(msg)
    return 0


def validate_yaml_file(p):
    try:
        with open(p, 'r') as f:
            _ = yaml.load(f.read())
        return p
    except Exception as e:
        raise argparse.ArgumentTypeError("Invalid yaml file {p}. {e}".format(p=p, e=repr(e)))


def validate_path_or_yaml_file(p):
    f = os.path.abspath
    if os.path.isdir(p):
        return f(p)
    elif os.path.isfile(p):
        return validate_yaml_file(f(p))
    else:
        raise IOError("Unable to find {p}".format(p=p))


def get_parser():
    """Define Parser. Use the helper methods in validators to validate input"""
    p = get_default_argparser_with_base_opts(__version__, __doc__, default_level="INFO")
    f = p.add_argument
    f('path_to_file', type=validate_path_or_yaml_file,
      help="Path to lims.yml or root directory to recursively analyze lims.yml files")
    add_args_es_common(p)
    add_arg_periodic(p)
    return p


def run_main(host, port, dir_or_yaml):
    func = run_import_from_yaml if os.path.isfile(dir_or_yaml) else run_import_from_dir
    es = elastic_system(host, port)
    return func(es, dir_or_yaml)


def args_runner(args):
    log.info("Raw args {a}".format(a=args))
    # this takes ~ 3 hours! to run on /pbi/collections root dir
    return runner_periodic(run_main, args.es_host, args.es_port, args.path_to_file, periodic_time_sec=args.periodic_time)


def main(argv):
    return pacbio_args_runner(argv[1:],
                              get_parser(),
                              args_runner,
                              log,
                              setup_log_func=setup_log)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
#!/usr/bin/env python
import datetime
import sys
import os
import logging
import tempfile
import subprocess
import time
import uuid as U
from collections import OrderedDict
from xml.etree.ElementTree import ElementTree, ParseError

from elasticsearch import Elasticsearch

from pbcommand.cli import get_default_argparser, pacbio_args_runner
from pbcommand.common_options import add_base_options
from pbcommand.utils import setup_log

__author__ = 'M. Kocher'
__version__ = '0.1.0'

log = logging.getLogger(__name__)


def elastic_system(host="localhost", port=9200):
    cx = dict(host=host, port=port)
    es = Elasticsearch([cx])
    return es


class SGEJob(object):

    def __init__(self, idx, name, user, state, queue_name, slots, created_at=None, uuid=None):
        # Elastic search uuid, otherwise the sge id (idx) will overwrite the doc
        # this is now a snapshot mechanism. Getting all jobs should use DISTINCT
        self.uuid = str(U.uuid4()) if uuid is None else uuid
        self.idx = idx
        self.name = name
        self.user = user
        self.state = state
        self.queue_name = queue_name
        self.nslots = slots
        self.created_at = datetime.datetime.now() if created_at is None else created_at

    def __repr__(self):
        _d = dict(k=self.__class__.__name__, i=self.idx, u=self.user,
                  s=self.state, n=self.nslots, q=self.queue_name, x=self.name)
        return "<{k} {i} nslots:{n} user:{u} state:{s} queue:{q} {x} >".format(**_d)

    def to_dict(self):
        # copy
        d = dict(self.__dict__)
        d['created_at'] = self.created_at.isoformat()
        return d


def _get_data(e, element_name):
    x = e.find(element_name)
    v = x.text
    return v


def __get_data_with_type(e, element_name, type_):
    return type_(_get_data(e, element_name))


def __get_jobs_from_xml_node(xml_node_iter):
    job_attrs = OrderedDict([('JB_job_number', int),
                             ('JB_name', str),
                             ('JB_owner', str),
                             ('state', str),
                             ('queue_name', str),
                             ('slots', int)])
    sge_jobs = []
    for ji in xml_node_iter:
        attrs = []
        for element_name, etype in job_attrs.iteritems():
            v = __get_data_with_type(ji, element_name, etype)
            attrs.append(v)
        job = SGEJob(*attrs)
        sge_jobs.append(job)
    return sge_jobs


def _parse_qstat_xml(file_name):
    et = ElementTree(file=file_name)
    r = et.getroot()

    # get all running jobs
    qis = r.findall('queue_info')
    qi = qis[0]
    jis = qi.findall('job_list')

    # get all jobs in the queue
    wis = r.findall('job_info')
    wi = wis[0]
    wjis = wi.findall('job_list')

    sge_jobs = __get_jobs_from_xml_node(jis)
    wsge_jobs = __get_jobs_from_xml_node(wjis)
    return sge_jobs + wsge_jobs


def _to_qstat_command(output_file, user=None):
    """Generate a qstat command to supplied output_file"""
    u = "*" if user is None else user
    cmd = 'qstat -u "{u}" -xml > {o}'.format(u=u, o=output_file)
    return cmd


def get_jobs(user=None):
    """Get the current SGE jobs """
    f = tempfile.NamedTemporaryFile(suffix="_qstat.xml", delete=False)
    f.close()
    qstat_xml = f.name

    cmd = _to_qstat_command(qstat_xml, user=user)
    p = subprocess.Popen(cmd, shell=True)
    p.wait()

    try:
        sge_jobs = _parse_qstat_xml(qstat_xml)
    except ParseError:
        log.warn("Unable to get qstat jobs")
        sge_jobs = []

    if os.path.exists(qstat_xml):
        os.remove(qstat_xml)

    return sge_jobs


def insert_qsub_job_doc(es, id_, job_doc):
    """
    :type es: Elasticsearch
    """
    es.index(index="sge_jobs_v2", doc_type="sge_jobs2", id=id_, body=job_doc)
    log.debug("Successfully inserted doc {i}".format(i=id_))
    return job_doc


def insert_jobs(es, sge_jobs):
    for sge_job in sge_jobs:
        id_ = sge_job.uuid
        doc = sge_job.to_dict()
        _ = insert_qsub_job_doc(es, id_, doc)


def get_and_insert_jobs(es):
    try:
        qsub_jobs = get_jobs()
        insert_jobs(es, qsub_jobs)
    except Exception as e:
        log.error("Failed to insert jobs", exc_info=True)


def run_main(es_host, es_port, sleep_time_sec=None):
    es = elastic_system(es_host, es_port)
    exit_code = 1
    if sleep_time_sec is None:
        qsub_jobs = get_jobs()
        insert_jobs(es, qsub_jobs)
        exit_code = 0
    else:
        log.info("Entering looping model sleep time {s} sec".format(s=sleep_time_sec))
        try:
            while True:
                # wrapped in a try to avoid crashing
                get_and_insert_jobs(es)
                time.sleep(sleep_time_sec)
        except KeyboardInterrupt:
            exit_code = 0
    return exit_code


def get_parser():
    p = get_default_argparser(__version__, __doc__)
    f = p.add_argument
    f('--es-host', default="localhost", help="Elastic Search Host")
    f('--es-port', default=9200, help="Elastic Search Port")
    f('-t', '--sleep-time-sec', type=int, default=None, help="Enable periodic importing of cluster jobs into elastic search")
    add_base_options(p, default_level=logging.INFO)
    return p


def args_runner(args):
    log.info("Raw args {a}".format(a=args))
    return run_main(args.es_host, args.es_port, sleep_time_sec=args.sleep_time_sec)


def main(argv):
    return pacbio_args_runner(argv[1:],
                              get_parser(),
                              args_runner,
                              log,
                              setup_log_func=setup_log)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
# This is taken from
# https://github.com/PacificBiosciences/instru_crawler/blob/master/requirements.txt
# The homebrew version is 2.3.0. Should probably use the same inst crawler version
elasticsearch==2.3.0
fabric
numpy
requests
#jupyter
# necessary for parsing the lims.yml file
pyyaml
# used for accessing the SL services
pbcommand>=0.4.2
# This is kinda a pain to install just to parse an XML file
pbcore>=1.2.10
-e git://github.com/PacificBiosciences/pbcommand.git#egg=pbcommand
# This is kinda a pain to install just to parse an XML file
-e git://github.com/PacificBiosciences/pbcore.git#egg=pbcore