Created
May 20, 2015 02:04
-
-
Save mdshw5/03f51f043ad09d464703 to your computer and use it in GitHub Desktop.
Simple pileup-like script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import argparse | |
import pkg_resources | |
from collections import deque | |
from collections import Counter | |
try: | |
from collections import OrderedDict | |
except ImportError: #python 2.6 | |
from ordereddict import OrderedDict | |
class Extractor(OrderedDict): | |
def __missing__(self, pos): | |
value = (self.sam, deque(), deque()) | |
self[pos] = value | |
return value | |
def update(self, sam, min_qual): | |
self.sam = sam | |
g = sam.pos | |
for i, (seq, qual) in enumerate(zip(sam.gapped('seq', '-'), sam.gapped('qual', '~'))): | |
if qual > min_qual: | |
self[g + i][1].append(seq) | |
self[g + i][2].append(qual) | |
def clear(self, stop=None): | |
for pos in sorted(k for k in self.keys() if stop is not None and k < stop): | |
sam, seq, qual = self.pop(pos) | |
ref = sam.parse_md()[sam.index_of(pos)] | |
try: | |
s, q = zip(*[(s, q) for s, q in zip(seq, qual) if s != ref]) | |
except ValueError: | |
s, q = ([], []) | |
yield (sam.rname, str(pos), ref, str(len(seq) - len(s)), str(len(s)), ''.join(s), ''.join(q)) | |
def pileup(args): | |
from simplesam import Reader | |
e = Extractor() | |
bases = ('A', 'C', 'T', 'G', 'N', '-') | |
stats = dict([('A', Counter()), ('C', Counter()), ('T', Counter()), ('G', Counter()), ('N', Counter()), ('-', Counter())]) | |
with Reader(args.bam) as bam: | |
for read in bam: | |
if not read.mapped: | |
continue | |
if read.duplicate: | |
continue | |
if read.secondary: | |
continue | |
for line in e.clear(stop=read.pos): | |
if args.counts: | |
args.pileup.write('\t'.join(line) + '\t' + '\t'.join([str(line[5].count(c)) for c in bases]) + '\n') | |
else: | |
args.pileup.write('\t'.join(line) + '\n') | |
if args.stats: | |
stats[line[2]][line[2]] += int(line[3]) | |
stats[line[2]].update(line[5]) | |
e.update(read, '!') | |
for line in e.clear(): | |
if args.counts: | |
args.pileup.write('\t'.join(line) + '\t' + '\t'.join([str(line[5].count(c)) for c in bases]) + '\n') | |
else: | |
args.pileup.write('\t'.join(line) + '\n') | |
if args.stats: | |
stats[line[2]][line[2]] += int(line[3]) | |
stats[line[2]].update(line[5]) | |
if args.stats: | |
for ref, counts in sorted(stats.items()): | |
for base in bases: | |
ref_count = counts[ref] | |
base_count = counts[base] | |
try: | |
percent_base = base_count / sum(counts.values()) | |
except ZeroDivisionError: | |
percent_base = 0. | |
args.stats.write('{ref}\t{base}\t{ref_count}\t{base_count}\t{percent_base:.4%}\n'.format(**locals())) | |
def main(): | |
parser = argparse.ArgumentParser(prog='pileup', description="generate a simple pileup-like file from a sorted/indexed BAM file") | |
parser.add_argument('--version', action='version', version="%(prog)s version {0}".format(pkg_resources.get_distribution("gemstone").version)) | |
parser.add_argument('bam', type=argparse.FileType('r'), help="sorted/indexed BAM file ") | |
parser.add_argument('pileup', type=argparse.FileType('w'), help="pileup output file") | |
parser.add_argument('-c', '--counts', action='store_true', default=False, help="display counts for A/C/T/G/N/- separately (default: %(default)s)") | |
parser.add_argument('-i', '--stats', type=argparse.FileType('w'), help="tabulate mismatches to output file") | |
parser.set_defaults(func=pileup) | |
args = parser.parse_args() | |
args.func(args) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment