Created
August 26, 2016 21:02
-
-
Save nvictus/d82cbac6775cfa67cca7f6bd082d9af9 to your computer and use it in GitHub Desktop.
UCSC fetcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Request snapshots from the UCSC Genome Browser. | |
""" | |
from __future__ import division, print_function, unicode_literals | |
from collections import OrderedDict | |
import sys | |
import re | |
if sys.version_info[0] >= 3: | |
from io import BytesIO | |
else: | |
try: | |
from cStringIO import StringIO as BytesIO | |
except ImportError: | |
from StringIO import StringIO as BytesIO | |
from matplotlib.image import imread | |
import numpy as np | |
import requests | |
import bs4 | |
# European mirror is http://genome-euro.ucsc.edu/ | |
BASE_URL = "https://genome.ucsc.edu" | |
def _scrape_for_pdf(resp, ideogram=False): | |
# Scrape the pdf download page | |
soup = bs4.BeautifulSoup(resp.text, 'html.parser') | |
if ideogram: | |
name = 'hgtIdeo_genome' | |
else: | |
name = 'hgt_genome' | |
a_tags = soup.find_all('a') | |
for a in a_tags: | |
if name in a.attrs['href']: | |
pdf_url = BASE_URL + a.attrs['href'][2:] | |
break | |
else: | |
raise ValueError('Link to PDF not found') | |
r_pdf = requests.get(pdf_url) | |
return r_pdf.content | |
def _scrape_for_images(resp): | |
# Scrape the UCSC browser page | |
soup = bs4.BeautifulSoup(resp.text, 'html.parser') | |
# Image of the main panel | |
img_tag1 = soup.find(id="img_data_ruler") | |
r_data = requests.get(BASE_URL + img_tag1.attrs['src'][2:]) | |
r_data.raise_for_status() | |
im_data = imread(BytesIO(r_data.content)) | |
# Image of the side panel | |
img_tag2 = soup.find(id="img_side_ruler") | |
r_side = requests.get(BASE_URL + img_tag2.attrs['src'][2:]) | |
r_side.raise_for_status() | |
im_side = imread(BytesIO(r_side.content)) | |
# Get the width of the side panel | |
css = img_tag1.attrs['style'] | |
margin_txt = css.split(';')[0].split(':')[1] | |
margin_px = int(next(re.finditer(r'\d+', margin_txt)).group()) | |
return im_side[:, :margin_px, :], im_data[:, margin_px:, :] | |
defaults = { | |
'intronEst': 'hide', | |
'knownGene': 'hide', | |
'mrna': 'hide', | |
'pubs': 'hide', | |
'refGene': 'hide', | |
'snp142Common': 'hide', | |
'cons100way': 'hide', | |
'wgEncodeReg': 'hide', | |
'snp144Common': 'hide', | |
'rmsk': 'hide', | |
} | |
def _process_params(params, region, db, session_id): | |
if isinstance(region, tuple): | |
params['position'] = '{}:{}-{}'.format(*region) | |
else: | |
params['position'] = region | |
params['db'] = db | |
if session_id is not None: | |
params['hgsid'] = session_id | |
else: | |
# hide all default tracks not explicitly turned on | |
for k, v in defaults.items(): | |
params.setdefault(k, v) | |
def fetch(region, db='hg19', session_id=None, params=None, http_session=None): | |
""" | |
Download snapshot from UCSC Genome Browser as RGB array. | |
Input | |
----- | |
region: str | |
genomic coordinate query string | |
db: str | |
name of genome assembly | |
session_id: str, optional | |
genome browser session ID (hgsid) | |
params: dict, optional | |
url parameters, e.g. names of tracks | |
Notes | |
----- | |
UCSC track display modes: | |
'hide', 'dense', 'squish', 'pack', 'full' | |
Returns | |
------- | |
Two RGB image arrays (M x N x 3) | |
im_side: image of the side panel, variable width | |
im_data: image of the main panel, spans the genomic interval exactly | |
Example | |
------- | |
>>> side, data = fetch('chrX:15,000,000-16,000,000', params={'knownGene': 'full'}) | |
""" | |
if params is None: | |
params = {} | |
_process_params(params, region, db, session_id) | |
if http_session is not None: | |
r = http_session.get(BASE_URL + "/cgi-bin/hgTracks", params=params) | |
else: | |
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params) | |
r.raise_for_status() | |
return _scrape_for_images(r) | |
def fetch_pdf(filepath, region, db='hg19', session_id=None, params=None, http_session=None): | |
""" | |
Download snapshot from UCSC Genome Browser as PDF file. | |
""" | |
if params is None: | |
params = {} | |
_process_params(params, region, db, session_id) | |
params['hgt.psOutput'] = 'on' | |
if http_session is not None: | |
r = http_session.get(BASE_URL + "/cgi-bin/hgTracks", params=params) | |
else: | |
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params) | |
r.raise_for_status() | |
content = _scrape_for_pdf(r) | |
with open(filepath, 'wb') as f: | |
f.write(content) | |
def fetch_ideogram(region, db='hg19', session_id=None, params=None): | |
if params is None: | |
params = {} | |
_process_params(params, region, db, session_id) | |
params['hgt.psOutput'] = 'on' | |
r = requests.get(BASE_URL + "/cgi-bin/hgTracks", params=params) | |
r.raise_for_status() | |
content = _scrape_for_pdf(r, ideogram=True) | |
return content | |
class UCSCSession(object): | |
def __init__(self, db='hg19', params=None): | |
self._session = requests.Session() | |
self.params = OrderedDict({'db': db}) | |
self.params.update(defaults) | |
if params is not None: | |
self.params.update(params) | |
self.refresh() | |
def _from_text(self, text): | |
params = OrderedDict() | |
lines = text.split('\n') | |
for line in lines: | |
key, _, value = line.partition(' ') | |
params[key] = value | |
return params | |
def _to_text(self, params): | |
return '\n'.join('{} {}'.format(k, v) for k,v in params.items()) | |
def _push(self): | |
content = self._to_text(self.params).encode('utf-8') | |
r = self._session.post( | |
BASE_URL + "/cgi-bin/hgSession", | |
data={'hgS_doLoadLocal':'submit'}, | |
files={'hgS_loadLocalFileName': ('hgSession.txt', content, 'text/plain')}) | |
r.raise_for_status() | |
return r | |
def _pull(self): | |
r = self._session.get(BASE_URL + '/cgi-bin/hgSession', | |
data={'hgS_doSaveLocal': 'submit', | |
'hgS_saveLocalFileName': '', | |
'hgS_saveLocalFileCompress': 'plain text'}) | |
r.raise_for_status() | |
self.params.update(self._from_text(r.text)) | |
return r | |
def refresh(self): | |
self._push() | |
self._pull() | |
def __getitem__(self, key): | |
self._pull() | |
return self.params[key] | |
def __setitem__(self, key, value): | |
self.params[key] = value | |
self.refresh() | |
def __delitem__(self, key): | |
del self.params[key] | |
self.refresh() | |
def update(self, *args, **kwargs): | |
self.params.update(*args, **kwargs) | |
self.refresh() | |
def loads(self, text): | |
self.params = self._from_text(text) | |
def dumps(self): | |
return self._to_text(self.params) | |
def load(self, filepath): | |
with open(filepath, 'r') as f: | |
self.params = self._from_text(f.read()) | |
self.refresh() | |
def save(filepath): | |
with open(filepath, 'w') as f: | |
filepath.write(self._to_text(self.params)) | |
def fetch(self, region): | |
r = self._session.get(BASE_URL + "/cgi-bin/hgTracks", data={'position': region}) | |
r.raise_for_status() | |
return _scrape_for_images(r) | |
if __name__ == '__main__': | |
import matplotlib.pyplot as plt | |
im0, im1 = fetch('chr16:52,800,000-56,000,000', params={'knownGene': 'full'}) | |
plt.imshow(im1) | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I have to say, this is amazing. Thank you for sharing, Nezar!