Created
August 5, 2014 20:07
-
-
Save brevans/978f2d86b4788b3e3560 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from os import path, makedirs, remove, stat | |
import argparse | |
import re | |
from glob import iglob | |
from collections import defaultdict as dd | |
from Bio import SeqIO | |
import vcf | |
AMB = {'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': 'Y', 'GT': 'K', | |
'CA': 'M', 'GA': 'R', 'TA': 'W', 'GC': 'S', 'TC': 'Y', 'TG': 'K', | |
'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T'} | |
def m_dir(d): | |
try: | |
makedirs(d) | |
except OSError: | |
pass | |
def get_ref_dict(fa_name): | |
''' | |
in: the name of a fasta reference file | |
out: a dictionary of the sequences inside ['seqname']-> Bio.Seq object | |
''' | |
return (SeqIO.index(fa_name, "fasta")) | |
def get_sample_name(vcf_fi, comp_fi): | |
''' | |
in: vcf file (with path) and hapcompass file (with path) | |
if the sample name doesn't match raise an Exception | |
out: the sample name | |
''' | |
comp_name = path.basename(comp_fi) | |
vcf_name = path.basename(vcf_fi) | |
pref_match = re.match('(\S+)_out_\S+_solution\.txt$', comp_name) | |
prefix = pref_match.group(1) | |
if not vcf_name.startswith(prefix): | |
print("Your files {} and {} don't seem to match, exiting.".format( | |
vcf_name, comp_name)) | |
raise Exception("filenames don't match") | |
return prefix | |
def get_hap_info(fi): | |
''' | |
in: a filename of a haplotype block phasing file that was output | |
from HapCompass | |
out: a dict: chrom->bp->[phase1,phase2] and the phasing info not to be used | |
a list of all unused lines from the phasing file | |
a set of all loci that had > 1 blocks | |
''' | |
info = {} | |
blocks = set() | |
dupes = set() | |
unused_info = [] | |
capture = None | |
curr_block = None | |
''' | |
BLOCK 195 217 1 2 64.0 LG98 | |
VAR_POS_195 195 1 0 1 | |
VAR_POS_217 217 2 0 1 | |
''' | |
# read in block info | |
for l in open(fi): | |
if l == '\n': | |
continue | |
tmp = l.split() | |
if l.startswith('BLOCK'): | |
curr_block = tmp[6] | |
#if we encounter more than one block, | |
# we won't be using this phasing info | |
if curr_block in blocks: | |
if curr_block in info: | |
unused_info += info[curr_block]['original'] | |
del info[curr_block] | |
else: | |
unused_info.append(l) | |
dupes.add(curr_block) | |
capture = False | |
unused_info.append(l) | |
else: | |
blocks.add(curr_block) | |
info[curr_block] = {} | |
info[curr_block]['original'] = [l] | |
capture = True | |
elif capture == True: | |
info[curr_block][int(tmp[1])] = [int(tmp[3]), int(tmp[4])] | |
info[curr_block]['original'].append(l) | |
elif capture == False: | |
unused_info.append(l) | |
return info, unused_info | |
def make_vcf_dict(vcf_reader): | |
vcf_dict = dd(lambda: {}) | |
hets = dd(lambda: 0) | |
for v in vcf_reader: | |
vcf_dict[v.CHROM][v.POS] = {'ALLELES': [v.REF] + [x.sequence for x in v.ALT], | |
'IS_HET': v.samples[0].is_het, | |
'GT': [int(x) for x in v.samples[0].gt_alleles]} | |
if v.samples[0].is_het: | |
hets[v.CHROM] += 1 | |
return vcf_dict, hets | |
def clean_up(fhs): | |
for s in fhs: | |
names = [x.name for x in fhs[s]] | |
[x.close() for x in fhs[s]] | |
[remove(fi) for fi in names if stat(fi).st_size == 0] | |
def process_files(fa_file, vcf_dir, hap_dir, out_dir, verb): | |
m_dir(out_dir) | |
ref = get_ref_dict(fa_file) | |
# so I can write sequences to hdls[chrom][0] for phased, [1] for unphased | |
hdls = {x: [open(path.join(out_dir, x + '_ambi.txt'), 'w'), | |
open(path.join(out_dir, x + '_phased.txt'), 'w')] | |
for x in ref.keys()} | |
unusedfh = open(path.join(out_dir, 'unused_blocks.txt'), 'w') | |
for comp_fi, vcf_fi in zip(iglob(path.join(hap_dir, '*_solution.txt')), | |
iglob(path.join(vcf_dir, '*.vcf'))): | |
name = get_sample_name(vcf_fi, comp_fi) | |
hap_info, unused = get_hap_info(comp_fi) | |
pref = name + '\t' | |
if len(unused) > 0: | |
unusedfh.write(pref + pref.join(unused)) | |
vcf_reader = vcf.Reader(open(vcf_fi, 'r')) | |
#snps chrom->pos-> REF:'A', ALT:['T','C'] | |
snps, hetnum = make_vcf_dict(vcf_reader) | |
for chrom in ref.keys(): | |
if verb: | |
verb_pref = "Locus {} for individual {} : ".format(name, chrom) | |
phased = False | |
#ref is ZERO indexed | |
seqs = [list(ref[chrom]), list(ref[chrom])] | |
if chrom not in snps: | |
phased = True | |
if verb: | |
print(verb_pref + " no SNPs found in vcf. Reference sequence used.") | |
elif chrom in snps: | |
if hetnum[chrom] == 0: | |
#phased, fixed snps | |
phased = True | |
if verb: | |
print(verb_pref + " SNPs found, with no heterozygous sites. Phased but fixed genotypes.") | |
elif hetnum[chrom] == 1: | |
#phased, one het | |
phased = True | |
if verb: | |
print(verb_pref + " SNPs found, with one heterozygous site. Phasing is easy!") | |
elif hetnum[chrom] > 1: | |
if chrom in hap_info and len(hap_info[chrom]) == hetnum[chrom] + 1: | |
phased = True | |
if verb: | |
print(verb_pref + "phased in Hapcompass, all heterozygous sites accounted for.") | |
elif chrom in hap_info and len(hap_info[chrom]) != hetnum[chrom] + 1: | |
phased = False | |
if verb: | |
print(verb_pref + "phased in Hapcompass, but some sites unaccounted for. Output is ambiguous") | |
else: | |
phased = False | |
if verb: | |
print(verb_pref + "either HapCompass didn't phase or had multiple blocks for this locus. Output is ambiguous") | |
for i, bp in enumerate(seqs[0]): | |
#vcf is ONE indexed | |
vi = i + 1 | |
if vi in snps[chrom]: | |
if phased: | |
if snps[chrom][vi]['IS_HET']: | |
if chrom in hap_info: | |
bp1, bp2 = hap_info[chrom][vi] | |
else: | |
bp1, bp2 = snps[chrom][vi]['GT'] | |
seqs[0][i] = snps[chrom][vi]['ALLELES'][bp1] | |
seqs[1][i] = snps[chrom][vi]['ALLELES'][bp2] | |
else: | |
bp1, bp2 = snps[chrom][vi]['GT'] | |
seqs[0][i] = snps[chrom][vi]['ALLELES'][bp1] | |
seqs[1][i] = snps[chrom][vi]['ALLELES'][bp2] | |
else: | |
snp = AMB[''.join([snps[chrom][vi]['ALLELES'][x] | |
for x in snps[chrom][vi]['GT']])] | |
seqs[0][i] = snp | |
if phased: | |
seqa = ''.join(seqs[0]) | |
seqb = ''.join(seqs[1]) | |
hdls[chrom][phased].write('>' + name + 'a\n' + seqa + '\n') | |
hdls[chrom][phased].write('>' + name + 'b\n' + seqb + '\n') | |
elif not phased: | |
seq = ''.join(seqs[0]) | |
hdls[chrom][phased].write('>' + name + '\n' + seq + '\n') | |
phased = None | |
clean_up(hdls) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser( | |
description=("Given a reference fasta file and a collection " | |
"of paired vcf/HapCompass output files, generate " | |
"haplotype sequences for each individual grouped " | |
"by locus/chromosome")) | |
parser.add_argument("-r", "--ref", help=("The reference fasta file used in " | |
"generating the vcf files"), required=True) | |
parser.add_argument("-v", "--vcf", help=("The directory of vcfs used to " | |
"generate the HapCompass output file. ONE vcf per sample. " | |
"Default is current directory."), | |
default='.') | |
parser.add_argument("-c", "--hapcompass", help=("The directory containing " | |
"your HapCompass output _solution.txt files. Default is" | |
"the current directory."), | |
default='.') | |
parser.add_argument("-o", "--out", help=("The directory to store your results. " | |
"Deefault is ./out."), default='./out') | |
parser.add_argument("--verbose", help="Print extra info", default=False, action='store_true') | |
args = parser.parse_args() | |
fa_file = path.abspath(args.ref) | |
vcf_dir = path.abspath(args.vcf) | |
hap_dir = path.abspath(args.hapcompass) | |
out_dir = path.abspath(args.out) | |
verb = args.verbose | |
process_files(fa_file, vcf_dir, hap_dir, out_dir, verb) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment