Created
March 7, 2018 23:13
-
-
Save clintval/7d8161be99d870992b8960132f62751a to your computer and use it in GitHub Desktop.
An untested Python pileup parser - do not use
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import re | |
| import csv | |
| import textwrap | |
| import matplotlib.pyplot as plt | |
| from collections import Counter | |
| class ReadError(LookupError): | |
| """Raise this when a property of a read is called for and no data exists""" | |
| pass | |
| class PileupError(LookupError): | |
| """Raise this when a Pileup file is corrupt or invalid""" | |
| pass | |
| class Read(object): | |
| def __init__(self): | |
| self._read_bases = [] | |
| @property | |
| def strand(self): | |
| if '.' in self._read_bases: | |
| return '+' | |
| elif ',' in self._read_bases: | |
| return '-' | |
| else: | |
| raise ReadError('No sequence set') | |
| @property | |
| def sequence(self): | |
| if self.strand == '+': | |
| return ''.join(self._read_bases) | |
| else: | |
| return ''.join(self._read_bases[::-1]) | |
| @property | |
| def read_length(self): | |
| return len(self._read_bases) | |
| def __repr__(self): | |
| return('Strand: {}\nSequence:\n{}'.format( | |
| self.strand, | |
| textwrap.fill(self.sequence, width=80))) | |
| class Pileup(object): | |
| def __init__(self): | |
| self.Ns_by_position = Counter() | |
| self.indels_by_position = Counter() | |
| self.mutations_by_position = Counter() | |
| self.sequenced = {base: Counter() for base in dna_bases} | |
| def read(self, infile, start=0, end=None): | |
| reads = [] | |
| self.infile = infile | |
| with open(self.infile) as pileup, open(self.outfile, 'w') as mutpos: | |
| out = csv.writer(mutpos, delimiter='\t') | |
| for i, line in enumerate(pileup): | |
| # if i >= 2000: break | |
| # Strip newlines split on tabs, then unpack and change type. | |
| chrom, pos, ref, depth, read_base, _ = line.strip().split('\t') | |
| pos, depth = int(pos), int(depth) | |
| # If an end location is defined and we have reached it, break. | |
| if self.end is not None and pos > self.end: | |
| break | |
| # Create new Read and append to list of current reads for all | |
| # new reads starting at this location. | |
| for m in re.finditer(r'\^.', read_base): | |
| read_base = read_base.replace(m.group(), '', 1) | |
| reads.append(Read()) | |
| # Loop through indel lengths existing at this position. | |
| # For each of the indels at this position: | |
| # 1. Record them and remove them from the read_base | |
| # 2. Capitalize and check to see if previously found | |
| indel_positions, inserts, deletes = [], [], [] | |
| for length in [int(s) for s in re.findall(r'\d+', read_base)]: | |
| regex = r'[+-]{}[acgtnACGTN]{{{}}}'.format(length) | |
| m = re.search(regex, read_base) | |
| indel_positions.append( | |
| m.start() - 1 - read_base[:m.start()].count('$')) | |
| read_base = read_base.replace(m.group(), '', 1) | |
| indel = m.group().upper() | |
| if '+' in indel: | |
| inserts.append(indel) | |
| elif '-' in indel: | |
| deletes.append(indel) | |
| else: | |
| raise PileupError | |
| # The only double spaced grapheme remaining is the $-symbol | |
| # which is placed after a read_base symbol to signal the end | |
| # of a read. We must first find all of the $-symbols and their | |
| # respective locations and then subtract cumulatively from | |
| # their locations as we find more of them. These positions will | |
| # then reflect the location of the read_base in the string | |
| # after removal of the dollar symbols from the string. | |
| closing = [] | |
| for i, match in enumerate(re.finditer(r'.\$', read_base)): | |
| closing.append(match.start() - i) | |
| # Redefine reads as those that do not need to be closed | |
| # Get sequences from all reads to be closed | |
| sequences = [] | |
| for read in reads: | |
| if reads.index(read) in closing: | |
| sequences.append(read.sequence) | |
| reads = [r for r in reads if reads.index(r) not in closing] | |
| # Continue to reconstruct each read sequence. | |
| for i, base in enumerate(read_base.replace('$', '')): | |
| # The asterisk is simply a deletion placeholder and | |
| # shouldn't be counted. | |
| if base is '*': | |
| continue | |
| elif i in indel_positions: | |
| reads[i]._read_bases.append('X') | |
| else: | |
| reads[i]._read_bases.append(base) | |
| # Filter out records that do not match filter criteria | |
| if pos < self.start: | |
| continue | |
| # Count all times these bases are observed in the read_base | |
| A, C, G, T, N = [read_base.upper().count(_) for _ in 'ACGTN'] | |
| # Correct depth at this location by subtracting Ns | |
| depth = 1 if depth - N == 0 else depth - N | |
| # Tally all sums, nucleotides sequenced, and update Counters. | |
| for base, amount in zip(dna_bases, (A, C, G, T)): | |
| self.sequenced[ref].update(base * amount) | |
| self.sequenced[ref].update(ref * depth) | |
| for i, base in enumerate(list(zip(*sequences))): | |
| self.Ns_by_position.update([i + 1] * base.count('N')) | |
| self.indels_by_position.update([i + 1] * base.count('X')) | |
| self.mutations_by_position.update( | |
| [i + 1] * sum([base.count(_) for _ in dna_bases])) | |
| # Write position summary output to .mutpos file | |
| out.writerow([chrom, ref, pos, depth, A, C, G, T, N, | |
| ':'.join(inserts), ':'.join(deletes)]) | |
| @property | |
| def total_nucleotides(self): | |
| return sum(self.sequenced[base][base] for base in dna_bases) | |
| @property | |
| def outfile(self): | |
| if self.infile.endswith('.pileup'): | |
| return self.infile.replace('.pileup', '.mutpos') | |
| else: | |
| return ''.join([self.infile, '.mutpos']) | |
| def plot_statistics(self, title, **kwargs): | |
| titles = ['Base Substitutions', 'Undeciphered (Ns)', 'Indels'] | |
| xlabels = ['', '', 'Position in Read'] | |
| counters = [self.mutations_by_position, | |
| self.Ns_by_position, | |
| self.indels_by_position, | |
| self.depth_by_reference] | |
| fig, axes = plt.subplots(3, figsize=(6, 8)) | |
| fig.tight_layout(h_pad=4) | |
| plot_data = zip(titles, xlabels, axes, counters) | |
| for title, xlabel, ax, counter in plot_data: | |
| ax = plot_numeric_counter(counter, | |
| ax=ax, | |
| read_length=self.read_length, | |
| num_reads=self.total, | |
| title=title, | |
| kwargs=kwargs) | |
| ax.set_xlabel(xlabel) | |
| title_on = kwargs.pop('title_on', True) | |
| if self.id is not '' and title_on is True: | |
| fig.suptitle(self.id, | |
| y=1.04, | |
| fontsize=kwargs.pop('title_fontize', 20)) | |
| return fig, axes | |
| @property | |
| def _summary(self): | |
| from statsmodels.stats.proportion import proportion_confint | |
| header_style = '<div style="text-align:left;"><b>{}<br></b></div>' # noqa | |
| total_deletions = 0 | |
| total_insertions = 0 | |
| total_substitutions = 0 | |
| table = [ | |
| ['ID:', self.id, '', '', ''], | |
| ['Name:', self.name, '', '', ''], | |
| ['Description:', self.description, '', '', ''], | |
| ['Minimum Depth:', self.min_depth, '', '', ''], | |
| ['Clonality:', '[{},{}]'.format(self.c, self.C), '', '', ''], | |
| ['', '', '', '', ''], | |
| ['CONVERSION', 'COUNT', 'FREQUENCY', 'LOWER_CONF', 'UPPER_CONF']] | |
| # Base substitutions | |
| for base in dna_bases: | |
| n = self.sequenced[base][base] | |
| table.append(['{:,}\'s Sequenced:'.format(base), n, '', '', '']) | |
| for substitution in dna_bases: | |
| if substitution == base: | |
| continue | |
| count = self.sequenced[base][substitution] | |
| table.append(['{} to {}:'.format(base, substitution), | |
| count, | |
| count / n, | |
| *proportion_confint(count, n, method='jeffrey')]) | |
| total_substitutions += count | |
| table.append(['', '', '', '', '']) | |
| table.extend([ | |
| ['Nucleotides Sequenced', total_substitutions, '', '', ''], | |
| ['Point Substitutions', | |
| total_substitutions, | |
| total_substitutions / self.total, | |
| *proportion_confint(total_substitutions, self.total)]]) | |
| for insertion, count in sorted(self.insertions.items()): | |
| table.append(['Total Insertions', | |
| count, | |
| count / self.total, | |
| *proportion_confint(count, | |
| self.total, | |
| method='jeffrey')]) | |
| total_insertions += count | |
| for deletion, count in sorted(self.deletions.items()): | |
| table.append(['Total Deletions', | |
| count, | |
| count / self.total, | |
| *proportion_confint(count, | |
| self.total, | |
| method='jeffrey')]) | |
| total_deletions += count | |
| for name, total in zip(['Insertions', 'Deletions'], | |
| [total_insertions, total_deletions]): | |
| table.append([name, | |
| total, | |
| total / self.total, | |
| *proportion_confint(total, | |
| self.total, | |
| method='jeffrey')]) | |
| return table | |
| def _repr(self): | |
| from tabulate import tabulate | |
| return(tabulate( | |
| self._summary, | |
| stralign='right', | |
| numalign='right', | |
| tablefmt='plain')) | |
| def _repr_html(self): | |
| from tabulate import tabulate | |
| return(tabulate( | |
| self._summary, | |
| stralign='right', | |
| numalign='right', | |
| tablefmt='html')) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
line 83: regex = r'[+-]{}[acgtnACGTN]{{{}}}'.format(length)
gives me this error:
Traceback (most recent call last):
File "", line 1, in
IndexError: tuple index out of range
I am using python2.7.14