Skip to content

Instantly share code, notes, and snippets.

@clintval
Created March 7, 2018 23:13
Show Gist options
  • Select an option

  • Save clintval/7d8161be99d870992b8960132f62751a to your computer and use it in GitHub Desktop.

Select an option

Save clintval/7d8161be99d870992b8960132f62751a to your computer and use it in GitHub Desktop.
An untested Python pileup parser - do not use
import re
import csv
import textwrap
import matplotlib.pyplot as plt
from collections import Counter
class ReadError(LookupError):
"""Raise this when a property of a read is called for and no data exists"""
pass
class PileupError(LookupError):
"""Raise this when a Pileup file is corrupt or invalid"""
pass
class Read(object):
def __init__(self):
self._read_bases = []
@property
def strand(self):
if '.' in self._read_bases:
return '+'
elif ',' in self._read_bases:
return '-'
else:
raise ReadError('No sequence set')
@property
def sequence(self):
if self.strand == '+':
return ''.join(self._read_bases)
else:
return ''.join(self._read_bases[::-1])
@property
def read_length(self):
return len(self._read_bases)
def __repr__(self):
return('Strand: {}\nSequence:\n{}'.format(
self.strand,
textwrap.fill(self.sequence, width=80)))
class Pileup(object):
def __init__(self):
self.Ns_by_position = Counter()
self.indels_by_position = Counter()
self.mutations_by_position = Counter()
self.sequenced = {base: Counter() for base in dna_bases}
def read(self, infile, start=0, end=None):
reads = []
self.infile = infile
with open(self.infile) as pileup, open(self.outfile, 'w') as mutpos:
out = csv.writer(mutpos, delimiter='\t')
for i, line in enumerate(pileup):
# if i >= 2000: break
# Strip newlines split on tabs, then unpack and change type.
chrom, pos, ref, depth, read_base, _ = line.strip().split('\t')
pos, depth = int(pos), int(depth)
# If an end location is defined and we have reached it, break.
if self.end is not None and pos > self.end:
break
# Create new Read and append to list of current reads for all
# new reads starting at this location.
for m in re.finditer(r'\^.', read_base):
read_base = read_base.replace(m.group(), '', 1)
reads.append(Read())
# Loop through indel lengths existing at this position.
# For each of the indels at this position:
# 1. Record them and remove them from the read_base
# 2. Capitalize and check to see if previously found
indel_positions, inserts, deletes = [], [], []
for length in [int(s) for s in re.findall(r'\d+', read_base)]:
regex = r'[+-]{}[acgtnACGTN]{{{}}}'.format(length)
m = re.search(regex, read_base)
indel_positions.append(
m.start() - 1 - read_base[:m.start()].count('$'))
read_base = read_base.replace(m.group(), '', 1)
indel = m.group().upper()
if '+' in indel:
inserts.append(indel)
elif '-' in indel:
deletes.append(indel)
else:
raise PileupError
# The only double spaced grapheme remaining is the $-symbol
# which is placed after a read_base symbol to signal the end
# of a read. We must first find all of the $-symbols and their
# respective locations and then subtract cumulatively from
# their locations as we find more of them. These positions will
# then reflect the location of the read_base in the string
# after removal of the dollar symbols from the string.
closing = []
for i, match in enumerate(re.finditer(r'.\$', read_base)):
closing.append(match.start() - i)
# Redefine reads as those that do not need to be closed
# Get sequences from all reads to be closed
sequences = []
for read in reads:
if reads.index(read) in closing:
sequences.append(read.sequence)
reads = [r for r in reads if reads.index(r) not in closing]
# Continue to reconstruct each read sequence.
for i, base in enumerate(read_base.replace('$', '')):
# The asterisk is simply a deletion placeholder and
# shouldn't be counted.
if base is '*':
continue
elif i in indel_positions:
reads[i]._read_bases.append('X')
else:
reads[i]._read_bases.append(base)
# Filter out records that do not match filter criteria
if pos < self.start:
continue
# Count all times these bases are observed in the read_base
A, C, G, T, N = [read_base.upper().count(_) for _ in 'ACGTN']
# Correct depth at this location by subtracting Ns
depth = 1 if depth - N == 0 else depth - N
# Tally all sums, nucleotides sequenced, and update Counters.
for base, amount in zip(dna_bases, (A, C, G, T)):
self.sequenced[ref].update(base * amount)
self.sequenced[ref].update(ref * depth)
for i, base in enumerate(list(zip(*sequences))):
self.Ns_by_position.update([i + 1] * base.count('N'))
self.indels_by_position.update([i + 1] * base.count('X'))
self.mutations_by_position.update(
[i + 1] * sum([base.count(_) for _ in dna_bases]))
# Write position summary output to .mutpos file
out.writerow([chrom, ref, pos, depth, A, C, G, T, N,
':'.join(inserts), ':'.join(deletes)])
@property
def total_nucleotides(self):
return sum(self.sequenced[base][base] for base in dna_bases)
@property
def outfile(self):
if self.infile.endswith('.pileup'):
return self.infile.replace('.pileup', '.mutpos')
else:
return ''.join([self.infile, '.mutpos'])
def plot_statistics(self, title, **kwargs):
titles = ['Base Substitutions', 'Undeciphered (Ns)', 'Indels']
xlabels = ['', '', 'Position in Read']
counters = [self.mutations_by_position,
self.Ns_by_position,
self.indels_by_position,
self.depth_by_reference]
fig, axes = plt.subplots(3, figsize=(6, 8))
fig.tight_layout(h_pad=4)
plot_data = zip(titles, xlabels, axes, counters)
for title, xlabel, ax, counter in plot_data:
ax = plot_numeric_counter(counter,
ax=ax,
read_length=self.read_length,
num_reads=self.total,
title=title,
kwargs=kwargs)
ax.set_xlabel(xlabel)
title_on = kwargs.pop('title_on', True)
if self.id is not '' and title_on is True:
fig.suptitle(self.id,
y=1.04,
fontsize=kwargs.pop('title_fontize', 20))
return fig, axes
@property
def _summary(self):
from statsmodels.stats.proportion import proportion_confint
header_style = '<div style="text-align:left;"><b>{}<br></b></div>' # noqa
total_deletions = 0
total_insertions = 0
total_substitutions = 0
table = [
['ID:', self.id, '', '', ''],
['Name:', self.name, '', '', ''],
['Description:', self.description, '', '', ''],
['Minimum Depth:', self.min_depth, '', '', ''],
['Clonality:', '[{},{}]'.format(self.c, self.C), '', '', ''],
['', '', '', '', ''],
['CONVERSION', 'COUNT', 'FREQUENCY', 'LOWER_CONF', 'UPPER_CONF']]
# Base substitutions
for base in dna_bases:
n = self.sequenced[base][base]
table.append(['{:,}\'s Sequenced:'.format(base), n, '', '', ''])
for substitution in dna_bases:
if substitution == base:
continue
count = self.sequenced[base][substitution]
table.append(['{} to {}:'.format(base, substitution),
count,
count / n,
*proportion_confint(count, n, method='jeffrey')])
total_substitutions += count
table.append(['', '', '', '', ''])
table.extend([
['Nucleotides Sequenced', total_substitutions, '', '', ''],
['Point Substitutions',
total_substitutions,
total_substitutions / self.total,
*proportion_confint(total_substitutions, self.total)]])
for insertion, count in sorted(self.insertions.items()):
table.append(['Total Insertions',
count,
count / self.total,
*proportion_confint(count,
self.total,
method='jeffrey')])
total_insertions += count
for deletion, count in sorted(self.deletions.items()):
table.append(['Total Deletions',
count,
count / self.total,
*proportion_confint(count,
self.total,
method='jeffrey')])
total_deletions += count
for name, total in zip(['Insertions', 'Deletions'],
[total_insertions, total_deletions]):
table.append([name,
total,
total / self.total,
*proportion_confint(total,
self.total,
method='jeffrey')])
return table
def _repr(self):
from tabulate import tabulate
return(tabulate(
self._summary,
stralign='right',
numalign='right',
tablefmt='plain'))
def _repr_html(self):
from tabulate import tabulate
return(tabulate(
self._summary,
stralign='right',
numalign='right',
tablefmt='html'))
@Huanle
Copy link
Copy Markdown

Huanle commented May 26, 2018

line 83: regex = r'[+-]{}[acgtnACGTN]{{{}}}'.format(length)

gives me this error:

Traceback (most recent call last):
File "", line 1, in
IndexError: tuple index out of range

I am using python2.7.14

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment