Skip to content

Instantly share code, notes, and snippets.

@parity3
Last active November 1, 2018 04:20
Show Gist options
  • Save parity3/be5b2ab1ba01dcab628bd16d286811ee to your computer and use it in GitHub Desktop.
Save parity3/be5b2ab1ba01dcab628bd16d286811ee to your computer and use it in GitHub Desktop.
import cStringIO
import functools
import gzip
import io
import itertools
import os
import sys
import zlib
class savedata:
    """Read-through wrapper that remembers every byte it hands out.

    Used while probing a stream for a gzip header: the stream may turn out
    not to be gzip at all, and because the underlying object is never
    seeked backwards, whatever was consumed during the probe has to be kept
    so the caller can still get at it.

    The record lives in ``saved_data``, an ``io.BytesIO`` (available on both
    Python 2.6+ and Python 3, unlike ``cStringIO``); call
    ``saved_data.getvalue()`` to obtain the consumed bytes.
    """

    def __init__(self, fileobj):
        """Wrap *fileobj*, any object with a ``read(size)`` method returning bytes."""
        self.fileobj = fileobj
        self.saved_data = io.BytesIO()

    def read(self, arg=-1):
        """Read from the wrapped stream, record the result and return it.

        *arg* is passed straight to ``fileobj.read``; the default of ``-1``
        follows the usual file convention of "read everything".
        """
        data = self.fileobj.read(arg)
        self.saved_data.write(data)
        return data
class gzip_stream:
    """Forward-only gzip reader that never seeks the underlying stream.

    Does what ``gzip.GzipFile`` does for reading, but only ever calls
    ``fileobj.read(CHUNK_SIZE)`` on the source, so it also works on pipes,
    sockets and other non-seekable objects. Line iteration (``__iter__``)
    and ``read`` calls may be freely mixed.

    Concatenated gzip members (optionally separated by zero padding) are
    decoded back to back. The CRC32/size trailer of each member is skipped,
    not verified.

    All literals are byte strings and the buffer is an ``io.BytesIO``, so the
    class behaves the same on Python 2 (where ``bytes is str``) and Python 3.

    Attributes:
        unused_data: ``io.BytesIO`` holding decompressed bytes that have been
            produced but not yet handed to the caller. Everything from its
            current position onwards is undelivered data.
        fileobj: the wrapped compressed stream.
        fileobj_chunks: lazily created iterator over raw compressed chunks.
        fileobj_unc_chunks: lazily created iterator over decompressed chunks.
    """

    CHUNK_SIZE = 65536

    # RFC 1952 header constants.
    _GZIP_MAGIC = b'\x1f\x8b'
    _DEFLATE = 8
    _FHCRC = 2
    _FEXTRA = 4
    _FNAME = 8
    _FCOMMENT = 16
    _TRAILER_SIZE = 8  # CRC32 + ISIZE

    def __init__(self, fileobj, parse_gzip_header=True):
        """Wrap *fileobj*.

        Args:
            fileobj: object with a ``read(size)`` method returning bytes.
            parse_gzip_header: when true (default) the gzip header is read
                and validated immediately. Pass ``False`` when the stream is
                already positioned on the raw deflate data.

        Raises:
            IOError: the stream does not start with a valid gzip header; the
                exception carries the consumed bytes in ``.savedata``.
        """
        self.unused_data = io.BytesIO()
        if parse_gzip_header:
            self.parse_initial_fileobj(fileobj)
        self.fileobj = fileobj

    def __getattr__(self, item):
        # Only reached when normal lookup fails, i.e. the first time one of
        # the lazy iterators is requested. Storing the result on the instance
        # means later lookups bypass this hook, and lets ``seek`` detect
        # "reading has started" by looking in ``__dict__``.
        if item == 'fileobj_chunks':
            read_chunk = functools.partial(self.fileobj.read, self.CHUNK_SIZE)
            fileobj_chunks = self.fileobj_chunks = iter(read_chunk, b'')
            return fileobj_chunks
        if item == 'fileobj_unc_chunks':
            fileobj_unc_chunks = self.fileobj_unc_chunks = self.iter_fileobj_unc_chunks()
            return fileobj_unc_chunks
        raise AttributeError(item)

    @staticmethod
    def _read_upto(read, size):
        """Return up to *size* bytes from ``read``; shorter only at EOF."""
        parts = []
        remaining = size
        while remaining > 0:
            piece = read(remaining)
            if not piece:
                break
            parts.append(piece)
            remaining -= len(piece)
        return b''.join(parts)

    @classmethod
    def _read_exact(cls, read, size):
        """Return exactly *size* bytes from ``read`` or raise ``EOFError``."""
        data = cls._read_upto(read, size)
        if len(data) != size:
            raise EOFError("Compressed file ended before the "
                           "end-of-stream marker was reached")
        return data

    @staticmethod
    def _skip_cstring(read):
        """Consume a zero-terminated header field (file name or comment)."""
        while True:
            byte = read(1)
            if not byte or byte == b'\x00':
                return

    @classmethod
    def _skip_gzip_header(cls, read):
        """Consume one RFC 1952 member header through ``read(size)``.

        Raises:
            IOError: bad magic number or a compression method other than
                deflate.
            EOFError: the header is truncated.
        """
        if cls._read_upto(read, 2) != cls._GZIP_MAGIC:
            raise IOError('Not a gzipped file')
        # CM, FLG, MTIME(4), XFL, OS -- bytearray yields ints on 2.x and 3.x.
        fixed = bytearray(cls._read_exact(read, 8))
        method, flags = fixed[0], fixed[1]
        if method != cls._DEFLATE:
            raise IOError('Unknown compression method')
        if flags & cls._FEXTRA:
            xlen = bytearray(cls._read_exact(read, 2))  # little-endian u16
            cls._read_exact(read, xlen[0] | (xlen[1] << 8))
        if flags & cls._FNAME:
            cls._skip_cstring(read)
        if flags & cls._FCOMMENT:
            cls._skip_cstring(read)
        if flags & cls._FHCRC:
            cls._read_exact(read, 2)

    def _open_next_member(self, leftover):
        """Step over a member trailer and, if present, the next header.

        Args:
            leftover: compressed bytes that followed the end of the deflate
                data of the member just finished (may be empty).

        Returns:
            ``None`` when the stream ends after the trailer (or inside it),
            otherwise the compressed bytes already fetched that belong to
            the next member's deflate data (possibly empty).

        Raises:
            IOError: non-zero data follows the trailer but is not a gzip
                header.
            EOFError: the next member's header is truncated.
        """
        source = itertools.chain((leftover,), self.fileobj_chunks)
        window = [io.BytesIO()]  # one-element list: writable from the closure

        def read(size):
            # File-like read over ``leftover`` followed by further chunks.
            if size == 0:
                return b''
            data = window[0].read(size)
            while not data:
                block = next(source, None)
                if block is None:
                    return b''
                window[0] = io.BytesIO(block)
                data = window[0].read(size)
            return data

        # CRC32 and ISIZE are deliberately not verified.
        if len(self._read_upto(read, self._TRAILER_SIZE)) < self._TRAILER_SIZE:
            return None
        # Members may be separated by zero padding; drop it block-wise.
        while True:
            block = read(-1)  # rest of the current window, or the next chunk
            if not block:
                return None
            block = block.lstrip(b'\x00')
            if block:
                break
        window[0] = io.BytesIO(block)  # push the non-padding bytes back
        self._skip_gzip_header(read)
        return read(-1)

    def iter_fileobj_unc_chunks(self):
        """Yield non-empty decompressed chunks until the stream is exhausted.

        A truncated final member is tolerated: whatever could be decoded is
        yielded and iteration simply stops.
        """
        chunks = self.fileobj_chunks
        pending = b''  # compressed bytes fetched but not yet inflated
        while True:
            decobj = zlib.decompressobj(-zlib.MAX_WBITS)  # raw deflate
            member_done = False
            while not member_done:
                if not pending:
                    pending = next(chunks, b'')
                    if not pending:
                        tail = decobj.flush()
                        if tail:
                            yield tail
                        return
                unc = decobj.decompress(pending)
                pending = decobj.unused_data
                # Python 3 exposes ``eof``; on Python 2 the end of a member
                # only shows once surplus input lands in ``unused_data``.
                member_done = bool(pending) or getattr(decobj, 'eof', False)
                if unc:
                    yield unc
            pending = self._open_next_member(pending)
            if pending is None:
                return

    def __iter__(self):
        """Yield newline-terminated lines; a final partial line comes last."""
        buf = self.unused_data
        chunks = self.fileobj_unc_chunks
        readline = buf.readline
        write = buf.write
        seek = buf.seek
        truncate = buf.truncate
        while True:
            while True:
                line = readline()
                if line.endswith(b'\n'):
                    yield line
                else:
                    # Buffer exhausted: keep only the partial line (if any).
                    seek(0)
                    truncate()
                    if line:
                        write(line)
                    break
            # invariant: whatever is in the buffer is a partial line
            chunk = next(chunks, b'')
            if not chunk:
                seek(0)
                chunk = buf.read()
                if chunk:
                    seek(0)
                    truncate()
                    yield chunk
                return
            write(chunk)
            seek(0)

    def read(self, arg=-1):
        """Return up to *arg* decompressed bytes (fewer only at EOF).

        A negative or ``None`` *arg* reads everything that is left.
        """
        buf = self.unused_data
        chunks = self.fileobj_unc_chunks
        start = buf.tell()
        if arg is None or arg < 0:
            parts = [buf.read()]
            parts.extend(chunks)
            buf.seek(0)
            buf.truncate()
            return b''.join(parts)
        target = start + arg
        buf.seek(0, os.SEEK_END)
        while buf.tell() < target:
            chunk = next(chunks, b'')
            if not chunk:
                break
            buf.write(chunk)
        buf.seek(start)
        data = buf.read(arg)
        rest = buf.read()
        # Compact: the buffer keeps only undelivered bytes, position at 0.
        buf.seek(0)
        buf.truncate()
        if rest:
            buf.write(rest)
            buf.seek(0)
        return data

    @classmethod
    def parse_initial_fileobj(cls, fileobj):
        """Consume and validate the gzip header at the start of *fileobj*.

        Raises:
            IOError: the stream is not gzip (bad magic / unknown method).
            EOFError: the header is truncated.

        In both cases the raised exception gets a ``savedata`` attribute
        holding every byte consumed from *fileobj*, so a caller can fall
        back to treating the stream as plain data without seeking.
        """
        consumed = []

        def recording_read(size):
            data = fileobj.read(size)
            consumed.append(data)
            return data

        try:
            cls._skip_gzip_header(recording_read)
        except (IOError, EOFError) as exc:
            exc.savedata = b''.join(consumed)
            raise  # bare raise keeps the original traceback on 2.x and 3.x

    def seek(self, seekto, whence=os.SEEK_SET):
        """Limited seek over the decompressed data.

        Seek support on uncompressed data is not pretty; instead of slowing
        down to support seek, only the cheap cases are covered, and the
        underlying stream is never disturbed:

        * forward by *seekto* bytes (implemented by reading and discarding);
        * backward, but only within the bytes still held in the buffer;
        * ``os.SEEK_SET`` only before any data has been read.

        Raises:
            NotImplementedError: unsupported seek (see above).
            EOFError: forward seek past the end of the data.
        """
        if whence == os.SEEK_SET:
            if 'fileobj_chunks' in self.__dict__:
                raise NotImplementedError("seek set not supported after reading")
        elif whence == os.SEEK_END:
            raise NotImplementedError("seek end never supported")
        if seekto < 0:
            offset = self.unused_data.tell() + seekto
            if offset < 0:
                raise NotImplementedError("seek backwards not supported")
            self.unused_data.seek(offset)
            return
        chunksize = self.CHUNK_SIZE
        remaining = seekto
        while remaining > 0:
            step = min(remaining, chunksize)
            if len(self.read(step)) != step:
                raise EOFError("seek beyond EOF")
            remaining -= step
@parity3
Copy link
Author

parity3 commented Nov 1, 2018

Tried to implement RawIOBase in revisions 2-3 but it doubled execution time for line iterators. So back to revision 1.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment