Created
May 1, 2019 23:54
-
-
Save andrewparkermorgan/f96619fe7e103417ab4666c446d5586a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
""" | |
interpolate_cM.py | |
Annotate a VCF file with genetic position of sites by (linear) interpolation on a user-supplied genetic map. | |
""" | |
import os | |
import sys | |
import time | |
import argparse | |
import logging | |
import numpy as np | |
from collections import OrderedDict, defaultdict | |
from cyvcf2 import VCF | |
parser = argparse.ArgumentParser(description = "Annotate a VCF file with genetic position of sites.") | |
parser.add_argument("-i","--infile", | |
default = "-", | |
help = "VCF file with positions to annotate [default: stdin]") | |
parser.add_argument("-m","--map", type = argparse.FileType("rU"), | |
required = True, | |
help = "PLINK-format genetic map file") | |
parser.add_argument("-o","--offset", type = float, | |
default = 0.0, | |
help = "arbitrary offset to add to start of genetic map on each chromosome [default: %(default)f]") | |
args = parser.parse_args() | |
## set up log trace | |
logging.basicConfig(level = logging.DEBUG) | |
logging.StreamHandler(stream = sys.stderr) | |
logger = logging.getLogger() | |
## connect to VCF | |
logger.info("Annotating VCF file at <{}>".format(args.infile)) | |
vcf = VCF(args.infile) | |
def is_monotone_increasing(x, strict = False): | |
if not strict: | |
return np.all(x[1:] >= x[:-1]) | |
else: | |
return np.all(x[1:] > x[:-1]) | |
def read_map_line(line): | |
chrom, marker, cm, bp = line.strip().split() | |
cm = float(cm) | |
bp = int(bp) | |
return chrom, marker, cm, bp | |
def read_map_file(infile, offset = 0.0): | |
themap = OrderedDict() | |
for line in infile: | |
chrom, marker, cm, bp = read_map_line(line) | |
if not chrom in themap: | |
themap[chrom] = [ (0,0) ] | |
themap[chrom].append( (cm + offset, bp) ) | |
for chrom in themap: | |
cms, bps = zip(*themap[chrom]) | |
cms = np.array(cms, dtype = np.float) | |
bps = np.array(bps, dtype = np.int) | |
o = np.argsort(bps) | |
cms = cms[o] | |
if not is_monotone_increasing(cms) or not is_monotone_increasing(bps, True): | |
raise ValueErorr("Provided genetic map is not strictly increasing with respect to physical map.") | |
themap[chrom] = (bps, cms) | |
return themap | |
## read in genetic map | |
logger.info("Reading reference genetic map from <{}>".format(args.map.name)) | |
logger.info("Adding {} cM offset to start of each contig to keep all genetic positions >= 0.0 cM".format(args.offset)) | |
refmap = read_map_file(args.map, args.offset) | |
for chrom,(bps,cms) in refmap.items(): | |
logger.info("\t ... {}: {} cM ({} markers; last @ {} bp)".format(chrom, cms[-1], bps.shape[0], bps[-1])) | |
## decorate VCF header with some metadata, then write it | |
vcf.add_info_to_header({ "ID": "CM", "Number": 1, "Type": "Float", "Description": "Genetic position interpolated from reference map" }) | |
cmd_line = "##interpolate_cM.py {} @ {}".format( " ".join(sys.argv[1:]), time.ctime(time.time()) ) | |
header = str(vcf.raw_header).strip().split("\n") | |
header.insert(len(header)-1, cmd_line) | |
print("\n".join(header)) | |
## main loop | |
logger.info("Processing sites in VCF file ...") | |
nsites = 0 | |
nundef = 0 | |
skipped_contigs = defaultdict(int) | |
for site in vcf: | |
nsites += 1 | |
if site.CHROM not in refmap and site.CHROM not in skipped_contigs: | |
logger.warning("Contig {} not in genetic map; skipping any sites there ...") | |
nundef += 1 | |
skipped_contigs[site.CHROM] += 1 | |
## find index of marker to RIGHT of this site | |
bps, cms = refmap[site.CHROM] | |
ipos = np.searchsorted(bps, site.POS) | |
nmar = bps.shape[0] | |
#logger.debug(cms) | |
#logger.debug(bps) | |
#logger.debug((site.POS, ipos, nmar)) | |
if ipos >= nmar: | |
## marker is off the RIGHT end reference map; back it up one and interpolate with last good interval's rate | |
ipos -= 1 | |
ivl_cm = np.diff(cms[ (ipos-1):(ipos+1) ]) | |
ivl_bp = np.diff(bps[ (ipos-1):(ipos+1) ]) | |
rate = ivl_cm/ivl_bp | |
bp_dist = site.POS - bps[ipos-1] | |
cm_dist = rate*bp_dist | |
cm_pos = cms[ipos-1] + cm_dist | |
#logger.debug((ivl_cm, ivl_bp, rate, cm_pos)) | |
site.INFO["CM"] = float(cm_pos) | |
sys.stdout.write(str(site)) | |
if not nsites % 1000: | |
logger.info("\t ... {} sites done ({} skipped) ...".format(nsites, nundef)) | |
logger.info("Processed {} sites.".format(nsites)) | |
if len(skipped_contigs): | |
logger.info("Summary of skipped sites:\n") | |
for chrom, nsites in skipped_contigs.items(): | |
logger.info("\t ... {}: {:9d} sites".format(chrom, nsites)) | |
else: | |
logger.info("No sites skipped due to contig not in reference map.") | |
logger.info("Done.\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment