#!/usr/bin/python
import sys, os, re, time
from subprocess import Popen, PIPE
import json
import requests

__license__ = "MIT"
__doc__ = """
This script is meant as an example of Seven Bridges Genomics IGOR API usage.
Running it will annotate all files in the specified directory, upload them,
process them by re-running the configured template task, and download the results.

Please edit the script to configure the following:
    TOKEN: Supplied auth token for your user.
    CLIENT_ID: Supplied API key for your organization.
    PROJECT_ID: ID of the project to use.
    TEMPLATE_TASK_ID: ID of an existing task to use as a template. Its pipeline
        should have all inputs pre-set, except the one fed by INPUT_NODE_ID.
    INPUT_NODE_ID: ID of the pipeline input node that receives the uploaded files.
    UPLOADER_PATH: Path to the IGOR command-line uploader.
    BASE_META: A dictionary of metadata common to all files you will be uploading.
    FILE_REGEXP: A regular expression whose capture groups extract per-file
        metadata from each file name.

Usage: igor_api_example.py path_to_dir_containing_files_to_process
"""
# Supplied auth token and API key
TOKEN, CLIENT_ID = 'MY_AUTH_TOKEN', 'MY_CLIENT_ID'
# IDs of the project, task template and input node
PROJECT_ID, TEMPLATE_TASK_ID, INPUT_NODE_ID = '111', '222', '333'
# Path to the SBG uploader
UPLOADER_PATH = '/path/to/sbg-uploader.sh'
# Metadata common to all files
BASE_META = {'file_type': 'fastq', 'seq_tech': 'illumina'}
# Use capture groups to extract metadata from the file name
FILE_REGEXP = re.compile(r'^(?P<sample>.*)_(?P<paired_end>[12])\.f(ast)?q$')
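# For example, a file named 'sample01_1.fastq' (the name is illustrative) would
# be annotated with {'sample': 'sample01', 'paired_end': '1'} on top of BASE_META.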

class SbgApi(object):
    """
    Example wrapper around v0.9 API calls.
    Initialize with supplied client ID and auth token.
    """
    def __init__(self, client_id, auth_token):
        self.endpoint = 'https://api.sbgenomics.com/0.9/'
        self.headers = {
            'X-SBG-CLIENT-ID': client_id,
            'X-SBG-AUTH-TOKEN': auth_token,
            'Accept': 'application/json',
            'Content-type': 'application/json',
        }

    def _request(self, path, method='GET', query=None, data=None):
        data = json.dumps(data) if isinstance(data, dict) else None
        print method, self.endpoint + path, query or {}, data
        response = requests.request(method, self.endpoint + path, params=query, data=data, headers=self.headers)
        response_dict = json.loads(response.content) if response.content else {}
        if response.status_code / 100 != 2:
            raise Exception(response_dict.get('message', 'Server responded with status code %s.' % response.status_code))
        return response_dict

    def upload_list(self):
        return self._request('upload/')['items']

    def upload_details(self, upload_name):
        return self._request('upload/%s/' % upload_name)

    def project_list(self):
        return self._request('project/')['items']

    def project_details(self, project_id):
        return self._request('project/%s/' % project_id)

    def file_list(self, project_id):
        return self._request('project/%s/file/' % project_id)['items']

    def file_details(self, project_id, file_id):
        return self._request('project/%s/file/%s/' % (project_id, file_id))

    def file_update(self, project_id, file):
        return self._request('project/%s/file/%s/' % (project_id, file.get('id')), method='PUT', data=file)

    def file_delete(self, project_id, file_id):
        return self._request('project/%s/file/%s/' % (project_id, file_id), method='DELETE')

    def file_download_url(self, project_id, file_id):
        return self._request('project/%s/file/%s/download/' % (project_id, file_id))['url']

    def file_copy_from_upload(self, project_id, upload_name):
        return self._request('project/%s/file/' % project_id, query={'action': 'copy', 'from': '/upload/%s' % upload_name}, method='POST')

    def pipeline_list(self, project_id):
        return self._request('project/%s/pipeline/' % project_id)['items']

    def pipeline_details(self, project_id, pipeline_id):
        return self._request('project/%s/pipeline/%s/' % (project_id, pipeline_id))

    def task_list(self, project_id):
        return self._request('project/%s/task/' % project_id)['items']

    def task_details(self, project_id, task_id):
        return self._request('project/%s/task/%s/' % (project_id, task_id))

    def task_new(self, project_id, task):
        return self._request('project/%s/task/' % project_id, method='POST', data=task)

    def task_stop(self, project_id, task_id):
        return self._request('project/%s/task/%s/' % (project_id, task_id), method='POST', query={'action': 'stop'})

    def run_task_template(self, project_id, task_id, node_id, file_id_list, task_name=None):
        """
        Creates a new task based on an existing one, replacing the inputs
        on the given node with file_id_list.
        """
        task = self.task_details(project_id, task_id)
        del task['id']
        task['inputs'][node_id] = file_id_list
        task['outputs'] = {}
        if task_name:
            task['name'] = task_name
        print task
        return self.task_new(project_id, task)

    def get_task_outputs(self, project_id, task_id):
        """
        Returns a list of all output file IDs if the specified task completed
        successfully. Returns None if the task is still running.
        Raises an exception if the task was aborted or failed.
        """
        task = self.task_details(project_id, task_id)
        full_status = task.get('status', {})
        if full_status.get('status') == 'active':
            return None
        elif full_status.get('status') != 'completed':
            raise Exception('Task %s' % full_status.get('status'))
        result = []
        for file_id_list in task.get('outputs').values():
            result += file_id_list
        return result

def upload_directory(uploader_path, directory_path, upload_name, common_metadata=None, file_regexp='.*'):
    """
    Utility function that wraps the SBG uploader. It annotates files with
    metadata (written to JSON .meta sidecar files) before uploading them.
    To annotate files, pass a dict of metadata common to all files, and a
    regular expression with capture groups for the fields that vary from
    file to file. Raises an exception if the uploader fails.
    """
    if isinstance(file_regexp, basestring):
        file_regexp = re.compile(file_regexp)
    base_meta = common_metadata or {}
    matched_files = []
    for dir_path, dirs, files in os.walk(directory_path):
        for file_name in files:
            match = file_regexp.match(file_name)
            if not match:
                continue
            file_metadata = dict(base_meta, **match.groupdict())
            file_path = os.path.join(dir_path, file_name)
            with open(file_path + '.meta', 'w') as f:
                json.dump(file_metadata, f)
            matched_files.append(file_path)
    p = Popen([uploader_path, '-v', '-t', TOKEN, '-i', CLIENT_ID, '-n', upload_name] + matched_files, stdout=PIPE)
    ret_code = p.wait()
    if ret_code:
        raise Exception('Uploader exited with code %s' % ret_code)
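
# For instance, with BASE_META and FILE_REGEXP as configured above, a file named
# sample01_1.fastq (the name is illustrative) gets a sidecar sample01_1.fastq.meta
# containing:
# {"file_type": "fastq", "seq_tech": "illumina", "sample": "sample01", "paired_end": "1"}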

if __name__ == '__main__':
    if len(sys.argv) == 1 or sys.argv[1].startswith('-'):
        print __doc__
        sys.exit()
    # Get directory to upload from args
    data_dir = sys.argv[1]
    # Use it as upload name
    upload_name = os.path.basename(data_dir)
    # Run uploader
    upload_directory(UPLOADER_PATH, data_dir, upload_name, BASE_META, FILE_REGEXP)
    # Prepare for API calls
    api = SbgApi(CLIENT_ID, TOKEN)
    # Copy the files to the project
    file_id_list = [f['id'] for f in api.file_copy_from_upload(PROJECT_ID, upload_name)['items']]
    # Create a task with our uploaded files replacing old inputs on INPUT_NODE_ID
    task_id = api.run_task_template(PROJECT_ID, TEMPLATE_TASK_ID, INPUT_NODE_ID, file_id_list)['id']
    # Wait for the task to finish
    while True:
        results = api.get_task_outputs(PROJECT_ID, task_id)
        if results is not None:
            break
        time.sleep(30)
    # Download all output files
    for file_id in results:
        url = api.file_download_url(PROJECT_ID, file_id)
        print 'Downloading file %s from %s' % (file_id, url)
        print 'Failed' if Popen(['axel', url]).wait() else 'Done'
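
The download step above shells out to the axel download accelerator. If axel is not installed, a plain requests-based download is a workable substitute. Here is a minimal sketch, assuming the signed URL ends in a usable file name; the helper name and the 1 MB chunk size are arbitrary choices:

import os
import requests

def download_file(url, dest_dir='.'):
    # Naively derive a local file name from the URL, dropping any query string.
    local_path = os.path.join(dest_dir, url.split('/')[-1].split('?')[0])
    # Stream the response to disk in 1 MB chunks so large files are
    # never held in memory all at once.
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(1024 * 1024):
            f.write(chunk)
    return local_path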

A second, more compact example uses a bare api() helper function instead of a class to run a FastQC pipeline end to end:

import requests, json, time

def api(path, method='GET', query=None, data=None):
    """ Helper function for API calls """
    data = json.dumps(data) if isinstance(data, dict) else None
    base_url = 'https://api.sbgenomics.com/0.9/'
    headers = {
        'X-SBG-Client-Id': 'MY CLIENT ID',
        'X-SBG-Auth-Token': 'MY AUTH TOKEN',
        'Accept': 'application/json',
        'Content-type': 'application/json',
    }
    print method, base_url + path, query, data
    response = requests.request(method, base_url + path, params=query, data=data, headers=headers)
    response_dict = json.loads(response.content) if response.content else {}
    if response.status_code / 100 != 2:
        raise Exception(response_dict.get('message', 'Server responded with status code %s.' % response.status_code))
    return response_dict
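
# For example, api('project/') issues GET https://api.sbgenomics.com/0.9/project/
# and returns the decoded JSON body as a dict.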

def find_fastqc():
    """ Finds a pipeline whose name starts with "FastQC" across all accessible projects. Returns project_id and pipeline_id. """
    for project in api('project/')['items']:
        for pipeline in api('project/%s/pipeline/' % project['id'])['items']:
            if pipeline['name'].startswith('FastQC'):
                return project['id'], pipeline['id']
    raise Exception('FastQC pipeline not found')

def get_input_id(project_id, pipeline_id):
    """ Assumes the pipeline has a single input and returns its ID. """
    return api('project/%s/pipeline/%s/' % (project_id, pipeline_id))['inputs'][0]['id']

def get_fastq_file_id(project_id):
    """ Returns the ID of the first fastq file found in the project. """
    for file in api('project/%s/file/' % project_id)['items']:
        if file['metadata'].get('file_type') == 'fastq':
            return file['id']
    raise Exception('No fastq files in project')

def run_task(project_id, pipeline_id, input_id, file_id):
    """ Creates and starts a task that runs the pipeline on the given file. """
    task = {
        "name": "My task",
        "description": "A text description",
        "pipeline_id": pipeline_id,
        "inputs": {
            input_id: [file_id]
        }
    }
    return api('project/%s/task' % project_id, method='POST', data=task)

def wait_for_task(project_id, task_id):
    """ Polls the task every 30 seconds until it is no longer active. """
    while True:
        task = api('project/%s/task/%s/' % (project_id, task_id))
        if task['status']['status'] != 'active':
            break
        print task['status']['message']
        time.sleep(30)
    return task

def get_result_urls(project_id, task_id):
    """ Collects download URLs for all output files of a task. """
    task = api('project/%s/task/%s' % (project_id, task_id))
    outputs, urls = [], []
    for file_id_list in task.get('outputs').values():
        outputs += file_id_list
    for file_id in outputs:
        urls.append(api('project/%s/file/%s/download' % (project_id, file_id))['url'])
    return urls

def do_all():
    """ Runs the whole flow: locate FastQC, pick a fastq file, run the task, return result URLs. """
    project_id, pipeline_id = find_fastqc()
    input_id = get_input_id(project_id, pipeline_id)
    file_id = get_fastq_file_id(project_id)
    print project_id, pipeline_id, input_id, file_id
    task = run_task(project_id, pipeline_id, input_id, file_id)
    task = wait_for_task(project_id, task['id'])
    return get_result_urls(project_id, task['id'])
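
# One way to drive the helpers above, assuming the module is run as a script
# (the __main__ guard here is an illustrative sketch, not part of the example API):
if __name__ == '__main__':
    for url in do_all():
        print url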