Last active
March 7, 2018 23:18
-
-
Save clintval/f446c2cc9a4c5cf44371508930263424 to your computer and use it in GitHub Desktop.
Faux-object-oriented model for COSMIC used in DOI:10.1073/pnas.1700759114
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import textwrap | |
| from operator import itemgetter | |
| from Bio.Seq import reverse_complement | |
| from pyfaidx import Fasta, FetchError | |
| from collections import Counter, defaultdict, OrderedDict | |
| from pymutagenesis.seq import * | |
| link = '<a target="_blank" href={}>{}</a>' | |
| header_style = '<div style="text-align:left;"><b>{}<br></b></div>' | |
| accession_url = 'https://www.ncbi.nlm.nih.gov/gquery/?term={}' | |
| PubMed_url = 'https://www.ncbi.nlm.nih.gov/PubMed/?term={}' | |
| # COSMIC web API links | |
| sample_url = 'http://cancer.sanger.ac.uk/cosmic/sample/overview?id={}' | |
| search_url = 'http://cancer.sanger.ac.uk/cosmic/search?q={}' | |
| study_url = 'http://cancer.sanger.ac.uk/cosmic/study/overview?study_id={}' | |
| tissue_url = ('http://cancer.sanger.ac.uk/cosmic/browse/tissue#sn={}' | |
| '&ss={}&hn={}&sh={}&in=t&src=tissue&all_data=y') | |
| def lookup_table(samples): | |
| lookup = defaultdict(list) | |
| for sample in sorted(samples): | |
| lookup[sample.name].append(sample.sample_id) | |
| return lookup | |
| class CosmicMutation(object): | |
| def __init__(self): | |
| self.AA = '' | |
| self.cds = '' | |
| self.chrom = '' | |
| self.description = '' | |
| self.FATHMM_pred = '' | |
| self.FATHMM_score = '' | |
| self.gene_cds_length = '' | |
| self.gene_id = '' | |
| self.gene_name = '' | |
| self.GRCh = '' | |
| self.id = '' | |
| self.LOH = '' | |
| self.position = (0, 0) | |
| self.SNP = '' | |
| self.strand = '' | |
| self.somatic_status = '' | |
| self.zygosity = '' | |
| def __eq__(self, other): | |
| return self.id == other | |
| def __hash__(self): | |
| return hash(self.id) | |
| def __lt__(self, other): | |
| return self.id < other.id | |
| def set_context(self, ref_object, k=3, notation='pyrimidine'): | |
| self.context = '' | |
| self.label = '' | |
| if isinstance(ref_object, Fasta): | |
| try: | |
| kmer = str(ref_object.get_seq(self.chrom, | |
| self.position[0] - 1, | |
| self.position[1] + 1)).upper() | |
| except FetchError: | |
| return None | |
| elif isinstance(ref_object, dict): | |
| kmer = get_kmer(ref_object, | |
| self.chrom, | |
| self.position[0] - 1, k=3).upper() | |
| if kmer is None: | |
| return None | |
| label = self.cds_type | |
| if ( | |
| len(kmer) != 3 or | |
| len(label) != 3 or | |
| label[0] not in dna_bases or | |
| label[1] is not '>' or | |
| label[2] not in dna_bases | |
| ): | |
| return None | |
| if label[0] in purines if notation is 'pyrimidine' else pyrimidines: | |
| label = reverse_complement(label)[::-1] | |
| if kmer[1] in purines if notation is 'pyrimidine' else pyrimidines: | |
| kmer = reverse_complement(kmer) | |
| self.label = kmer[0] + '[' + label + ']' + kmer[2] | |
| self.context = kmer | |
| return None | |
| def __repr__(self): | |
| from tabulate import tabulate | |
| wrapper = textwrap.TextWrapper(width=54, | |
| initial_indent='', | |
| subsequent_indent=' ' * 26) | |
| table = [ | |
| ['OVERVIEW', ''], ['--------', ''], | |
| ['ID:', self.id], | |
| ['GRCh Version:', self.GRCh], | |
| ['Mutation Somatic Status:', wrapper.fill(self.somatic_status)], | |
| ['Mutation Description:', wrapper.fill(self.description)], | |
| ['', ''], ['MUTATION', ''], ['--------', ''], | |
| ['Amino Acid:', self.AA], | |
| ['Coding DNA Sequence:', self.cds], | |
| ['Chromosome:', self.chrom], | |
| ['Position:', '-'.join(map(str, self.position))], | |
| ['Strand:', self.strand], | |
| ['SNP:', self.SNP], | |
| ['', ''], ['GENE AFFECTED', ''], ['-------------', ''], | |
| ['Name:', self.gene_name], | |
| ['Gene ID:', self.gene_id], | |
| ['Gene CDS Length:', self.gene_cds_length], | |
| ['', ''], ['METADATA', ''], ['--------', ''], | |
| ['FATHMM Prediction:', self.FATHMM_pred], | |
| ['FATHMM Score:', self.FATHMM_score], | |
| ['Loss of Heterozygozity:', self.LOH], | |
| ['Zygosity:', self.zygosity]] | |
| return(tabulate(table, tablefmt='plain')) | |
| def _repr_html_(self): | |
| from tabulate import tabulate | |
| tables = [ | |
| [[header_style.format("OVERVIEW"), ''], | |
| ['ID:', self.id], | |
| ['GRCh Version:', self.GRCh], | |
| ['Mutation Somatic Status:', self.somatic_status], | |
| ['Mutation Description:', self.description]], | |
| [[header_style.format("MUTATION"), ''], | |
| ['Chromosome:', self.chrom], | |
| ['Position:', | |
| '-'.join(['{:,}'.format(_) for _ in self.position])], | |
| ['Strand:', self.strand], | |
| ['SNP:', self.SNP], | |
| ['Amino Acid:', self.AA], | |
| ['Coding DNA Sequence:', self.cds]], | |
| [[header_style.format("GENE AFFECTED"), ''], | |
| ['Name:', self.gene_name], | |
| ['Gene ID:', self.gene_id], | |
| ['Gene CDS Length:', self.gene_cds_length]], | |
| [[header_style.format("METADATA"), ''], | |
| ['FATHMM Prediction:', self.FATHMM_pred], | |
| ['FATHMM Score:', self.FATHMM_score], | |
| ['Loss of Heterozygozity:', self.LOH], | |
| ['Zygosity:', self.zygosity]]] | |
| return(''.join([tabulate(table, | |
| tablefmt='html', | |
| headers='firstrow') for table in tables])) | |
| class CosmicSelection(object): | |
| def __init__(self, samples): | |
| self.samples = OrderedDict(sorted(samples.items(), key=itemgetter(1))) | |
| def gene_frequencies(self, num=None, by=''): | |
| gene_frequencies = Counter() | |
| for sample in self.samples.values(): | |
| for mutation in sample.mutations.values(): | |
| gene_frequencies.update([mutation.gene_name]) | |
| return gene_frequencies.most_common(num) | |
| def all_mutations(self): | |
| mutations = [] | |
| for sample in self.samples.values(): | |
| mutations.extend(sample.mutations.values()) | |
| return mutations | |
| class CosmicSample(object): | |
| def __init__(self, name): | |
| # Identifiers | |
| self.accession = '' | |
| self.HGNC_id = '' | |
| self.PubMed_id = '' | |
| self.sample_id = '' | |
| self.study_id = '' | |
| self.tumour_id = '' | |
| # Clinical | |
| self.age = 0 | |
| self.genome_wide_screen = '' | |
| self.histology = {'primary': '', | |
| 'subtype 1': '', 'subtype 2': '', 'subtype 3': ''} | |
| self.name = name | |
| self.sample_source = '' | |
| self.site = {'primary': '', | |
| 'subtype 1': '', 'subtype 2': '', 'subtype 3': ''} | |
| self.tumour_origin = '' | |
| self.mutations = [] | |
| def gene_frequencies(self, N=None): | |
| gene_frequencies = Counter() | |
| for mutation in self.mutations: | |
| gene_frequencies.update([mutation.gene_name]) | |
| return gene_frequencies.most_common(N) | |
| def __eq__(self, other): | |
| return self.sample_id == self.other_id | |
| def __lt__(self, other): | |
| return len(self.mutations) > len(other.mutations) | |
| def __repr__(self): | |
| from tabulate import tabulate | |
| table = [ | |
| ['OVERVIEW', ''], ['--------', ''], | |
| ['Sample:', self.name], | |
| ['Mutations:', '{:,}'.format(len(self.mutations))], | |
| ['', ''], ['SITE', ''], ['----', ''], | |
| ['Primary:', self.site['primary']], | |
| ['Subtype 1:', self.site['subtype 1']], | |
| ['Subtype 2:', self.site['subtype 2']], | |
| ['Subtype 3:', self.site['subtype 3']], | |
| ['', ''], ['HISTOLOGY', ''], ['---------', ''], | |
| ['Primary:', self.histology['primary']], | |
| ['Subtype 1:', self.histology['subtype 1']], | |
| ['Subtype 2:', self.histology['subtype 2']], | |
| ['Subtype 3:', self.histology['subtype 3']], | |
| ['', ''], ['CLINICAL INFORMATION', ''], | |
| ['--------------------', ''], | |
| ['Age:', self.age], | |
| ['Genome Wide Screen:', self.genome_wide_screen], | |
| ['Sample Source:', self.sample_source], | |
| ['Tumour Origin:', self.tumour_origin], | |
| ['', ''], ['IDENTIFIERS', ''], ['-----------', ''], | |
| ['Accession:', self.accession], | |
| ['HGNC ID:', self.HGNC_id], | |
| ['PubMed ID:', self.PubMed_id], | |
| ['Sample ID:', self.sample_id], | |
| ['Study ID:', self.study_id], | |
| ['Tumour ID:', self.tumour_id]] | |
| return(tabulate(table, tablefmt='plain')) | |
| def _repr_html_(self): | |
| from tabulate import tabulate | |
| tables = [ | |
| [[header_style.format('OVERVIEW'), ''], | |
| ['Sample:', | |
| link.format(search_url.format(self.name), self.name)], | |
| ['Mutations:', '{:,}'.format(len(self.mutations))]], | |
| [[header_style.format('SITE'), ''], | |
| ['Primary:', | |
| link.format(tissue_url.format( | |
| self.site['primary'], | |
| 'all', | |
| 'all', | |
| 'all'), self.site['primary'])], | |
| ['Subtype 1:', | |
| link.format(tissue_url.format( | |
| self.site['primary'], | |
| self.site['subtype 1'], | |
| 'all', | |
| 'all'), self.site['subtype 1'])], | |
| ['Subtype 2:', self.site['subtype 2']], | |
| ['Subtype 3:', self.site['subtype 3']] | |
| ], | |
| [[header_style.format('HISTOLOGY'), ''], | |
| ['Primary:', | |
| link.format(tissue_url.format( | |
| self.site['primary'], | |
| self.site['subtype 1'], | |
| self.histology['primary'], | |
| 'all'), self.histology['primary'])], | |
| ['Subtype 1:', | |
| link.format(tissue_url.format( | |
| self.site['primary'], | |
| self.site['subtype 1'], | |
| self.histology['primary'], | |
| self.histology['subtype 1']), self.histology['subtype 1'])], | |
| ['Subtype 2:', self.histology['subtype 2']], | |
| ['Subtype 3:', self.histology['subtype 3']]], | |
| [[header_style.format('CLINICAL INFORMATION'), ''], | |
| ['Age:', self.age], | |
| ['Genome Wide Screen:', self.genome_wide_screen], | |
| ['Sample Source:', self.sample_source], | |
| ['Tumour Origin:', self.tumour_origin]], | |
| [[header_style.format('IDENTIFIERS'), ''], | |
| ['Accession:', | |
| link.format(accession_url.format(self.accession), | |
| self.accession)], | |
| ['HGNC ID:', self.HGNC_id], | |
| ['PubMed ID:', | |
| link.format(PubMed_url.format(self.PubMed_id), | |
| self.PubMed_id)], | |
| ['Sample ID:', | |
| link.format(sample_url.format(self.sample_id), self.sample_id)], | |
| ['Study ID:', | |
| link.format(study_url.format(self.study_id), self.study_id)], | |
| ['Tumour ID:', self.tumour_id]]] | |
| return('<br>'.join([tabulate(table, | |
| tablefmt='html', | |
| headers='firstrow') for table in tables])) | |
| def parse_to_samples(infile): | |
| samples = {} | |
| with open(infile) as handle: | |
| next(handle) | |
| for i, line in enumerate(handle): | |
| line = line.strip('\n').split('\t') | |
| try: | |
| (gene_name, accession, gene_cds_length, HGNC_id, sample_name, | |
| sample_id, tumour_id, primary_site, site_subtype_1, | |
| site_subtype_2, site_subtype_3, hist_primary, hist_subtype_1, | |
| hist_subtype_2, hist_subtype_3, genome_wide_screen, | |
| mutation_id, mutation_cds, mutation_AA, mutation_description, | |
| mutation_zygosity, LOH, GRCh, mutation_position, | |
| mutation_strand, SNP, FATHMM_pred, FATHMM_score, | |
| mutation_somatic_status, PubMed_id, study_id, sample_source, | |
| tumour_origin, age) = line | |
| except ValueError as e: | |
| print('{}: Invalid line {}\n{}'.format(e, i, line)) | |
| if sample_id not in samples: | |
| samples[sample_id] = CosmicSample(sample_name) | |
| # Identifiers | |
| samples[sample_id].accession = accession | |
| samples[sample_id].HGNC_id = HGNC_id | |
| samples[sample_id].PubMed_id = PubMed_id | |
| samples[sample_id].sample_id = sample_id | |
| samples[sample_id].study_id = study_id | |
| samples[sample_id].tumour_id = tumour_id | |
| # Clinical | |
| samples[sample_id].age = age | |
| samples[sample_id].genome_wide_screen = genome_wide_screen | |
| samples[sample_id].histology = { | |
| 'primary': hist_primary, | |
| 'subtype 1': hist_subtype_1, | |
| 'subtype 2': hist_subtype_2, | |
| 'subtype 3': hist_subtype_3} | |
| samples[sample_id].name = sample_name | |
| samples[sample_id].sample_source = sample_source | |
| samples[sample_id].site = { | |
| 'primary': primary_site, | |
| 'subtype 1': site_subtype_1, | |
| 'subtype 2': site_subtype_2, | |
| 'subtype 3': site_subtype_3} | |
| samples[sample_id].tumour_origin = tumour_origin | |
| else: | |
| # Should check to see if we can improve information here. | |
| pass | |
| point = CosmicMutation() | |
| if mutation_position is not '': | |
| point.chrom, point.position = mutation_position.split(':', 1) | |
| start, end = point.position.split('-', 1) | |
| point.position = (int(start), int(end)) | |
| point.chrom = ''.join(['chr', point.chrom]) | |
| if point.chrom == 'chr23': | |
| point.chrom = 'chrX' | |
| elif point.chrom == 'chr24': | |
| point.chrom = 'chrY' | |
| if mutation_cds is not '': | |
| point.cds = mutation_cds | |
| try: | |
| numbers = [s for s in mutation_cds if s.isdigit()] | |
| point.cds_nt = int(''.join(numbers)) | |
| _, point.cds_type = mutation_cds.split(str(point.cds_nt)) | |
| except (AttributeError, ValueError): | |
| point.cds_nt = '' | |
| point.cds_type = '' | |
| if '_' in gene_name: | |
| point.gene_name, point.gene_id = gene_name.split('_', 1) | |
| else: | |
| point.gene_name = gene_name | |
| point.AA = mutation_AA | |
| point.description = mutation_description | |
| point.FATHMM_pred = FATHMM_pred | |
| point.FATHMM_score = FATHMM_score | |
| point.gene_cds_length = gene_cds_length | |
| point.GRCh = GRCh | |
| point.id = mutation_id | |
| point.LOH = LOH | |
| point.SNP = SNP | |
| point.strand = mutation_strand | |
| point.somatic_status = mutation_somatic_status | |
| point.zygosity = mutation_zygosity | |
| samples[sample_id].mutations.append(point) | |
| for sample in samples.values(): | |
| sample.mutations = sorted(sample.mutations) | |
| return samples | |
| def merge_samples(samples, min_mutations=0): | |
| new_samples = {} | |
| attributes = ['accession', 'age', 'HGNC_id', 'PubMed_id', | |
| 'sample_id', 'sample_source', 'study_id', | |
| 'tumour_id', 'tumour_origin'] | |
| for sample in samples.values(): | |
| if sample.name not in new_samples.keys(): | |
| new_samples[sample.name] = sample | |
| else: | |
| new_samples[sample.name].mutations.extend(sample.mutations) | |
| mutations = list(set(new_samples[sample.name].mutations)) | |
| new_samples[sample.name].mutations = mutations | |
| for attribute in attributes: | |
| if getattr(new_samples[sample.name], attribute) in ['', 'NS']: | |
| setattr(new_samples[sample.name], | |
| attribute, | |
| getattr(new_samples[sample.name], attribute)) | |
| temp = {} | |
| for name, sample in new_samples.items(): | |
| if len(sample.mutations) <= min_mutations: | |
| continue | |
| temp[name] = sample | |
| return temp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment