smrtlink-scraper.py
#!/usr/bin/env python
"""Tool for Scraping Jobs and DataSets from a SMRT Link instance"""
import argparse
import base64
import datetime
import sys
import json
import os
import time
import functools
import itertools
import logging

import requests
import requests.exceptions

# To disable the ssl cert check warning
from requests.packages.urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

__version__ = '0.1.0'

log = logging.getLogger(__name__)

class Constants(object):
    PREFIX = "SMRTLink/1.0.0"
    BIHOURLY = "http://smrtlink-bihourly:8081"
    ALPHA_NIGHTLY = "http://smrtlink-alpha-nightly:8081"
    ALPHA = "http://smrtlink-alpha:8081"
    BETA = "http://smrtlink-beta:8081"
    SMS = "http://smrtlink-sms:8081"


def _to_server_url_name(base_url):
    # Hack to work around the auth (https/8243) vs non-auth (http/8081) URL forms
    return base_url.replace("https", "http").replace("8243", "8081")

def r_get(url, access_token):
    headers = {
        "Authorization": "Bearer {}".format(access_token),
        "Accept": "application/json"
    }
    verify_cert = False
    r = requests.get(url, headers=headers, verify=verify_cert)
    r.raise_for_status()
    return r.json()

def r_get_with_retry(url, token, num_retries=3, retry_sleep=5):
    try:
        return r_get(url, token)
    except requests.exceptions.ConnectionError:
        if num_retries <= 0:
            raise
        else:
            n_retries = num_retries - 1
            log.warning("Retrying ({}) URL {} ".format(num_retries, url))
            time.sleep(retry_sleep)
            return r_get_with_retry(url, token, num_retries=n_retries,
                                    retry_sleep=retry_sleep)

def write_json(d, output_file):
    with open(output_file, 'w+') as f:
        json.dump(d, f)
    return d


def no_throttle():
    return None


def to_throttler(throttle_time=None):
    def throttle_func():
        if throttle_time is None:
            return no_throttle()
        else:
            log.debug("sleeping for {} sec".format(throttle_time))
            time.sleep(throttle_time)
    return throttle_func

def sanitize_job_d(job_d):
    if not isinstance(job_d, dict):
        raise TypeError("Unsupported type {} '{}'".format(type(job_d), job_d))
    dx = job_d.copy()

    def to_d(k):
        return json.loads(dx[k])

    settings = to_d('jsonSettings')
    job_d['jsonSettings'] = settings
    return job_d

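# Note (derived from the json.loads call above): the raw job record returns
# 'jsonSettings' as a JSON-encoded string; sanitize_job_d replaces it with the
# parsed dict so the scraped job.json files contain structured settings rather
# than an escaped string.
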
def to_status_url(base_url):
    return "{}/{}/status".format(base_url, Constants.PREFIX)


def to_jobs_url(base_url, job_type):
    return "{}/{}/smrt-link/job-manager/jobs/{}".format(base_url, Constants.PREFIX, job_type)


def to_multi_jobs_url(base_url, job_type):
    return "{}/{}/smrt-link/job-manager/multi-jobs/{}".format(base_url, Constants.PREFIX, job_type)


def to_job_raw_url(base_url, job_id):
    # dirty get job by id
    return to_jobs_url(base_url, job_id)


def to_job_url(base_url, job_type, job_id):
    return "{}/{}".format(to_jobs_url(base_url, job_type), job_id)


def to_job_datastore_url(base_url, job_id):
    return "{}/{}/smrt-link/job-manager/jobs/{}/datastore".format(base_url, Constants.PREFIX, job_id)


def to_job_entry_point_url(base_url, job_id):
    return "{}/{}".format(to_job_raw_url(base_url, job_id), 'entry-points')


def to_job_reports_url(base_url, job_id):
    # the job type issue should be fixed in the root API
    job_type = "pbsmrtpipe"
    return "{}/{}/smrt-link/job-manager/jobs/{}/{}/reports".format(base_url, Constants.PREFIX, job_type, job_id)


def to_job_report_details_url(base_url, job_id, report_uuid):
    return "{}/{}".format(to_job_reports_url(base_url, job_id), report_uuid)


def to_job_tasks_url(base_url, job_id):
    return "{}/{}".format(to_job_raw_url(base_url, job_id), "tasks")


def to_job_events_url(base_url, job_id):
    return "{}/{}".format(to_job_raw_url(base_url, job_id), "events")


def to_job_types_url(base_url):
    return "{}/{}/smrt-link/job-manager/job-types".format(base_url, Constants.PREFIX)


def get_job_types(base_url, token):
    return r_get_with_retry(to_job_types_url(base_url), token)

def scrape_smrtlink_job_type(base_url, token, job_type, output_root, throttle_time=None, is_multi_job=False):
    f = to_multi_jobs_url if is_multi_job else to_jobs_url
    jobs_url = f(base_url, job_type)
    # there's a potential collision here between the multi-job and "core" job types
    output_json = os.path.join(output_root, 'smrtlink-jobs-{}.json'.format(job_type))
    sjobs = []
    throttle_func = to_throttler(throttle_time)
    for x in r_get_with_retry(jobs_url, token):
        s_job_d = sanitize_job_d(x)
        job_id = s_job_d['id']
        job_state = s_job_d['state']
        sjobs.append(s_job_d)
        # Only scrape details for completed jobs that weren't already scraped
        if job_state in ('SUCCESSFUL', 'FAILED', 'TERMINATED'):
            job_dir = to_job_dir(output_root, job_id)
            if os.path.exists(job_dir):
                log.debug("skipping Job Id {} from {}. Already scraped in {}".format(job_id, base_url, job_dir))
            else:
                os.mkdir(job_dir)
                scrape_smrtlink_job_details(base_url, token, output_root, job_id)
                throttle_func()
    write_json(sjobs, output_json)
    log.info("scraped {} total jobs from {} job type {}".format(len(sjobs),
                                                                base_url,
                                                                job_type))
    return sjobs

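# Output note (derived from the function above): each job type produces a flat
# smrtlink-jobs-<job_type>.json under output_root, plus one per-job directory
# (via to_job_dir below) for every completed job whose details are scraped.
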
def scrape_all_smrtlink_job_types(base_url, token, output_root, throttle_time=None):
    for d in get_job_types(base_url, token):
        job_type = d['jobTypeId']
        is_multi_job = d.get('isMultiJob', False)
        yield scrape_smrtlink_job_type(base_url, token, job_type, output_root,
                                       throttle_time=throttle_time,
                                       is_multi_job=is_multi_job)


def scrape_all_smrtlink_job_types_and_write(base_url, token, output_root,
                                            throttle_time=None):
    """
    Scrape all SL Job types

    :param throttle_time: Sleep time (in sec) between scraping job details, or None for no throttling
    """
    started_at = datetime.datetime.now()
    jobs = []
    for js in scrape_all_smrtlink_job_types(base_url, token, output_root, throttle_time=throttle_time):
        jobs.extend(js)
    # the naming deliberately breaks the per-job-type convention to enable
    # globbing of the file system
    output_json = os.path.join(output_root, "smrtlink-all-jobs.json")
    write_json(jobs, output_json)
    completed_at = datetime.datetime.now()
    run_time = (completed_at - started_at).total_seconds()
    log.info("completed scraping all job types ({} total jobs) for {} in {:.2f} sec".format(len(jobs), base_url, run_time))
    return jobs

def to_job_dir(output_root, job_id):
    relative_dir = str(job_id).zfill(6)[:3].zfill(6)
    x = os.path.join(output_root, relative_dir)
    if not os.path.exists(x):
        os.mkdir(x)
    i = os.path.join(x, str(job_id).zfill(6))
    # The parent bucket dir is created above; creating the job dir itself is
    # the responsibility of the caller
    return i

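# Example (derived from the zfill/slice logic above): job_id 1234 maps to
#   <output_root>/000001/001234
# i.e. jobs are bucketed by the first three digits of their zero-padded id,
# keeping each bucket to at most 1000 job sub-directories.
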
def scrape_smrtlink_job_details(base_url, token, output_root, job_id):
    """
    Downloads reports and datastore from a job
    """
    job_url = to_job_raw_url(base_url, job_id)
    output_dir = to_job_dir(output_root, job_id)

    def to_o(file_name):
        return os.path.join(output_dir, file_name)

    output_job_json = to_o("job.json")
    output_datastore_json = to_o("datastore.json")
    output_report_dir = to_o('reports')
    output_ep_json = to_o("entry-points.json")
    output_metrics_json = to_o("metrics-all.json")
    output_events_json = to_o("events.json")
    output_tasks_json = to_o("tasks.json")

    if not os.path.exists(output_report_dir):
        os.mkdir(output_report_dir)

    def _to_report_json(report_uuid_):
        return os.path.join(output_report_dir, "{}.json".format(report_uuid_))

    job_d = sanitize_job_d(r_get_with_retry(job_url, token))
    job_uuid = job_d['uuid']
    job_d['server'] = _to_server_url_name(base_url)
    job_type = job_d['jobTypeId']
    write_json(job_d, output_job_json)

    # A little bit of ETL'ing here to make the data more easily consumable
    def _add(dx):
        dx['server'] = _to_server_url_name(base_url)
        dx['jobId'] = job_id
        dx['jobUUID'] = job_uuid
        return dx

    def _add_to_items(dx_list):
        for dx_ in dx_list:
            _add(dx_)
        return dx_list

    # DataStore
    datastore_url = to_job_datastore_url(base_url, job_id)
    ds_d = r_get_with_retry(datastore_url, token)
    for ds_file in ds_d:
        _add(ds_file)
    write_json(ds_d, output_datastore_json)

    # Entry Points
    ep_url = to_job_entry_point_url(base_url, job_id)
    ep_d = r_get_with_retry(ep_url, token)
    _add_to_items(ep_d)
    write_json(ep_d, output_ep_json)

    # Tasks
    tasks_url = to_job_tasks_url(base_url, job_id)
    tasks_d = r_get_with_retry(tasks_url, token)
    _add_to_items(tasks_d)
    write_json(tasks_d, output_tasks_json)

    # Events
    events_url = to_job_events_url(base_url, job_id)
    events_d = r_get_with_retry(events_url, token)
    _add_to_items(events_d)
    write_json(events_d, output_events_json)

    # Reports: pull the per-report details for every JSON report in the datastore
    report_uuids = [d['uuid'] for d in ds_d if d['fileTypeId'] == 'PacBio.FileTypes.JsonReport']
    metrics = []
    for report_uuid in report_uuids:
        r_url = to_job_report_details_url(base_url, job_id, report_uuid)
        output_report_json = _to_report_json(report_uuid)
        r_d = r_get_with_retry(r_url, token)
        _add(r_d)
        write_json(r_d, output_report_json)
        for a_d in r_d['attributes']:
            _add(a_d)
            metrics.append(a_d)
    write_json(metrics, output_metrics_json)

    log.info("scraped Job Id:{} type:{} details from {}".format(job_id,
                                                                job_type,
                                                                base_url))
    return job_d

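# Per-job output layout (derived from the to_o() calls above), relative to the job dir:
#   job.json              sanitized job record
#   datastore.json        datastore files, tagged with server/jobId/jobUUID
#   entry-points.json     job entry points
#   tasks.json            job tasks
#   events.json           job events
#   metrics-all.json      flattened report attributes across all reports
#   reports/<uuid>.json   one file per PacBio.FileTypes.JsonReport in the datastore
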
def to_dataset_types_url(base_url):
    return "{}/{}/smrt-link/dataset-types".format(base_url, Constants.PREFIX)


def to_dataset_type_url(base_url, dataset_type):
    return "{}/{}/smrt-link/datasets/{}?showAll=true".format(base_url, Constants.PREFIX, dataset_type)


def to_dataset_type_by_id_url(base_url, dataset_type, dataset_id):
    return "{}/{}".format(to_dataset_type_url(base_url, dataset_type),
                          dataset_id)


def to_dataset_type_details_url(base_url, dataset_type, dataset_id):
    return "{}/details".format(to_dataset_type_by_id_url(base_url,
                                                         dataset_type,
                                                         dataset_id))

def scrape_smrtlink_dataset_type_and_write(base_url, token, dataset_type, output_dir,
                                           throttle_time=None):
    """
    Scrape all datasets of a specific dataset type
    """
    # dataset_output_dir = os.path.join(output_dir, 'datasets')
    dataset_output_dir = output_dir
    output_dataset_json = os.path.join(dataset_output_dir, 'smrtlink-datasets-{}.json'.format(dataset_type))
    datasets_url = to_dataset_type_url(base_url, dataset_type)
    log.info("scraping dataset type {} from {}".format(dataset_type, base_url))
    jx = r_get_with_retry(datasets_url, token)
    for x in jx:
        x['server'] = _to_server_url_name(base_url)
        # Not sure if this is really encoded within the dataset response
        x['dataset_short_name'] = dataset_type
    write_json(jx, output_dataset_json)
    log.info("scraped dataset type {} with {} records from {}".format(dataset_type, len(jx), base_url))
    return jx

def get_all_dataset_types(base_url, token):
    url = to_dataset_types_url(base_url)
    key = 'shortName'
    return [x[key] for x in r_get_with_retry(url, token)]


def scrape_all_smrtlink_dataset_types_and_write(base_url, token, output_dir, throttle_time=None):
    datasets = []
    for dataset_type in get_all_dataset_types(base_url, token):
        jxs = scrape_smrtlink_dataset_type_and_write(base_url, token, dataset_type, output_dir, throttle_time=throttle_time)
        datasets.extend(jxs)
    output_json = os.path.join(output_dir, "smrtlink-all-datasets.json")
    # this is a polymorphic mess
    write_json(datasets, output_json)
    log.info("scraped all dataset types ({} total records) for {}".format(len(datasets), base_url))
    return datasets

def get_status(base_url, token):
    url = to_status_url(base_url)
    sx = r_get_with_retry(url, token)
    sx['server'] = _to_server_url_name(base_url)
    return sx


def scrape_status(base_url, token, output_dir):
    output_json = os.path.join(output_dir, 'smrtlink-status.json')
    sx = get_status(base_url, token)
    write_json(sx, output_json)
    return sx

def _create_auth(secret, consumer_key):
    return base64.b64encode(":".join([secret, consumer_key]))


def get_token(host, port, user, password, scopes, secret, consumer_key):
    url = "https://{}:{}/token".format(host, port)
    basic_auth = _create_auth(secret, consumer_key)
    # To be explicit for pedagogical purposes
    headers = {
        "Authorization": "Basic {}".format(basic_auth),
        "Content-Type": "application/x-www-form-urlencoded"
    }
    scope_str = " ".join({s for s in scopes})
    payload = dict(grant_type="password",
                   username=user,
                   password=password,
                   scope=scope_str)
    # verify is false to disable the SSL cert verification
    return requests.post(url, payload, headers=headers, verify=False)


def get_smrtlink_token(host, port, user, password):
    secret = "KMLz5g7fbmx8RVFKKdu0NOrJic4a"
    consumer_key = "6NjRXBcFfLZOwHc0Xlidiz4ywcsa"
    scopes = ["welcome", "run-design", "run-qc", "openid", "admin",
              "sample-setup", "data-management"]
    r = get_token(host, port, user, password, scopes, secret, consumer_key)
    j = r.json()
    access_token = j['access_token']
    refresh_token = j['refresh_token']
    scopes = j['scope'].split(" ")
    return access_token, refresh_token, scopes

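# Minimal usage sketch (host name is illustrative; port 8243 and the admin
# credentials are the defaults from get_parser below):
#
#   access_token, refresh_token, scopes = get_smrtlink_token(
#       "smrtlink-beta", 8243, "admin", "admin")
#   status = get_status("https://smrtlink-beta:8243", access_token)
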
def get_parser():
    p = argparse.ArgumentParser(version=__version__,
                                description=__doc__,
                                formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    f = p.add_argument
    f('host', help="SMRT Link Host", type=str)
    f('--port', help="SMRT Link Port (WARNING: this is no longer used)", type=int, default=8081)
    f('--auth-port', help="SMRT Link Auth Port", type=int, default=8243)
    f('--output', help="Root Data Warehouse output dir. A subdirectory within the root will be created for each server using the <root>/<host>-<port> convention", type=str, default=os.getcwd())
    f('--throttle', help="Throttle time (in sec) between each job request", type=float, default=1.0)
    f('--log-file', help="Output Log file", default=os.path.join(os.getcwd(), "scraped.log"), type=str)
    f('-u', '--user', help="User name", default="admin")
    f('-p', '--password', help="User password", default="admin")
    return p

def main(args):
    p = get_parser()
    pargs = p.parse_args(args)

    host = pargs.host
    throttle_time = pargs.throttle
    auth_port = pargs.auth_port
    # Note, this has changed with the auth
    system_id = "{}-{}".format(host, pargs.port)
    output_dir = os.path.join(os.path.abspath(pargs.output), system_id)
    base_url = "https://{}:{}".format(pargs.host, auth_port)

    if not os.path.exists(output_dir):
        os.mkdir(output_dir)

    log_format = '[%(levelname)s] %(asctime)-15sZ [%(name)s] %(message)s'
    logging.basicConfig(filename=pargs.log_file, level=logging.INFO,
                        format=log_format)
    log.info(pargs)

    token, _, _ = get_smrtlink_token(host, auth_port, pargs.user, pargs.password)
    log.info("Got token {}".format(token))

    scrape_status(base_url, token, output_dir)
    scrape_all_smrtlink_job_types_and_write(base_url, token, output_dir,
                                            throttle_time=throttle_time)
    scrape_all_smrtlink_dataset_types_and_write(base_url, token, output_dir,
                                                throttle_time=throttle_time)
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))
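
# Example invocation (a sketch; the host name and output path are illustrative,
# the other defaults come from get_parser above):
#
#   python smrtlink-scraper.py smrtlink-beta \
#       --output /path/to/warehouse \
#       --throttle 0.5 \
#       -u admin -p admin
#
# This writes one <host>-<port> subdirectory under --output containing
# smrtlink-status.json, smrtlink-all-jobs.json, smrtlink-all-datasets.json,
# the per-job-type and per-dataset-type JSON files, and the per-job directories.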