Skip to content

Instantly share code, notes, and snippets.

@clintval
Last active March 7, 2018 23:18
Show Gist options
  • Select an option

  • Save clintval/f446c2cc9a4c5cf44371508930263424 to your computer and use it in GitHub Desktop.

Select an option

Save clintval/f446c2cc9a4c5cf44371508930263424 to your computer and use it in GitHub Desktop.
Faux-object-oriented model for COSMIC used in DOI:10.1073/pnas.1700759114
import textwrap
from operator import itemgetter
from Bio.Seq import reverse_complement
from pyfaidx import Fasta, FetchError
from collections import Counter, defaultdict, OrderedDict
from pymutagenesis.seq import *
link = '<a target="_blank" href={}>{}</a>'
header_style = '<div style="text-align:left;"><b>{}<br></b></div>'
accession_url = 'https://www.ncbi.nlm.nih.gov/gquery/?term={}'
PubMed_url = 'https://www.ncbi.nlm.nih.gov/PubMed/?term={}'
# COSMIC web API links
sample_url = 'http://cancer.sanger.ac.uk/cosmic/sample/overview?id={}'
search_url = 'http://cancer.sanger.ac.uk/cosmic/search?q={}'
study_url = 'http://cancer.sanger.ac.uk/cosmic/study/overview?study_id={}'
tissue_url = ('http://cancer.sanger.ac.uk/cosmic/browse/tissue#sn={}'
'&ss={}&hn={}&sh={}&in=t&src=tissue&all_data=y')
def lookup_table(samples):
lookup = defaultdict(list)
for sample in sorted(samples):
lookup[sample.name].append(sample.sample_id)
return lookup
class CosmicMutation(object):
def __init__(self):
self.AA = ''
self.cds = ''
self.chrom = ''
self.description = ''
self.FATHMM_pred = ''
self.FATHMM_score = ''
self.gene_cds_length = ''
self.gene_id = ''
self.gene_name = ''
self.GRCh = ''
self.id = ''
self.LOH = ''
self.position = (0, 0)
self.SNP = ''
self.strand = ''
self.somatic_status = ''
self.zygosity = ''
def __eq__(self, other):
return self.id == other
def __hash__(self):
return hash(self.id)
def __lt__(self, other):
return self.id < other.id
def set_context(self, ref_object, k=3, notation='pyrimidine'):
self.context = ''
self.label = ''
if isinstance(ref_object, Fasta):
try:
kmer = str(ref_object.get_seq(self.chrom,
self.position[0] - 1,
self.position[1] + 1)).upper()
except FetchError:
return None
elif isinstance(ref_object, dict):
kmer = get_kmer(ref_object,
self.chrom,
self.position[0] - 1, k=3).upper()
if kmer is None:
return None
label = self.cds_type
if (
len(kmer) != 3 or
len(label) != 3 or
label[0] not in dna_bases or
label[1] is not '>' or
label[2] not in dna_bases
):
return None
if label[0] in purines if notation is 'pyrimidine' else pyrimidines:
label = reverse_complement(label)[::-1]
if kmer[1] in purines if notation is 'pyrimidine' else pyrimidines:
kmer = reverse_complement(kmer)
self.label = kmer[0] + '[' + label + ']' + kmer[2]
self.context = kmer
return None
def __repr__(self):
from tabulate import tabulate
wrapper = textwrap.TextWrapper(width=54,
initial_indent='',
subsequent_indent=' ' * 26)
table = [
['OVERVIEW', ''], ['--------', ''],
['ID:', self.id],
['GRCh Version:', self.GRCh],
['Mutation Somatic Status:', wrapper.fill(self.somatic_status)],
['Mutation Description:', wrapper.fill(self.description)],
['', ''], ['MUTATION', ''], ['--------', ''],
['Amino Acid:', self.AA],
['Coding DNA Sequence:', self.cds],
['Chromosome:', self.chrom],
['Position:', '-'.join(map(str, self.position))],
['Strand:', self.strand],
['SNP:', self.SNP],
['', ''], ['GENE AFFECTED', ''], ['-------------', ''],
['Name:', self.gene_name],
['Gene ID:', self.gene_id],
['Gene CDS Length:', self.gene_cds_length],
['', ''], ['METADATA', ''], ['--------', ''],
['FATHMM Prediction:', self.FATHMM_pred],
['FATHMM Score:', self.FATHMM_score],
['Loss of Heterozygozity:', self.LOH],
['Zygosity:', self.zygosity]]
return(tabulate(table, tablefmt='plain'))
def _repr_html_(self):
from tabulate import tabulate
tables = [
[[header_style.format("OVERVIEW"), ''],
['ID:', self.id],
['GRCh Version:', self.GRCh],
['Mutation Somatic Status:', self.somatic_status],
['Mutation Description:', self.description]],
[[header_style.format("MUTATION"), ''],
['Chromosome:', self.chrom],
['Position:',
'-'.join(['{:,}'.format(_) for _ in self.position])],
['Strand:', self.strand],
['SNP:', self.SNP],
['Amino Acid:', self.AA],
['Coding DNA Sequence:', self.cds]],
[[header_style.format("GENE AFFECTED"), ''],
['Name:', self.gene_name],
['Gene ID:', self.gene_id],
['Gene CDS Length:', self.gene_cds_length]],
[[header_style.format("METADATA"), ''],
['FATHMM Prediction:', self.FATHMM_pred],
['FATHMM Score:', self.FATHMM_score],
['Loss of Heterozygozity:', self.LOH],
['Zygosity:', self.zygosity]]]
return(''.join([tabulate(table,
tablefmt='html',
headers='firstrow') for table in tables]))
class CosmicSelection(object):
def __init__(self, samples):
self.samples = OrderedDict(sorted(samples.items(), key=itemgetter(1)))
def gene_frequencies(self, num=None, by=''):
gene_frequencies = Counter()
for sample in self.samples.values():
for mutation in sample.mutations.values():
gene_frequencies.update([mutation.gene_name])
return gene_frequencies.most_common(num)
def all_mutations(self):
mutations = []
for sample in self.samples.values():
mutations.extend(sample.mutations.values())
return mutations
class CosmicSample(object):
def __init__(self, name):
# Identifiers
self.accession = ''
self.HGNC_id = ''
self.PubMed_id = ''
self.sample_id = ''
self.study_id = ''
self.tumour_id = ''
# Clinical
self.age = 0
self.genome_wide_screen = ''
self.histology = {'primary': '',
'subtype 1': '', 'subtype 2': '', 'subtype 3': ''}
self.name = name
self.sample_source = ''
self.site = {'primary': '',
'subtype 1': '', 'subtype 2': '', 'subtype 3': ''}
self.tumour_origin = ''
self.mutations = []
def gene_frequencies(self, N=None):
gene_frequencies = Counter()
for mutation in self.mutations:
gene_frequencies.update([mutation.gene_name])
return gene_frequencies.most_common(N)
def __eq__(self, other):
return self.sample_id == self.other_id
def __lt__(self, other):
return len(self.mutations) > len(other.mutations)
def __repr__(self):
from tabulate import tabulate
table = [
['OVERVIEW', ''], ['--------', ''],
['Sample:', self.name],
['Mutations:', '{:,}'.format(len(self.mutations))],
['', ''], ['SITE', ''], ['----', ''],
['Primary:', self.site['primary']],
['Subtype 1:', self.site['subtype 1']],
['Subtype 2:', self.site['subtype 2']],
['Subtype 3:', self.site['subtype 3']],
['', ''], ['HISTOLOGY', ''], ['---------', ''],
['Primary:', self.histology['primary']],
['Subtype 1:', self.histology['subtype 1']],
['Subtype 2:', self.histology['subtype 2']],
['Subtype 3:', self.histology['subtype 3']],
['', ''], ['CLINICAL INFORMATION', ''],
['--------------------', ''],
['Age:', self.age],
['Genome Wide Screen:', self.genome_wide_screen],
['Sample Source:', self.sample_source],
['Tumour Origin:', self.tumour_origin],
['', ''], ['IDENTIFIERS', ''], ['-----------', ''],
['Accession:', self.accession],
['HGNC ID:', self.HGNC_id],
['PubMed ID:', self.PubMed_id],
['Sample ID:', self.sample_id],
['Study ID:', self.study_id],
['Tumour ID:', self.tumour_id]]
return(tabulate(table, tablefmt='plain'))
def _repr_html_(self):
from tabulate import tabulate
tables = [
[[header_style.format('OVERVIEW'), ''],
['Sample:',
link.format(search_url.format(self.name), self.name)],
['Mutations:', '{:,}'.format(len(self.mutations))]],
[[header_style.format('SITE'), ''],
['Primary:',
link.format(tissue_url.format(
self.site['primary'],
'all',
'all',
'all'), self.site['primary'])],
['Subtype 1:',
link.format(tissue_url.format(
self.site['primary'],
self.site['subtype 1'],
'all',
'all'), self.site['subtype 1'])],
['Subtype 2:', self.site['subtype 2']],
['Subtype 3:', self.site['subtype 3']]
],
[[header_style.format('HISTOLOGY'), ''],
['Primary:',
link.format(tissue_url.format(
self.site['primary'],
self.site['subtype 1'],
self.histology['primary'],
'all'), self.histology['primary'])],
['Subtype 1:',
link.format(tissue_url.format(
self.site['primary'],
self.site['subtype 1'],
self.histology['primary'],
self.histology['subtype 1']), self.histology['subtype 1'])],
['Subtype 2:', self.histology['subtype 2']],
['Subtype 3:', self.histology['subtype 3']]],
[[header_style.format('CLINICAL INFORMATION'), ''],
['Age:', self.age],
['Genome Wide Screen:', self.genome_wide_screen],
['Sample Source:', self.sample_source],
['Tumour Origin:', self.tumour_origin]],
[[header_style.format('IDENTIFIERS'), ''],
['Accession:',
link.format(accession_url.format(self.accession),
self.accession)],
['HGNC ID:', self.HGNC_id],
['PubMed ID:',
link.format(PubMed_url.format(self.PubMed_id),
self.PubMed_id)],
['Sample ID:',
link.format(sample_url.format(self.sample_id), self.sample_id)],
['Study ID:',
link.format(study_url.format(self.study_id), self.study_id)],
['Tumour ID:', self.tumour_id]]]
return('<br>'.join([tabulate(table,
tablefmt='html',
headers='firstrow') for table in tables]))
def parse_to_samples(infile):
samples = {}
with open(infile) as handle:
next(handle)
for i, line in enumerate(handle):
line = line.strip('\n').split('\t')
try:
(gene_name, accession, gene_cds_length, HGNC_id, sample_name,
sample_id, tumour_id, primary_site, site_subtype_1,
site_subtype_2, site_subtype_3, hist_primary, hist_subtype_1,
hist_subtype_2, hist_subtype_3, genome_wide_screen,
mutation_id, mutation_cds, mutation_AA, mutation_description,
mutation_zygosity, LOH, GRCh, mutation_position,
mutation_strand, SNP, FATHMM_pred, FATHMM_score,
mutation_somatic_status, PubMed_id, study_id, sample_source,
tumour_origin, age) = line
except ValueError as e:
print('{}: Invalid line {}\n{}'.format(e, i, line))
if sample_id not in samples:
samples[sample_id] = CosmicSample(sample_name)
# Identifiers
samples[sample_id].accession = accession
samples[sample_id].HGNC_id = HGNC_id
samples[sample_id].PubMed_id = PubMed_id
samples[sample_id].sample_id = sample_id
samples[sample_id].study_id = study_id
samples[sample_id].tumour_id = tumour_id
# Clinical
samples[sample_id].age = age
samples[sample_id].genome_wide_screen = genome_wide_screen
samples[sample_id].histology = {
'primary': hist_primary,
'subtype 1': hist_subtype_1,
'subtype 2': hist_subtype_2,
'subtype 3': hist_subtype_3}
samples[sample_id].name = sample_name
samples[sample_id].sample_source = sample_source
samples[sample_id].site = {
'primary': primary_site,
'subtype 1': site_subtype_1,
'subtype 2': site_subtype_2,
'subtype 3': site_subtype_3}
samples[sample_id].tumour_origin = tumour_origin
else:
# Should check to see if we can improve information here.
pass
point = CosmicMutation()
if mutation_position is not '':
point.chrom, point.position = mutation_position.split(':', 1)
start, end = point.position.split('-', 1)
point.position = (int(start), int(end))
point.chrom = ''.join(['chr', point.chrom])
if point.chrom == 'chr23':
point.chrom = 'chrX'
elif point.chrom == 'chr24':
point.chrom = 'chrY'
if mutation_cds is not '':
point.cds = mutation_cds
try:
numbers = [s for s in mutation_cds if s.isdigit()]
point.cds_nt = int(''.join(numbers))
_, point.cds_type = mutation_cds.split(str(point.cds_nt))
except (AttributeError, ValueError):
point.cds_nt = ''
point.cds_type = ''
if '_' in gene_name:
point.gene_name, point.gene_id = gene_name.split('_', 1)
else:
point.gene_name = gene_name
point.AA = mutation_AA
point.description = mutation_description
point.FATHMM_pred = FATHMM_pred
point.FATHMM_score = FATHMM_score
point.gene_cds_length = gene_cds_length
point.GRCh = GRCh
point.id = mutation_id
point.LOH = LOH
point.SNP = SNP
point.strand = mutation_strand
point.somatic_status = mutation_somatic_status
point.zygosity = mutation_zygosity
samples[sample_id].mutations.append(point)
for sample in samples.values():
sample.mutations = sorted(sample.mutations)
return samples
def merge_samples(samples, min_mutations=0):
new_samples = {}
attributes = ['accession', 'age', 'HGNC_id', 'PubMed_id',
'sample_id', 'sample_source', 'study_id',
'tumour_id', 'tumour_origin']
for sample in samples.values():
if sample.name not in new_samples.keys():
new_samples[sample.name] = sample
else:
new_samples[sample.name].mutations.extend(sample.mutations)
mutations = list(set(new_samples[sample.name].mutations))
new_samples[sample.name].mutations = mutations
for attribute in attributes:
if getattr(new_samples[sample.name], attribute) in ['', 'NS']:
setattr(new_samples[sample.name],
attribute,
getattr(new_samples[sample.name], attribute))
temp = {}
for name, sample in new_samples.items():
if len(sample.mutations) <= min_mutations:
continue
temp[name] = sample
return temp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment