Skip to content

Instantly share code, notes, and snippets.

@andrewparkermorgan
Created May 1, 2019 23:54
Show Gist options
  • Save andrewparkermorgan/f96619fe7e103417ab4666c446d5586a to your computer and use it in GitHub Desktop.
Save andrewparkermorgan/f96619fe7e103417ab4666c446d5586a to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
"""
interpolate_cM.py
Annotate a VCF file with genetic position of sites by (linear) interpolation on a user-supplied genetic map.
"""
import os
import sys
import time
import argparse
import logging
import numpy as np
from collections import OrderedDict, defaultdict
from cyvcf2 import VCF
parser = argparse.ArgumentParser(description = "Annotate a VCF file with genetic position of sites.")
parser.add_argument("-i","--infile",
default = "-",
help = "VCF file with positions to annotate [default: stdin]")
parser.add_argument("-m","--map", type = argparse.FileType("rU"),
required = True,
help = "PLINK-format genetic map file")
parser.add_argument("-o","--offset", type = float,
default = 0.0,
help = "arbitrary offset to add to start of genetic map on each chromosome [default: %(default)f]")
args = parser.parse_args()
## set up log trace
logging.basicConfig(level = logging.DEBUG)
logging.StreamHandler(stream = sys.stderr)
logger = logging.getLogger()
## connect to VCF
logger.info("Annotating VCF file at <{}>".format(args.infile))
vcf = VCF(args.infile)
def is_monotone_increasing(x, strict = False):
if not strict:
return np.all(x[1:] >= x[:-1])
else:
return np.all(x[1:] > x[:-1])
def read_map_line(line):
chrom, marker, cm, bp = line.strip().split()
cm = float(cm)
bp = int(bp)
return chrom, marker, cm, bp
def read_map_file(infile, offset = 0.0):
themap = OrderedDict()
for line in infile:
chrom, marker, cm, bp = read_map_line(line)
if not chrom in themap:
themap[chrom] = [ (0,0) ]
themap[chrom].append( (cm + offset, bp) )
for chrom in themap:
cms, bps = zip(*themap[chrom])
cms = np.array(cms, dtype = np.float)
bps = np.array(bps, dtype = np.int)
o = np.argsort(bps)
cms = cms[o]
if not is_monotone_increasing(cms) or not is_monotone_increasing(bps, True):
raise ValueErorr("Provided genetic map is not strictly increasing with respect to physical map.")
themap[chrom] = (bps, cms)
return themap
## read in genetic map
logger.info("Reading reference genetic map from <{}>".format(args.map.name))
logger.info("Adding {} cM offset to start of each contig to keep all genetic positions >= 0.0 cM".format(args.offset))
refmap = read_map_file(args.map, args.offset)
for chrom,(bps,cms) in refmap.items():
logger.info("\t ... {}: {} cM ({} markers; last @ {} bp)".format(chrom, cms[-1], bps.shape[0], bps[-1]))
## decorate VCF header with some metadata, then write it
vcf.add_info_to_header({ "ID": "CM", "Number": 1, "Type": "Float", "Description": "Genetic position interpolated from reference map" })
cmd_line = "##interpolate_cM.py {} @ {}".format( " ".join(sys.argv[1:]), time.ctime(time.time()) )
header = str(vcf.raw_header).strip().split("\n")
header.insert(len(header)-1, cmd_line)
print("\n".join(header))
## main loop
logger.info("Processing sites in VCF file ...")
nsites = 0
nundef = 0
skipped_contigs = defaultdict(int)
for site in vcf:
nsites += 1
if site.CHROM not in refmap and site.CHROM not in skipped_contigs:
logger.warning("Contig {} not in genetic map; skipping any sites there ...")
nundef += 1
skipped_contigs[site.CHROM] += 1
## find index of marker to RIGHT of this site
bps, cms = refmap[site.CHROM]
ipos = np.searchsorted(bps, site.POS)
nmar = bps.shape[0]
#logger.debug(cms)
#logger.debug(bps)
#logger.debug((site.POS, ipos, nmar))
if ipos >= nmar:
## marker is off the RIGHT end reference map; back it up one and interpolate with last good interval's rate
ipos -= 1
ivl_cm = np.diff(cms[ (ipos-1):(ipos+1) ])
ivl_bp = np.diff(bps[ (ipos-1):(ipos+1) ])
rate = ivl_cm/ivl_bp
bp_dist = site.POS - bps[ipos-1]
cm_dist = rate*bp_dist
cm_pos = cms[ipos-1] + cm_dist
#logger.debug((ivl_cm, ivl_bp, rate, cm_pos))
site.INFO["CM"] = float(cm_pos)
sys.stdout.write(str(site))
if not nsites % 1000:
logger.info("\t ... {} sites done ({} skipped) ...".format(nsites, nundef))
logger.info("Processed {} sites.".format(nsites))
if len(skipped_contigs):
logger.info("Summary of skipped sites:\n")
for chrom, nsites in skipped_contigs.items():
logger.info("\t ... {}: {:9d} sites".format(chrom, nsites))
else:
logger.info("No sites skipped due to contig not in reference map.")
logger.info("Done.\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment