Created
November 29, 2012 17:25
-
-
Save radaniba/4170569 to your computer and use it in GitHub Desktop.
Object-oriented approach to handle very large (i.e., genomic) multi-sequence fasta files.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Object-oriented approach to handle very large (i.e., genomic) multi-sequence fasta files. | |
LFastaSeq is the central class of this module providing nucleotide-based access to individual sequence entries as if those entries were separate objects in memory. | |
Example for use in a command line pipe: varlist_generator | gedit <seq_file> > out_file | |
This writes a new sequence to <out_file> with nucleotide changes incorporated as specified by the list of variations <varlist>.""" | |
import sys | |
import IOhelper | |
def parse (ifn=None): | |
"""Builds a dictionary of LFastaSeq objects from a multisequence fasta file.""" | |
if not ifn: | |
ifn = IOhelper.getvar_c1stdin('Name of the sequence file?', 'No sequence input file. Terminating ...') | |
ifo = open(ifn, 'rb') | |
seqs = {} | |
while True: | |
# this function has to use readline() instead of the more convenient file iterator | |
# because file.next() uses a hidden read-ahead buffer that affects the position in the file | |
line = ifo.readline() | |
if not line: break | |
if line.startswith('>'): | |
seqname = extract_seqname(line) | |
if seqname in seqs: | |
print >> sys.stderr, "duplicate sequence identifier: %s" % newkey | |
raise KeyError | |
else: | |
s_start = ifo.tell() | |
h_start = s_start-len(line) | |
nextline = IOhelper.previewline(ifo) # read-ahead one line to check sequence | |
if nextline.startswith(">") or not nextline: | |
print >> sys.stderr, "empty sequence: %s" % newkey | |
raise ValueError | |
else: | |
len_of_entry=0 | |
llen = len(nextline.strip()) | |
seqs[seqname]=LFastaSeq(seqfile=ifo, hdstart=h_start, seqstart=s_start, llen=llen, nlen=0) | |
else: | |
line = line.strip() | |
if not len(line) == llen: | |
nextline=IOhelper.previewline(ifo) | |
if nextline and not nextline.startswith('>'): | |
print >> sys.stderr, "need constant line length within sequence entry. Terminating ..." | |
raise ValueError | |
seqs[seqname].nlen += len(line) | |
return seqs | |
def extract_seqname(fasta_header): | |
"""Function to derive a sequence name from a fasta header. | |
The current, very simple implementation is based on the following logic: | |
- use everything left of the first whitespace as seqname | |
- if there is no whitespace in the header, use the whole line | |
- in any case remove the leading '>' and terminal whitespace (e.g. 'newline')""" | |
return fasta_header.split()[0][1:].strip() | |
class LFastaSeq(object): | |
"""Stores all information necessary to find and format an individual sequence in a large multi-fasta file. | |
Provides methods to move within and read from specified sequences within the file | |
as if those sequences were opened as files themselves, but without ever loading a complete | |
entry into memory. | |
Attributes: | |
seqfile = the associated file object that holds the entry | |
hdstart = byte position in the associated file object of the start of the sequence header | |
seqstart = byte position of the first nucleotide of the sequence | |
llen = line length used in the sequence entry | |
cnpos = the current position in nucleotides within the virtual sequence file | |
cbpos = the current position in bytes within the virtual sequence file | |
nlen = length of the sequence in nucleotides | |
""" | |
def __init__(self, seqfile, hdstart, seqstart, llen, nlen, cnpos=None): | |
self.seqfile = seqfile | |
self.hdstart = hdstart | |
self.seqstart = seqstart | |
self.llen = llen | |
self.nlen = nlen | |
if not cnpos: | |
cnpos = 1 | |
self.cnpos = cnpos | |
self.cbpos = 0 | |
self.seek (cnpos, 0) # make cbpos match cnpos | |
def __str__ (self): | |
return "Sequence %s in file %s" % (self.getname(), self.seqfile.name) | |
def seek (self, pos, mode=0): | |
"""Mimicks a file object's seek() method, but with position specified in nucleotides. | |
Like file.seek() it is silent about positions beyond EOF, but raises an IOError with | |
effective negative positions.""" | |
pos = int(pos) | |
if mode == 1: | |
pos+=self.cnpos | |
elif mode == 2: | |
pos+=self.nlen+1 | |
if pos < 1: | |
raise IOError | |
self.seqfile.seek(self.seqstart, 0) # positioning at the start of the sequence entry | |
if pos-1 > self.nlen: # determines how far we have to advance from start to reach pos-1 or end of sequence | |
rlength = self.nlen | |
else: | |
rlength = pos-1 | |
lspace = self.llen - ((self.cnpos-1) % self.llen) | |
if rlength >= lspace: | |
rlength-= lspace # we're reading this far beyond the end of the current line | |
q, r = divmod(rlength, self.llen) | |
for i in range(q+1): | |
dummy=self.seqfile.readline().strip() # polymorphism that works with string and LineBuffer | |
rlength = r | |
dummy=self.seqfile.read(rlength) | |
self.cnpos=pos | |
self.cbpos=self.seqfile.tell() | |
def tell (self): | |
"""Returns position in nucleotides.""" | |
return self.cnpos | |
def read (self, length=None, lbuffer=''): | |
"""Mimicks a file object's read() method. | |
As an optional parameter a LineBuffer object that processes the output can be specified, | |
thus, keeping memory requirements to a minimum. Returns EOF at and beyond end of a sequence, | |
thus preventing read-through into the subsequent seq entry.""" | |
if not isinstance(length, int) and not length == None: # to mimick file.read() behavior on non-integer value | |
raise TypeError | |
if self.cnpos > self.nlen: | |
return '' | |
if not self.cbpos == self.seqfile.tell(): # make actual file position match the user's expectation if necessary | |
self.seek(self.cnpos) | |
output = lbuffer | |
if length == None or length < 0 or self.cnpos-1+length > self.nlen: | |
length=self.nlen-(self.cnpos-1) | |
rlength = length | |
lspace = self.llen - ((self.cnpos-1) % self.llen) | |
if rlength >= lspace: | |
rlength-= lspace # we're reading this far beyond the end of the current line | |
q, r = divmod(rlength, self.llen) | |
for i in range(q+1): | |
output+=self.seqfile.readline().strip() # polymorphism that works with string and LineBuffer | |
rlength = r | |
output += self.seqfile.read(rlength) | |
self.cbpos = self.seqfile.tell() | |
self.cnpos += length | |
if not lbuffer: | |
return output | |
def getheader(self): | |
"""Returns the fasta header line of a sequence.""" | |
self.seqfile.seek(self.hdstart) | |
return self.seqfile.readline().strip() | |
def getname(self): | |
"""Returns a name for the sequence. | |
Currently, just everything left of the first whitespace in the header line.""" | |
return self.getheader().split()[0][1:].strip() | |
class Variation (): | |
"""Stores information about a sequence variation.""" | |
def __init__ (self, parent_sequence, pos, refn, altn): | |
self.pseq = parent_sequence # the sequence that the variation belongs to | |
self.pos = pos # the variation's (start) position | |
self.refn = refn # the reference nucleotide(s) | |
self.altn = altn # the altered nucleotide(s) | |
def getpos (self): | |
"""Returns the position of the variation. | |
Useful for sorting lists of variations because it can be specified | |
as the key function in sorted and list.sort""" | |
return self.pos | |
def __str__ (self): | |
"""Report info about the variation.""" | |
return "%s -> %s Variation at %d in sequence %s" % (self.refn, self.altn, self.pos, self.pseq) | |
def reflen (self): | |
"""Returns the number of nucleotides on the reference sequence that are affected by the variation.""" | |
return len(self.refn) | |
def gedit (): | |
"""A function that substitutes parts of a sequence according to a list of variations. | |
The variations can be SNPs, but also insertions or deletions of any size. | |
The function uses LargeFasta.parse() to generate a dictionary of LFasta instances to | |
provide file-like access to individual sequence entries.""" | |
gdict=parse() # indexes a sequence file and returns a dictionary of LFasta objects | |
# +++++++++++++++++++++++++ process the variation list from stdin ++++++++++++++++++++++++++++++++++ | |
# | |
# The expected format of the variation list is | |
# SEQ POS REFN ALTN (with columns separated by whitespace of any sort and number). | |
# where SEQ must match a sequence name in gdict above. | |
# The list can be unsorted, since the script will take care of this. | |
# Parses the variation list and stores the information in a dictionary with the same keys as the sequence dictionary. | |
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ | |
vardict = {} | |
for key in gdict.keys(): | |
vardict[key]=[] | |
while True: | |
var = sys.stdin.readline().strip() | |
if not var: break | |
varinfo = var.split() | |
if not varinfo[0] in vardict: | |
print >> sys.stderr, "Variation information does not match genome index: %s" % varinfo[0] | |
else: | |
vardict[varinfo[0]].append(Variation(varinfo[0], int(varinfo[1]), varinfo[2], varinfo[3])) | |
# sort the variation lists of all chromosomes in place | |
for key in vardict.keys(): | |
vardict[key].sort(key=lambda entry: entry.getpos()) | |
#+++++++++++++++++ generating output ++++++++++++++++++++++ | |
output=sys.stdout # write to stdout | |
for key in sorted(gdict.keys(), key=lambda entry: gdict[entry].hdstart): # process keys in the original file's order to make output file look like input | |
print >> sys.stderr, "writing %s" % key | |
lbuffer=IOhelper.LineBuffer('', output, gdict[key].llen) # a line-based buffer that makes sure we write lines with llen length to output | |
# this does automatic re-wrapping of lines in cases of insertions or deletions | |
output.write(gdict[key].getheader()+"\n") # echo the header line | |
for var in vardict[key]: | |
gdict[key].read(var.getpos()-gdict[key].tell(), lbuffer) # read everything up to but not including the next Variation directly into the LineBuffer object | |
test = gdict[key].read(var.reflen()) # read the Variation's whole ref sequence | |
if not test.upper() == var.refn.upper(): # a simple test that the Variation has something to do with the ref sequence | |
print >> sys.stderr, "\nVariation does not agree with sequence file @: %d in %s" % (var.getpos(), var.pseq) | |
raise ValueError | |
lbuffer.write(test.isupper() and var.altn.upper() or var.altn.lower()) # write the Variation, but preserve case at the position | |
gdict[key].read(-1, lbuffer) # after the last Variation, still write the rest of the current sequence | |
lbuffer.flush_all() # make sure everything's written to output | |
# bye bye | |
gdict[gdict.keys()[0]].seqfile.close() # close the sequence file object referenced by any LFasta object's 'file' entry | |
if __name__ == "__main__": | |
# print the last twenty nucleotides for all the sequences in the specified file | |
seqdict=parse() | |
for key in seqdict.keys(): | |
print seqdict[key].getname() | |
seqdict[key].seek(-20,2) | |
print seqdict[key].read(-1) | |
seqdict[seqdict.keys()[0]].seqfile.close() | |
########################################### | |
# and this is the relevant part of the IOhelper module | |
def getvar_c1stdin (message, error): | |
"""Accesory function that tries to get a filename. | |
Tries command line first, then stdin""" | |
try: | |
gfilename=sys.argv[1] | |
except: | |
print >> sys.stderr, message | |
gfilename=sys.stdin.readline().strip() | |
if not gfilename: | |
print >> sys.stderr, error | |
sys.exit() | |
return gfilename | |
def previewline (fo): | |
"""Sneak preview of the next line of a file, without moving in it.""" | |
pos = fo.tell() | |
nextline = fo.readline() | |
fo.seek(pos) | |
return nextline | |
class LineBuffer (object): | |
"""Buffers strings up to a defined line length before writing to a specified output. | |
Flushes a line of characters + an extra 'newline' to the predefined file handle when filled. | |
The class is prepared for use in 'with' expressions, where on exit it will automatically flush | |
its content. It also supports iteration over and searches in its string content, and can be | |
used in string concatenation. | |
During normal operation an instance of the class will check for overflow whenever one of its values changes | |
(either through direct assignment or through a call to write(). flush() and flush_all() are used for flushing | |
the buffer contents, but differ slightly in their behavior: | |
flush() writes out all complete lines to the output handle. If the buffer contains no complete lines at the | |
time of the call, an incomplete line is written and terminated with 'newline'. | |
flush_all() always writes the whole content of the buffer to the output handle using line wrapping and terminating | |
the last line with a 'newline'. Under normal conditions the effect of external calls to flush() and flush_all() | |
should be identical, but flush_all() is the recommended function to ensure that the buffer is fully written | |
to the output before closing the handle.""" | |
def __init__ (self, init_str, output_handle, line_length): | |
# direct assignment to the object dictionary is necessary to avoid calling __setattr__ and check_overflow at this stage | |
self.__dict__["content"] = '' | |
self.__dict__["oh"] = output_handle | |
self.__dict__["ll"] = line_length | |
self.content = init_str # now it's safe and desired to call check_overflow | |
def __enter__(self): | |
return self | |
def __exit__(self, type, value, traceback): | |
self.flush_all() | |
def __len__(self): | |
return len(self.content) | |
def __contains__(self, item): | |
return item in self.content | |
def __iter__(self): | |
return iter(self.content) | |
def __reversed__(self): | |
return reversed(self.content) | |
def __setattr__(self, name, value): | |
super(LineBuffer, self).__setattr__(name, value) | |
self.check_overflow() | |
def __add__(self, other): | |
self.content+=other | |
return self | |
def __radd__(self, other): | |
return other+self.content | |
def write (self, string): | |
self.content+=string # check_overflow will be called through __setattr__() | |
def check_overflow(self): | |
while len(self)>=self.ll: self.flush() | |
def flush (self): | |
l = len(self)>self.ll and self.ll or len(self) | |
output=self.content[:l] | |
self.oh.write(output+'\n') | |
self.content=self.content[l:] | |
def flush_all (self): | |
while not self.content == '': | |
self.flush() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment