Created
December 29, 2018 03:43
-
-
Save malonge/a05da7f99e651ea55e8a4e57a5c4aab1 to your computer and use it in GitHub Desktop.
Convert a SAM file to a MUMmer delta file
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
from collections import defaultdict | |
""" | |
This utility converts a SAM file to a nucmer delta file. | |
SAM files must contain an NM tag, which is default in minimap2 alignments. | |
""" | |
class SAMAlignment: | |
def __init__(self, in_ref_header, in_query_header, in_ref_start, in_cigar, in_flag, in_seq_len, in_num_mismatches): | |
self.num_mismatches = in_num_mismatches | |
self.seq_len = in_seq_len | |
self.flag = int(in_flag) | |
self.is_rc = False | |
# Check if the query was reverse complemented | |
if self.flag & 16: | |
self.is_rc = True | |
self.cigar = in_cigar | |
self.parsed_cigar = [] | |
self._parse_cigar() | |
if self.is_rc: | |
self.parsed_cigar = self.parsed_cigar[::-1] | |
self.ref_header = in_ref_header | |
self.query_header = in_query_header | |
self.ref_start = int(in_ref_start) | |
self.ref_end = None | |
self.query_start = None | |
self.query_end = None | |
self.query_aln_len = 0 | |
self.ref_aln_len = 0 | |
num_hard_clipped = 0 | |
for i in self.parsed_cigar: | |
if i[-1] == 'H': | |
num_hard_clipped += int(i[:-1]) | |
if self.seq_len == 1: | |
self.seq_len = 0 | |
for i in self.parsed_cigar: | |
if i[-1] == 'M' or i[-1] == 'I' or i[-1] == 'S' or i[-1] == 'X' or i[-1] == '=': | |
self.seq_len += int(i[:-1]) | |
self.query_len = self.seq_len + num_hard_clipped | |
self._get_query_start() | |
self._get_query_aln_len() | |
self._get_ref_aln_len() | |
self._get_query_end() | |
self._get_ref_end() | |
if self.is_rc: | |
self.query_start, self.query_end = self.query_end, self.query_start | |
# Lazily taking care of off-by-one errors | |
self.query_end += 1 | |
self.query_start -= 1 | |
self.query_end -= 1 | |
self.ref_end -= 1 | |
def __str__(self): | |
return ' '.join([self.ref_header, self.query_header]) | |
def __repr__(self): | |
return '<' + ' '.join([self.ref_header, self.query_header]) + '>' | |
def _get_query_start(self): | |
""" | |
An alignment will either start with soft clipping, hard clipping, or with one of the alignment | |
codes (e.g. Match/Mismatch (M) or InDel (I/D)). If hard or soft clipped, the start of the alignment is | |
the very next base after clipping. If the alignment has commenced, then the start of the alignment is 1. | |
Notes: | |
At least for now, I am only specifying this for minimap2 output, which I believe only has the following | |
characters in cigar strings: | |
{'I', 'S', 'M', 'D', 'H'} | |
so I will only handle these cases. | |
""" | |
if self.parsed_cigar[0][-1] == 'I' or self.parsed_cigar[0][-1] == 'D' or self.parsed_cigar[0][-1] == 'M': | |
self.query_start = 1 | |
else: | |
# The alignment has started with either soft or hard clipping | |
# Need to double check if a 1 needs to be added here | |
self.query_start = int(self.parsed_cigar[0][:-1]) + 1 | |
def _get_query_aln_len(self): | |
""" Just the addition of all cigar codes which "consume" the query (M, I)""" | |
for i in self.parsed_cigar: | |
if i[-1] == 'M' or i[-1] == 'I': | |
self.query_aln_len += int(i[:-1]) | |
def _get_ref_aln_len(self): | |
for i in self.parsed_cigar: | |
if i[-1] == 'M' or i[-1] == 'D': | |
self.ref_aln_len += int(i[:-1]) | |
def _get_query_end(self): | |
""" This is the length of the alignment + query start """ | |
self.query_end = self.query_start + self.query_aln_len | |
def _get_ref_end(self): | |
""" This is the length of the alignment + query start """ | |
self.ref_end = self.ref_start + self.ref_aln_len | |
def _parse_cigar(self): | |
cigar_chars = { | |
'M', | |
'I', | |
'D', | |
'N', | |
'S', | |
'H', | |
'P', | |
'=', | |
'X' | |
} | |
this_field = '' | |
for char in self.cigar: | |
this_field += char | |
if char in cigar_chars: | |
self.parsed_cigar.append(this_field) | |
this_field = '' | |
def write_delta(in_alns, in_file_name): | |
with open(in_file_name, 'w') as f: | |
f.write('file1 file2\n') | |
f.write('NUCMER\n') | |
for aln in in_alns.keys(): | |
query_len = in_alns[aln][0].query_len | |
#print (query_len) | |
f.write('>%s %s %r %r\n' % (aln[0], aln[1], ref_chr_lens[aln[0]], query_len)) | |
for i in in_alns[aln]: | |
f.write('%r %r %r %r %r %r %r\n' % ( | |
i.ref_start, | |
i.ref_end, | |
i.query_start, | |
i.query_end, | |
i.num_mismatches, | |
i.num_mismatches, | |
0 | |
)) | |
# Continue with the cigar string | |
offsets = [] | |
cigar = i.parsed_cigar | |
if cigar[0][-1] == 'S' or cigar[0][-1] == 'H': | |
cigar = cigar[1:-1] | |
else: | |
cigar = cigar[:-1] | |
counter = 1 | |
for code in cigar: | |
if code[-1] == 'M': | |
counter += int(code[:-1]) | |
elif code[-1] == 'D': | |
offsets.append(counter) | |
num_I = int(code[:-1]) | |
for i in range(1, num_I): | |
offsets.append(1) | |
counter = 1 | |
elif code[-1] == 'I': | |
offsets.append(-1*counter) | |
num_I = int(code[:-1]) | |
for i in range(1, num_I): | |
offsets.append(-1) | |
counter = 1 | |
else: | |
raise ValueError('Unexpected CIGAR code') | |
offsets.append(0) | |
offsets = [str(i) for i in offsets] | |
f.write('\n'.join(offsets) + '\n') | |
parser = argparse.ArgumentParser(description='Convert a SAM file to a nucmer delta file.') | |
parser.add_argument("sam_file", metavar="<alns.sam>", type=str, help="SAM file to convert") | |
args = parser.parse_args() | |
sam_file = args.sam_file | |
# Make a dictionary storing all alignments between and query and a reference sequence. | |
# key = (reference_header, query_header) | |
# value = list of alignments between those sequences. only store info needed for the delta file | |
alns = defaultdict(list) | |
# Read through the sam file | |
ref_chr_lens = dict() | |
with open(sam_file) as f: | |
for line in f: | |
# Skip SAM headers | |
if line.startswith('@SQ'): | |
header_list = line.split('\t') | |
chrom = header_list[1].replace('SN:', '') | |
chr_len = int(header_list[2].replace('LN:', '')) | |
ref_chr_lens[chrom] = chr_len | |
continue | |
if line.startswith('@'): | |
continue | |
fields = line.split('\t') | |
ref_header = fields[2] | |
query_header = fields[0] | |
ref_start = fields[3] | |
cigar = fields[5] | |
flag = fields[1] | |
seq_len = len(fields[9]) | |
if not cigar == '*': | |
# Get the NM flag | |
nm = None | |
for i in fields: | |
if i.startswith('NM:i'): | |
nm = int(i[5:]) | |
continue | |
if nm is None: | |
raise ValueError('SAM file must include NM tag.') | |
x = SAMAlignment( | |
ref_header, | |
query_header, | |
ref_start, | |
cigar, | |
flag, | |
seq_len, | |
nm | |
) | |
alns[(ref_header, query_header)].append(x) | |
write_delta(alns, sam_file + '.delta') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment