Last active
August 29, 2015 14:01
-
-
Save mkyt/08b0cb23b2e68475cc6f to your computer and use it in GitHub Desktop.
change indentation "\t" -> " "
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from Bio import SeqIO | |
from Bio.SeqRecord import SeqRecord | |
from Bio.Seq import Seq | |
from Bio.Blast.Applications import NcbiblastpCommandline | |
from Bio.Blast import NCBIXML | |
import sys | |
if sys.hexversion >= 0x3000000: | |
from io import StringIO | |
else: | |
from StringIO import StringIO | |
import tempfile | |
import os.path | |
class AlignResult(object): | |
def __init__(self, bits, length, s1_s, s1_e, s2_s, s2_e, s1, s2, m, ident, gaps, score, frame): | |
self.bits = bits | |
self.length = length | |
self.s1_range = (s1_s-1, s1_e) # convert to 0-origin, half-closed range | |
self.s2_range = (s2_s-1, s2_e) | |
self.s1 = s1 | |
self.s2 = s2 | |
self.m = m | |
self.ident = ident | |
self.gaps = gaps | |
self.score = score | |
self.strand = frame | |
self.contradictions = [] | |
self.aligned = self._generate_aligned() | |
def __repr__(self): | |
range_stringify= lambda x: '{:d}..{:d}'.format(*x) | |
strand_stringify = lambda x: '/'.join('Plus' if e > 0 else 'Minus' for e in x) | |
return '<AlignResult [{}] <-> [{}] (score: {}, length: {}, ident: {}, gaps: {}, strand: {})>'.format(range_stringify(self.s1_range), range_stringify(self.s2_range), self.score, self.length, self.ident, self.gaps, strand_stringify(self.strand)) | |
def _generate_aligned(self): | |
ERR = ('N', '-') # "cannot be read" or gap | |
res = Seq('') | |
s1_pos = self.s1_range[0] | |
s2_pos = self.s2_range[0] | |
for i in range(self.length): | |
c1 = self.s1[i] | |
c2 = self.s2[i] | |
if c1 in ERR: | |
res += c2 | |
else: | |
res += c1 | |
if c2 not in ERR and c1 != c2: | |
self.contradictions.append(((s1_pos, c1), (s2_pos, c2))) # two reads contradicts | |
s1_pos += self.strand[0] | |
s2_pos += self.strand[1] | |
return res | |
def align_two(s1, s2): | |
s1_f = os.path.join(tempfile.gettempdir(), 's1.fasta') | |
s2_f = os.path.join(tempfile.gettempdir(), 's2.fasta') | |
SeqIO.write(SeqRecord(s1), open(s1_f, 'w+'), 'fasta') | |
SeqIO.write(SeqRecord(s2), open(s2_f, 'w+'), 'fasta') | |
output = NcbiblastpCommandline(cmd='blastn', query=s1_f, subject=s2_f, outfmt=5)()[0] | |
blast_result_record = NCBIXML.read(StringIO(output)) | |
try: | |
hsp = blast_result_record.alignments[0].hsps[0] | |
except IndexError: | |
# blast returned no result | |
return None | |
return AlignResult(hsp.bits, hsp.align_length, hsp.query_start, hsp.query_end, hsp.sbjct_start, hsp.sbjct_end, hsp.query, hsp.sbjct, hsp.match, hsp.identities, hsp.gaps, hsp.score, hsp.frame) | |
def evalQuality(s): | |
rate = 1.0 | |
res = 0 | |
for i in range(len(s)): | |
if s[i] == 'N': | |
rate *= 0.9 | |
else: | |
res += rate | |
return res | |
def concat_two(s1, s2, alignRes): | |
s1_s, s1_e = alignRes.s1_range | |
s1_after = len(s1) - s1_e # length after aligned region | |
s2_s, s2_e = alignRes.s2_range | |
res = Seq('') | |
rev = alignRes.strand[1] < 0 | |
src = [] | |
if rev: | |
s2 = s2.complement() | |
s2_s, s2_e = s2_e-1, s2_s+1 | |
ss1_before = evalQuality(s1[s1_s-1::-1]) if s1_s > 0 else 0 # s1:(s-1)..0 | |
ss1_after = evalQuality(s1[s1_e:]) # s1:e..last | |
ss2_before = evalQuality(s2[s2_s-1::-1]) if s2_s > 0 else 0 # s2:(s-1)..0 | |
ss2_after = evalQuality(s2[s2_e:]) #s2:e..last | |
if rev: | |
ss2_before, ss2_after = ss2_after, ss2_before | |
print('scores:{}:{} {}:{}'.format(ss1_before, ss1_after, ss2_before, ss2_after)) | |
beg = ss1_before > ss2_before # s1 is better at the beginning | |
ed = ss1_after > ss2_after # s1 is better at the end | |
if beg and ed: | |
# s1 contains s2 | |
res += s1[:] | |
src.append(('s1', len(res))) | |
elif not beg and not ed: | |
# s2 contains s1 | |
res += s2[::-1 if rev else 1] | |
src.append(('s2', len(res))) | |
elif beg and not ed: | |
# s1 -> s2 | |
res += s1[:s1_s] # s1:0..(s-1) | |
src.append(('s1', len(res))) | |
res += alignRes.aligned # s1:s..(e-1) == s2:s..(e-1) | |
src.append(('aligned', len(res))) | |
if not rev: | |
res += s2[s2_e:] # s2:e..last | |
else: | |
if s2_s > 0: | |
res += s2[s2_s-1::-1] # s2:(s-1)..0 | |
src.append(('s2', len(res))) | |
else: | |
# s2 -> s1 | |
if not rev: | |
res += s2[:s2_s] # s2:0..(s-1) | |
else: | |
res += s2[:s2_e-1:-1] # s2:last..e | |
src.append(('s2', len(res))) | |
res += alignRes.aligned # s1:s..(e-1) == s2:s..(e-1) or s2:(e-1)..s | |
src.append(('aligned', len(res))) | |
res += s1[s1_e:] # s1:e..last | |
src.append(('s1', len(res))) | |
print(src) | |
return res, src | |
def concatenate(seqs): | |
left = len(seqs) | |
consumed = set() | |
res = dict() | |
while left > 1: | |
mx_score, mx_pair = -1, (-1, -1) | |
l = len(seqs) | |
for i in range(l): | |
if i in consumed: | |
continue | |
for j in range(i+1, l): | |
if j in consumed: | |
continue | |
if (i, j) not in res: | |
res[(i, j)] = align_two(seqs[i], seqs[j]) | |
if res[(i, j)] is not None and res[(i, j)].score > mx_score: | |
mx_score = res[(i, j)].score | |
mx_pair = (i, j) | |
if mx_score < 0: | |
print("alignment failed before consuming all sequences supplied") | |
break | |
m1, m2 = mx_pair | |
print('merging seq #{:d} and #{:d} (score {}) to obtain seq #{:d}'.format(m1, m2, mx_score, len(seqs))) | |
print(res[mx_pair]) | |
new_seq, src = concat_two(seqs[m1], seqs[m2], res[mx_pair]) | |
consumed.add(m1) | |
consumed.add(m2) | |
print(new_seq) | |
contra = res[mx_pair].contradictions | |
# TODO: notify users of contradictions | |
seqs.append(new_seq) | |
left -= 1 | |
return seqs[-1] | |
def findORFs(seq): | |
table = 1 | |
min_pro_len = 200 | |
for strand, nuc in [(+1, seq), (-1, seq.reverse_complement())]: | |
for frame in range(3): | |
print(nuc[frame:].translate(table)) | |
for pro in nuc[frame:].translate(table).split("*"): | |
if len(pro) >= min_pro_len: | |
print("%s...%s - length %i, strand %i, frame %i" % (pro[:30], pro[-3:], len(pro), strand, frame)) | |
print() | |
print(pro) | |
def main(progname, args): | |
if len(args) < 3: | |
print('usage: {} [INPUT_FILE 1] [INPUT_FILE 2]...'.format(progname)) | |
return 1 | |
seqs = [next(SeqIO.parse(f, 'fasta')).seq for f in args] | |
res = concatenate(seqs) | |
print(res) | |
res2 = res.reverse_complement() | |
print(res2) | |
#findORFs(res2) | |
return 0 | |
if __name__ == '__main__': | |
sys.exit(main(sys.argv[0], sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment