Skip to content

Instantly share code, notes, and snippets.

@radaniba
Created November 29, 2012 17:25
Show Gist options
  • Save radaniba/4170569 to your computer and use it in GitHub Desktop.
Save radaniba/4170569 to your computer and use it in GitHub Desktop.
Object-oriented approach to handle very large (i.e., genomic) multi-sequence fasta files.
"""Object-oriented approach to handle very large (i.e., genomic) multi-sequence fasta files.
LFastaSeq is the central class of this module providing nucleotide-based access to individual sequence entries as if those entries were separate objects in memory.
Example for use in a command line pipe: varlist_generator | gedit <seq_file> > out_file
This writes a new sequence to <out_file> with nucleotide changes incorporated as specified by the list of variations <varlist>."""
import sys
import IOhelper
def parse (ifn=None):
"""Builds a dictionary of LFastaSeq objects from a multisequence fasta file."""
if not ifn:
ifn = IOhelper.getvar_c1stdin('Name of the sequence file?', 'No sequence input file. Terminating ...')
ifo = open(ifn, 'rb')
seqs = {}
while True:
# this function has to use readline() instead of the more convenient file iterator
# because file.next() uses a hidden read-ahead buffer that affects the position in the file
line = ifo.readline()
if not line: break
if line.startswith('>'):
seqname = extract_seqname(line)
if seqname in seqs:
print >> sys.stderr, "duplicate sequence identifier: %s" % newkey
raise KeyError
else:
s_start = ifo.tell()
h_start = s_start-len(line)
nextline = IOhelper.previewline(ifo) # read-ahead one line to check sequence
if nextline.startswith(">") or not nextline:
print >> sys.stderr, "empty sequence: %s" % newkey
raise ValueError
else:
len_of_entry=0
llen = len(nextline.strip())
seqs[seqname]=LFastaSeq(seqfile=ifo, hdstart=h_start, seqstart=s_start, llen=llen, nlen=0)
else:
line = line.strip()
if not len(line) == llen:
nextline=IOhelper.previewline(ifo)
if nextline and not nextline.startswith('>'):
print >> sys.stderr, "need constant line length within sequence entry. Terminating ..."
raise ValueError
seqs[seqname].nlen += len(line)
return seqs
def extract_seqname(fasta_header):
"""Function to derive a sequence name from a fasta header.
The current, very simple implementation is based on the following logic:
- use everything left of the first whitespace as seqname
- if there is no whitespace in the header, use the whole line
- in any case remove the leading '>' and terminal whitespace (e.g. 'newline')"""
return fasta_header.split()[0][1:].strip()
class LFastaSeq(object):
"""Stores all information necessary to find and format an individual sequence in a large multi-fasta file.
Provides methods to move within and read from specified sequences within the file
as if those sequences were opened as files themselves, but without ever loading a complete
entry into memory.
Attributes:
seqfile = the associated file object that holds the entry
hdstart = byte position in the associated file object of the start of the sequence header
seqstart = byte position of the first nucleotide of the sequence
llen = line length used in the sequence entry
cnpos = the current position in nucleotides within the virtual sequence file
cbpos = the current position in bytes within the virtual sequence file
nlen = length of the sequence in nucleotides
"""
def __init__(self, seqfile, hdstart, seqstart, llen, nlen, cnpos=None):
self.seqfile = seqfile
self.hdstart = hdstart
self.seqstart = seqstart
self.llen = llen
self.nlen = nlen
if not cnpos:
cnpos = 1
self.cnpos = cnpos
self.cbpos = 0
self.seek (cnpos, 0) # make cbpos match cnpos
def __str__ (self):
return "Sequence %s in file %s" % (self.getname(), self.seqfile.name)
def seek (self, pos, mode=0):
"""Mimicks a file object's seek() method, but with position specified in nucleotides.
Like file.seek() it is silent about positions beyond EOF, but raises an IOError with
effective negative positions."""
pos = int(pos)
if mode == 1:
pos+=self.cnpos
elif mode == 2:
pos+=self.nlen+1
if pos < 1:
raise IOError
self.seqfile.seek(self.seqstart, 0) # positioning at the start of the sequence entry
if pos-1 > self.nlen: # determines how far we have to advance from start to reach pos-1 or end of sequence
rlength = self.nlen
else:
rlength = pos-1
lspace = self.llen - ((self.cnpos-1) % self.llen)
if rlength >= lspace:
rlength-= lspace # we're reading this far beyond the end of the current line
q, r = divmod(rlength, self.llen)
for i in range(q+1):
dummy=self.seqfile.readline().strip() # polymorphism that works with string and LineBuffer
rlength = r
dummy=self.seqfile.read(rlength)
self.cnpos=pos
self.cbpos=self.seqfile.tell()
def tell (self):
"""Returns position in nucleotides."""
return self.cnpos
def read (self, length=None, lbuffer=''):
"""Mimicks a file object's read() method.
As an optional parameter a LineBuffer object that processes the output can be specified,
thus, keeping memory requirements to a minimum. Returns EOF at and beyond end of a sequence,
thus preventing read-through into the subsequent seq entry."""
if not isinstance(length, int) and not length == None: # to mimick file.read() behavior on non-integer value
raise TypeError
if self.cnpos > self.nlen:
return ''
if not self.cbpos == self.seqfile.tell(): # make actual file position match the user's expectation if necessary
self.seek(self.cnpos)
output = lbuffer
if length == None or length < 0 or self.cnpos-1+length > self.nlen:
length=self.nlen-(self.cnpos-1)
rlength = length
lspace = self.llen - ((self.cnpos-1) % self.llen)
if rlength >= lspace:
rlength-= lspace # we're reading this far beyond the end of the current line
q, r = divmod(rlength, self.llen)
for i in range(q+1):
output+=self.seqfile.readline().strip() # polymorphism that works with string and LineBuffer
rlength = r
output += self.seqfile.read(rlength)
self.cbpos = self.seqfile.tell()
self.cnpos += length
if not lbuffer:
return output
def getheader(self):
"""Returns the fasta header line of a sequence."""
self.seqfile.seek(self.hdstart)
return self.seqfile.readline().strip()
def getname(self):
"""Returns a name for the sequence.
Currently, just everything left of the first whitespace in the header line."""
return self.getheader().split()[0][1:].strip()
class Variation ():
"""Stores information about a sequence variation."""
def __init__ (self, parent_sequence, pos, refn, altn):
self.pseq = parent_sequence # the sequence that the variation belongs to
self.pos = pos # the variation's (start) position
self.refn = refn # the reference nucleotide(s)
self.altn = altn # the altered nucleotide(s)
def getpos (self):
"""Returns the position of the variation.
Useful for sorting lists of variations because it can be specified
as the key function in sorted and list.sort"""
return self.pos
def __str__ (self):
"""Report info about the variation."""
return "%s -> %s Variation at %d in sequence %s" % (self.refn, self.altn, self.pos, self.pseq)
def reflen (self):
"""Returns the number of nucleotides on the reference sequence that are affected by the variation."""
return len(self.refn)
def gedit ():
"""A function that substitutes parts of a sequence according to a list of variations.
The variations can be SNPs, but also insertions or deletions of any size.
The function uses LargeFasta.parse() to generate a dictionary of LFasta instances to
provide file-like access to individual sequence entries."""
gdict=parse() # indexes a sequence file and returns a dictionary of LFasta objects
# +++++++++++++++++++++++++ process the variation list from stdin ++++++++++++++++++++++++++++++++++
#
# The expected format of the variation list is
# SEQ POS REFN ALTN (with columns separated by whitespace of any sort and number).
# where SEQ must match a sequence name in gdict above.
# The list can be unsorted, since the script will take care of this.
# Parses the variation list and stores the information in a dictionary with the same keys as the sequence dictionary.
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
vardict = {}
for key in gdict.keys():
vardict[key]=[]
while True:
var = sys.stdin.readline().strip()
if not var: break
varinfo = var.split()
if not varinfo[0] in vardict:
print >> sys.stderr, "Variation information does not match genome index: %s" % varinfo[0]
else:
vardict[varinfo[0]].append(Variation(varinfo[0], int(varinfo[1]), varinfo[2], varinfo[3]))
# sort the variation lists of all chromosomes in place
for key in vardict.keys():
vardict[key].sort(key=lambda entry: entry.getpos())
#+++++++++++++++++ generating output ++++++++++++++++++++++
output=sys.stdout # write to stdout
for key in sorted(gdict.keys(), key=lambda entry: gdict[entry].hdstart): # process keys in the original file's order to make output file look like input
print >> sys.stderr, "writing %s" % key
lbuffer=IOhelper.LineBuffer('', output, gdict[key].llen) # a line-based buffer that makes sure we write lines with llen length to output
# this does automatic re-wrapping of lines in cases of insertions or deletions
output.write(gdict[key].getheader()+"\n") # echo the header line
for var in vardict[key]:
gdict[key].read(var.getpos()-gdict[key].tell(), lbuffer) # read everything up to but not including the next Variation directly into the LineBuffer object
test = gdict[key].read(var.reflen()) # read the Variation's whole ref sequence
if not test.upper() == var.refn.upper(): # a simple test that the Variation has something to do with the ref sequence
print >> sys.stderr, "\nVariation does not agree with sequence file @: %d in %s" % (var.getpos(), var.pseq)
raise ValueError
lbuffer.write(test.isupper() and var.altn.upper() or var.altn.lower()) # write the Variation, but preserve case at the position
gdict[key].read(-1, lbuffer) # after the last Variation, still write the rest of the current sequence
lbuffer.flush_all() # make sure everything's written to output
# bye bye
gdict[gdict.keys()[0]].seqfile.close() # close the sequence file object referenced by any LFasta object's 'file' entry
if __name__ == "__main__":
# print the last twenty nucleotides for all the sequences in the specified file
seqdict=parse()
for key in seqdict.keys():
print seqdict[key].getname()
seqdict[key].seek(-20,2)
print seqdict[key].read(-1)
seqdict[seqdict.keys()[0]].seqfile.close()
###########################################
# and this is the relevant part of the IOhelper module
def getvar_c1stdin (message, error):
"""Accesory function that tries to get a filename.
Tries command line first, then stdin"""
try:
gfilename=sys.argv[1]
except:
print >> sys.stderr, message
gfilename=sys.stdin.readline().strip()
if not gfilename:
print >> sys.stderr, error
sys.exit()
return gfilename
def previewline (fo):
"""Sneak preview of the next line of a file, without moving in it."""
pos = fo.tell()
nextline = fo.readline()
fo.seek(pos)
return nextline
class LineBuffer (object):
"""Buffers strings up to a defined line length before writing to a specified output.
Flushes a line of characters + an extra 'newline' to the predefined file handle when filled.
The class is prepared for use in 'with' expressions, where on exit it will automatically flush
its content. It also supports iteration over and searches in its string content, and can be
used in string concatenation.
During normal operation an instance of the class will check for overflow whenever one of its values changes
(either through direct assignment or through a call to write(). flush() and flush_all() are used for flushing
the buffer contents, but differ slightly in their behavior:
flush() writes out all complete lines to the output handle. If the buffer contains no complete lines at the
time of the call, an incomplete line is written and terminated with 'newline'.
flush_all() always writes the whole content of the buffer to the output handle using line wrapping and terminating
the last line with a 'newline'. Under normal conditions the effect of external calls to flush() and flush_all()
should be identical, but flush_all() is the recommended function to ensure that the buffer is fully written
to the output before closing the handle."""
def __init__ (self, init_str, output_handle, line_length):
# direct assignment to the object dictionary is necessary to avoid calling __setattr__ and check_overflow at this stage
self.__dict__["content"] = ''
self.__dict__["oh"] = output_handle
self.__dict__["ll"] = line_length
self.content = init_str # now it's safe and desired to call check_overflow
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.flush_all()
def __len__(self):
return len(self.content)
def __contains__(self, item):
return item in self.content
def __iter__(self):
return iter(self.content)
def __reversed__(self):
return reversed(self.content)
def __setattr__(self, name, value):
super(LineBuffer, self).__setattr__(name, value)
self.check_overflow()
def __add__(self, other):
self.content+=other
return self
def __radd__(self, other):
return other+self.content
def write (self, string):
self.content+=string # check_overflow will be called through __setattr__()
def check_overflow(self):
while len(self)>=self.ll: self.flush()
def flush (self):
l = len(self)>self.ll and self.ll or len(self)
output=self.content[:l]
self.oh.write(output+'\n')
self.content=self.content[l:]
def flush_all (self):
while not self.content == '':
self.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment