Skip to content

Instantly share code, notes, and snippets.

@jfeala
Last active August 29, 2015 14:25
Show Gist options
  • Save jfeala/896ec334a38154769388 to your computer and use it in GitHub Desktop.
Save jfeala/896ec334a38154769388 to your computer and use it in GitHub Desktop.
Mounting and caching snippets of BAM files using GTFuse
import requests
import json
from os.path import join
CGHUB_API_ENDPOINT = 'https://cghub.ucsc.edu/cghub/metadata'
SAMPLE_MAPPING = {'tumor': ['0' + str(i) for i in range(1,10)],
'normal': ['1' + str(i) for i in range(5)]}
LIBRARY_MAPPING = {'dna': 'DNA-Seq', 'rna': 'RNA-Seq', 'mirna': 'miRNA-Seq'}
def cgquery(full_details=False, **query_dict):
'''Query the CGHub REST API
>>> cgquery(legacy_sample_id = '*TCGA-AB-2929*')
{'result_set': {'@date': '2014-09-25 15:30:34',
'hits': 10,
'query': 'legacy_sample_id:TCGA-AC-A23H*',
'result_summary': {'downloadable_file_count': 18,
'downloadable_file_size': {'size': 54.4, 'units': 'GB'},
'state_count': {'live': 10}},
'results': [
{'@id': 1,
'aliquot_id': '0f490f28-6450-4da3-9b32-bf510aeab410',
'analysis_data_uri': 'https://cghub.ucsc.edu/cghub/data/analysis/download/2c5fb623-67fb-4919-b7f0-85a25e705bdc',
'analysis_full_uri': 'https://cghub.ucsc.edu/cghub/metadata/analysisFull/2c5fb623-67fb-4919-b7f0-85a25e705bdc',
'analysis_id': '2c5fb623-67fb-4919-b7f0-85a25e705bdc',
...
}
'''
if not query_dict or type(full_details) != bool:
raise ValueError('Please enter search parameters as keywords')
query = 'analysisFull' if full_details else 'analysisDetail'
url = join(CGHUB_API_ENDPOINT, query)
response = requests.get(url, params=query_dict, headers={'Accept': 'application/json'})
assert response.status_code == 200
return json.loads(response.content)['result_set']
def get_analysis_ids(tcga_patient, sample_type, datatype, filetype='bam'):
'''High-level query for analysis IDs by patient'''
if not tcga_patient.endswith('*'):
tcga_patient = tcga_patient + '*'
response = cgquery(legacy_sample_id= tcga_patient, full_details=False)
assert response['hits'] > 0, 'No hits found from search'
hits = response['results']
# filter by sample, datatype, filetype
hits = filter(lambda h: h['sample_type'] in SAMPLE_MAPPING[sample_type], hits)
hits = filter(lambda h: h['library_strategy'] == LIBRARY_MAPPING[datatype], hits)
if filetype in ['fq', 'fastq']:
hits = filter(lambda h: h['refassem_short_name'] == 'unaligned', hits)
else:
hits = filter(lambda h: h['refassem_short_name'] != 'unaligned', hits)
if not hits:
return []
return [h['analysis_id'] for h in hits]
from fileio import locate
import subprocess
import os
import string
import random
import errno
CGHUB_KEY = '/path/to/cghub.key'
CACHE_DIR = '/path/to/sparse_bams'
MOUNT_BASE_DIR = '/path/to/gtfuse_mounts'
BASE_URI = 'https://cghub.ucsc.edu/cghub/data/analysis/download/'
def mkdir_p(path):
'''Helper function to emulate 'mkdir -p' shell command'''
try:
os.makedirs(path)
except OSError as exc: # Python >2.5
if exc.errno == errno.EEXIST and os.path.isdir(path):
pass
else:
raise
def random_folder_generator(size=8, chars=string.ascii_uppercase + string.digits):
'''Make a random (default 8-char) string, e.g. for a folder name'''
return ''.join(random.choice(chars) for x in range(size))
def get_sparse_bam(analysis, link_dir, coords):
'''Download sparse bam and create symlink.
Given an ANALYSIS dict (from cghub REST API) and a region described by
COORDS, mount the corresponding bamfile, download reads, and create a symlink
to the TCGA id within directory LINK_DIR.
'''
# mount temp bamfile and cache SF3B1 reads
bam = GTFuse()
bam.mount(analysis['analysis_id'])
bam.view(coords)
# make a symlink to cached reads
cached_path = bam.get_cached_bamfile()
bam_fname = analysis['legacy_sample_id'][:16] + '.' + analysis['library_strategy'] + '.bam'
link_path = os.path.join(link_dir, bam_fname)
if not os.path.exists(link_path):
os.symlink(cached_path, link_path)
os.symlink(cached_path + '.bai', link_path + '.bai')
bam.unmount()
class GTFuse:
'''Class to represent a mounted GTFuse object'''
def __init__(self, credentials_key_path=CGHUB_KEY, cache_dir=CACHE_DIR, \
mount_base_dir=MOUNT_BASE_DIR, base_uri=BASE_URI):
self.credentials_key_path = credentials_key_path
self.cache_dir = cache_dir
self.mount_dir = os.path.join(mount_base_dir, random_folder_generator())
mkdir_p(self.mount_dir)
self.base_uri = base_uri
def mount(self, analysis_id, only_print_cmd=False):
'''Mounts a CGHub GTFuse object to a temporary directory'''
uri = self.base_uri + analysis_id
cmd = ['gtfuse', '-v', '-c', self.credentials_key_path, \
'--cache-dir=' + self.cache_dir, uri, self.mount_dir]
print ' '.join(cmd)
if only_print_cmd:
return True
print 'Mounting CGHub analysis ' + analysis_id
subprocess.check_call(cmd)
bamfile = list(locate('*.bam*', root=self.mount_dir))
bamfile = [fname for fname in bamfile if '.bai' not in fname]
if len(bamfile) == 1:
self.bamfile = bamfile[0]
self.analysis_id = analysis_id
print 'Successfully mounted'
elif len(bamfile) == 0:
print 'Warning: no .bam file found'
else:
print 'Warning: multiple .bam files found'
def unmount(self):
'''Unmount FUSE filesystem and remove temporary directory'''
cmd = ['fusermount', '-z', '-u', self.mount_dir]
subprocess.check_call(cmd)
os.rmdir(self.mount_dir)
print 'Successfully unmounted'
def view(self, region):
'''Get reads from samtools view'''
if not self.bamfile:
print 'No bam file to query'
return
cmd = ['samtools', 'view', self.bamfile, region]
return subprocess.check_output(cmd)
def mpileup(self, region, mpileup=False):
'''Get pileups from samtools mpileup'''
if not self.bamfile:
print 'No bam file to query'
return
cmd = ['samtools', 'mpileup', '-r', region, self.bamfile]
return subprocess.check_output(cmd)
def get_cached_bamfile(self):
'''Get absolute filepath to cached sparse bam file'''
return os.path.join(self.cache_dir, self.analysis_id, os.path.basename(self.bamfile))
def get_mounted_bamfile(self):
'''Get absolute filepath to mounted GTFuse object'''
return self.bamfile
def delete_reservations_in_cache_dir(self):
files_to_delete = locate('*.reservation', root=self.cache_dir)
for fname in files_to_delete:
os.unlink(fname)
def delete_locks_in_cache_dir(self):
files_to_delete = locate('*.lock', root=self.cache_dir)
for fname in files_to_delete:
os.unlink(fname)
def __repr__(self):
return '<GTFuse object mounted at ' + self.get_mounted_bamfile() + '>'
from gtfuse import get_sparse_bam, get_analysis_ids
from cgquery import cgquery,
link_dir = '/home/user/bam_links/'
coords = 'chr17:7509811-7550142' # region of interest to gather reads for cached bam
analysis_ids = get_analysis_ids('TCGA-MK-A4N6', 'tumor', 'rna')
analysis_dict = cgquery(analysis_id=analysis_ids[0])
get_sparse_bam(analysis_dict, link_dir, coords)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment