import functools, cStringIO, sys
import itertools, zlib, gzip, os


class savedata:
    # Convenience class used only while reading the gzip file header.
    # The input may turn out not to be a gzip file, so this wrapper records
    # everything read from it, sparing us a backwards seek on a possibly
    # non-seekable stream.
    def __init__(self, fileobj):
        self.fileobj = fileobj
        self.saved_data = cStringIO.StringIO()

    def read(self, arg):
        v = self.fileobj.read(arg)
        self.saved_data.write(v)
        return v
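
# A minimal sketch of how savedata is used (the file name is illustrative,
# not part of the gist): if header parsing fails partway through, the bytes
# already consumed remain available for replay via saved_data.
#
#   sd = savedata(open('maybe-gzipped.bin', 'rb'))
#   magic = sd.read(2)                        # consume what may be gzip magic
#   assert sd.saved_data.getvalue() == magic  # ...but keep a copy
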
class gzip_stream:
    # Does what gzip does, but without seeking while reading.
    # Supports mixing __iter__ with read() calls.
    # Supports non-file-handle streams and works more efficiently in general.
    CHUNK_SIZE = 65536

    def __init__(self, fileobj, parse_gzip_header=True):
        self.unused_data = cStringIO.StringIO()
        if parse_gzip_header:
            self.parse_initial_fileobj(fileobj)
        self.fileobj = fileobj

    def __getattr__(self, item):
        # lazily create and cache the chunk iterators on first access; once
        # assigned, normal attribute lookup bypasses __getattr__ entirely
        if item == 'fileobj_chunks':
            fileobj_chunks = self.fileobj_chunks = iter(functools.partial(self.fileobj.read, self.CHUNK_SIZE), '')
            return fileobj_chunks
        if item == 'fileobj_unc_chunks':
            fileobj_unc_chunks = self.fileobj_unc_chunks = self.iter_fileobj_unc_chunks()
            return fileobj_unc_chunks
        raise AttributeError(item)
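
    # The lazy-attribute pattern above, as a standalone sketch (names are
    # illustrative): __getattr__ only runs when normal lookup fails, so
    # assigning the instance attribute caches the result for later accesses.
    #
    #   class lazy_demo:
    #       def __getattr__(self, item):
    #           if item == 'expensive':
    #               value = self.expensive = compute()  # compute() is hypothetical
    #               return value
    #           raise AttributeError(item)
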
    def iter_fileobj_unc_chunks(self):
        # Yield uncompressed chunks from the raw deflate stream.
        decobj = zlib.decompressobj(-zlib.MAX_WBITS)
        decompress = decobj.decompress
        for chunk in self.fileobj_chunks:
            unused_data = decobj.unused_data
            if unused_data != '':
                # The deflate stream ended mid-chunk: what follows is the gzip
                # member footer, possibly zero padding, then further data.
                # Note that cStringIO.StringIO(initial) is read-only, so build
                # a writable buffer and copy both pieces in.
                sio = cStringIO.StringIO()
                seek = sio.seek
                sio.write(unused_data)
                sio.write(chunk)
                seek(8)  # skip the 8-byte gzip member footer (CRC32 + ISIZE)
                # skip zero padding byte by byte; c is the first non-zero byte, if any
                c = next(itertools.dropwhile('\x00'.__eq__, iter(functools.partial(sio.read, 1), '')), '')
                if c:
                    seek(-1, os.SEEK_CUR)
                    # start a fresh raw-deflate decompressor on the remainder
                    decobj = zlib.decompressobj(-zlib.MAX_WBITS)
                    decompress = decobj.decompress
                    unc = decompress(sio.read())
                    if unc:
                        yield unc
            else:
                unc = decompress(chunk)
                if unc:
                    yield unc
        unc = decobj.flush()
        if unc:
            yield unc
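
    # A small illustration (assumed data, not part of the original gist) of the
    # decobj.unused_data behavior the loop above relies on: a raw-deflate
    # decompressor stops at end-of-stream and exposes the trailing bytes.
    #
    #   co = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    #   raw = co.compress('hello') + co.flush()
    #   do = zlib.decompressobj(-zlib.MAX_WBITS)
    #   do.decompress(raw + 'TRAILING')   # -> 'hello'
    #   do.unused_data                    # -> 'TRAILING'
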
    def __iter__(self):
        # Yield complete lines, keeping any trailing partial line buffered in
        # unused_data between uncompressed chunks.
        s = self.unused_data
        write = s.write
        seek = s.seek
        truncate = s.truncate
        chunks = self.fileobj_unc_chunks
        readline = s.readline
        while True:
            while True:
                l = readline()
                if l.endswith('\n'):
                    yield l
                else:
                    truncate(0)
                    if l:
                        write(l)
                    break
            # invariant: whatever is in unused_data is a partial line
            chunk = next(chunks, '')
            if not chunk:
                # stream exhausted; yield the final unterminated line, if any
                seek(0)
                chunk = s.read()
                if chunk:
                    truncate(0)
                    yield chunk
                return
            write(chunk)
            seek(0)
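
    # Sketch of the intended line iteration (the file name and process() are
    # illustrative, not part of the gist):
    #
    #   for line in gzip_stream(open('input.gz', 'rb')):
    #       process(line)   # lines keep their trailing '\n', except possibly the last
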
    def read(self, arg):
        # Read up to arg uncompressed bytes (fewer only at EOF), using
        # unused_data as the buffer shared with __iter__.
        unused_data = self.unused_data
        tell = unused_data.tell
        write = unused_data.write
        chunks = self.fileobj_unc_chunks
        pos = tell()
        read_to = pos + arg
        seek = unused_data.seek
        read = unused_data.read
        seek(0, os.SEEK_END)
        while tell() < read_to:
            chunk = next(chunks, '')
            if chunk:
                write(chunk)
            else:
                break
        seek(pos)
        s = read(arg)
        leftovers = read()
        if leftovers:
            # compact the buffer so it holds only the not-yet-consumed bytes
            seek(0)
            write(leftovers)
            unused_data.truncate()
            seek(0)
        else:
            unused_data.truncate(0)
        return s
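
    # Sketch of mixing the two access styles on one stream (illustrative):
    #
    #   gs = gzip_stream(open('input.gz', 'rb'))
    #   header = gs.read(16)         # first 16 uncompressed bytes
    #   first_line = next(iter(gs))  # line iteration continues from byte 16
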
    @classmethod
    def parse_initial_fileobj(cls, fileobj):
        # Validate the gzip header by delegating to gzip.GzipFile's own
        # (private) header parser, reading through a savedata wrapper.
        sd = savedata(fileobj)
        gz = gzip.GzipFile(fileobj=sd)
        try:
            # noinspection PyProtectedMember
            gz._init_read()
            # noinspection PyProtectedMember
            gz._read_gzip_header()
        except IOError:
            # not a gzip file: keep the saved_data of our read monitor on the
            # exception, then re-raise with the original traceback
            et, ev, tb = sys.exc_info()
            ev.savedata = sd.saved_data.getvalue()
            raise et, ev, tb
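
    # Sketch of recovering from a non-gzip input (illustrative): the consumed
    # bytes survive on the exception, so a caller can fall back to reading the
    # stream as plain data without seeking.
    #
    #   try:
    #       gs = gzip_stream(fileobj)
    #   except IOError as e:
    #       plain = e.savedata + fileobj.read()
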
    def seek(self, seekto, whence=os.SEEK_SET):
        # Seek support on uncompressed data is not pretty; instead of slowing
        # everything down to support seek, only cover the faster cases. Also,
        # the point of this class is to never disrupt the underlying stream.
        if whence == os.SEEK_SET:
            if 'fileobj_chunks' in self.__dict__:
                raise NotImplementedError("seek set not supported after reading")
        elif whence == os.SEEK_END:
            raise NotImplementedError("seek end never supported")
        if seekto < 0:
            # a short backwards seek may stay within the buffered leftovers
            pos = self.unused_data.tell()
            offset = pos + seekto
            if offset < 0:
                raise NotImplementedError("seek backwards not supported")
            self.unused_data.seek(offset)
        else:
            # forward seek: discard whole chunks, then the remainder
            chunksize = self.CHUNK_SIZE
            for _ in xrange(chunksize - 1, seekto, chunksize):
                s = self.read(chunksize)
                size = len(s)
                if size != chunksize:
                    raise EOFError("seek beyond EOF")
            leftover = seekto % chunksize
            if leftover:
                s = self.read(leftover)
                size = len(s)
                if size != leftover:
                    raise EOFError("seek beyond EOF")
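

if __name__ == '__main__':
    # Minimal self-test sketch (assumed data, not part of the original gist):
    # round-trip a few lines through an in-memory gzip file and read them back
    # with a mix of read() and line iteration.
    buf = cStringIO.StringIO()
    gz = gzip.GzipFile(fileobj=buf, mode='wb')
    gz.write('alpha\nbeta\ngamma')
    gz.close()
    buf.seek(0)
    gs = gzip_stream(buf)
    assert gs.read(6) == 'alpha\n'
    assert list(gs) == ['beta\n', 'gamma']
    print 'ok'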
Tried implementing RawIOBase in revisions 2-3, but it doubled execution time for line iteration, so I went back to revision 1.