Skip to content

Instantly share code, notes, and snippets.

@informationsea
Created March 31, 2012 09:09
Show Gist options
  • Save informationsea/2261128 to your computer and use it in GitHub Desktop.
Save informationsea/2261128 to your computer and use it in GitHub Desktop.
Indexed file reader
#!/usr/bin/env python
"""
Read indexed tab-delimited file
"""
__author__ = '@informationsea'
__copyright__ = 'Copyright (C) 2011 @informationsea All Rights Reserved.'
import sys
import os
import os.path
import argparse
import bz2
def suggest_type_from_str(string):
    """Convert a text field to the most specific type that parses it.

    ``int`` is tried first, then ``float``; when neither conversion
    succeeds the value is returned as ``str(string)``.

    Arguments:
    - `string`: value to convert (normally one tab-separated field).

    Returns:
    An ``int``, a ``float`` or a ``str``.
    """
    # Catch conversion failures only: a bare ``except`` would also swallow
    # KeyboardInterrupt and SystemExit.
    for convert in (int, float):
        try:
            return convert(string)
        except (ValueError, TypeError, OverflowError):
            continue
    return str(string)
class IndexedFile(object):
    """Keyed reader for a tab-delimited file with a ``<path>-index`` side file.

    Each line of the index file holds a key (the first column of the data
    file) and the offset at which the first data line carrying that key
    starts.  The index is (re)built with ``make_index`` when it is missing
    or older than the data file.  Paths ending in ``.bz2`` are opened with
    ``bz2.BZ2File``.

    Instances can be used as context managers; the files are opened on
    ``__enter__`` and closed on ``__exit__``.
    """

    def __init__(self, filepath):
        """
        Arguments:
        - `filepath`: path of the tab-delimited data file.
        """
        self._filepath = filepath
        self._mainfile = None
        # Defined up front so that close() (also reached through __del__)
        # is safe on an instance that was never opened.
        self._indexfile = None
        self._index = dict()
        self._indexlist = list()
        self._pointer = 0

    def __del__(self):
        """Close any open file handles when the object is collected."""
        self.close()

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the files when leaving a ``with`` block.

        Arguments:
        - `exc_type`: unused.
        - `exc_value`: unused.
        - `traceback`: unused.
        """
        self.close()

    def __enter__(self):
        """Open the files and return this reader."""
        self.open()
        return self

    def open(self):
        """Open the data and index files and load the index into memory.

        The index is rebuilt first when it does not exist or when the data
        file has a newer modification time.  If anything fails, every
        handle opened so far is closed before the exception propagates.
        """
        index_path = self._filepath + '-index'
        # Release handles left over from an earlier open() call.
        self.close()
        try:
            if (not os.path.exists(index_path)
                    or os.path.getmtime(self._filepath)
                    > os.path.getmtime(index_path)):
                make_index(self._filepath)
            if self._filepath.endswith('.bz2'):
                self._mainfile = bz2.BZ2File(self._filepath, 'r')
            else:
                self._mainfile = open(self._filepath, 'r')
            self._indexfile = open(index_path, 'r')
            self._index = dict()
            self._indexlist = list()
            for line in self._indexfile:
                # rstrip keeps the last field intact when the final line
                # has no trailing newline.
                elements = line.rstrip('\n').split('\t')
                self._index[elements[0]] = int(elements[1])
                self._indexlist.append(elements[0])
        except BaseException:
            # Clean up first, then re-raise with the original traceback.
            self.close()
            raise

    def close(self):
        """Close the data and index files if they are open."""
        mainfile = getattr(self, '_mainfile', None)
        if mainfile:
            mainfile.close()
        self._mainfile = None
        indexfile = getattr(self, '_indexfile', None)
        if indexfile:
            indexfile.close()
        self._indexfile = None

    def read(self, key):
        """Yield the consecutive raw lines whose first column equals `key`.

        The files are opened on demand.  Nothing is yielded when the key
        is not in the index.

        Arguments:
        - `key`: key to look up; compared as ``str(key)``.
        """
        if self._mainfile is None:
            self.open()
        key = str(key)
        if key not in self._index:
            return
        self._mainfile.seek(self._index[key])
        for line in self._mainfile:
            # Stop at the first line that belongs to another key.
            if line.rstrip('\n').split('\t')[0] != key:
                return
            yield line

    def splited_read(self, key):
        """Yield the lines for `key` split on tabs with typed fields.

        Every field is converted with ``suggest_type_from_str``.  The
        files are opened on demand.  Nothing is yielded when the key is
        not in the index.

        Arguments:
        - `key`: key to look up; compared as ``str(key)``.
        """
        if self._mainfile is None:
            self.open()
        key = str(key)
        if key not in self._index:
            return
        offset = self._index[key]
        self._mainfile.seek(offset)
        # Record the offset; the original code called the integer
        # ``_pointer`` here, which raised TypeError.
        self._pointer = offset
        for line in self._mainfile:
            elements = line.rstrip('\n').split('\t')
            if elements[0] != key:
                return
            yield [suggest_type_from_str(x) for x in elements]

    def indices(self):
        """Return the list of keys in the order they appear in the index.

        The files are opened on demand.
        """
        if self._mainfile is None:
            self.open()
        return self._indexlist
def make_index(filepath):
    """Write ``<filepath>-index`` for a tab-delimited file grouped by key.

    For every run of consecutive lines sharing the same first column, one
    line ``key<TAB>offset`` is written to the index, where ``offset`` is
    the running total of ``len(line)`` over all preceding lines.  Progress
    and errors are reported on stderr.

    Arguments:
    - `filepath`: data file to index; opened with ``bz2.BZ2File`` when the
      name ends in ``.bz2``.

    Returns:
    ``True`` on success.  ``False`` when a key reappears after a different
    key; the index lines written up to that point are left in the file.
    """
    err = sys.stderr
    err.write('Making index for {0}'.format(filepath))
    if filepath.endswith('.bz2'):
        err.write(' BZipped file')
        mainfile = bz2.BZ2File(filepath, 'r')
    else:
        mainfile = open(filepath, 'r')
    # try/finally guarantees both files are closed on the early
    # duplicate-key return and on any exception.
    try:
        indexfile = open(filepath + '-index', 'w')
        try:
            err.write(' ...\n')
            seen = set()
            last = None
            offset = 0
            show_progress = err.isatty()
            for line in mainfile:
                # rstrip keeps the key intact when the final line has no
                # trailing newline.
                first_element = line.rstrip('\n').split('\t')[0]
                if first_element != last:
                    if first_element in seen:
                        err.write('Error : duplicated keys: {0}\n'.format(
                            first_element))
                        return False
                    indexfile.write('{0}\t{1}\n'.format(first_element, offset))
                    last = first_element
                    seen.add(first_element)
                    if show_progress:
                        err.write('{0}\r'.format(len(seen)))
                # NOTE(review): offsets are counted in len(line) units;
                # confirm this matches what seek() expects when the data
                # holds multi-byte characters or translated newlines.
                offset += len(line)
            err.write('Done \n')
            return True
        finally:
            indexfile.close()
    finally:
        mainfile.close()
def _main():
    """Command-line entry point.

    Without ``-t/--test`` an index is built for every input file, printing
    each file name first.  With ``-t KEY`` the lines stored under ``KEY``
    in the first input file are printed instead and no other file is
    processed.
    """
    parser = argparse.ArgumentParser(description='Make index')
    parser.add_argument('-t', '--test', help='Read test', default=None)
    parser.add_argument('input', nargs='+', help='files')
    options = parser.parse_args()

    if options.test:
        # The context manager closes the data and index files even when
        # reading fails part-way through.
        with IndexedFile(options.input[0]) as indexed_file:
            for line in indexed_file.read(options.test):
                # rstrip instead of line[:-1]: a final line without a
                # newline must not lose its last character.
                sys.stdout.write(line.rstrip('\n') + '\n')
        return

    for one in options.input:
        sys.stdout.write('{0}\n'.format(one))
        make_index(one)


if __name__ == '__main__':
    _main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment