Last active
July 13, 2016 22:26
-
-
Save Higgs1/c0066abd1cc2f2f5435d to your computer and use it in GitHub Desktop.
My current Python Data Stream Reader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import io
import math
import struct
from functools import partialmethod
def _boc(end):
    """Return the ``struct`` byte-order prefix for a byte-order name.

    ``'little'`` maps to ``'<'`` and ``'big'`` maps to ``'>'``. Any other
    value (including ``None``) maps to ``'='``.
    """
    # Compare by value: ``is`` tests object identity and only happens to work
    # for interned string literals.
    if end == 'little':
        return '<'
    if end == 'big':
        return '>'
    return '='
def _rbitshift(val, bits):
    """Split *val* at bit position *bits*.

    Returns a ``(high, low)`` tuple: *val* shifted right by *bits*, and the
    low *bits* bits of *val* that the shift dropped.
    """
    high = val >> bits
    low_mask = 2**bits - 1
    low = val & low_mask
    return high, low
__all__ = ['DataReader'] | |
# TODO: make more generalized. Signing method should be | |
# completely independent of number format. | |
class DataReader(io.BufferedReader): | |
def __init__(self, raw, byteorder = None): | |
super().__init__(raw) | |
self.byteorder = byteorder or 'little' | |
@classmethod | |
def open(cls, file, byteorder = None, *args, **kwargs): | |
return cls(open(file, mode = 'rb', *args, **kwargs), byteorder) | |
@classmethod | |
def from_bytes(cls, bytes, byteorder = None): | |
if isinstance(bytes, str): | |
bytes = bytes.encode('iso-8859-1') | |
return cls(io.BytesIO(bytes), byteorder) | |
def read(self, n = -1): | |
bytes = super().read(n) | |
if n and not bytes: | |
raise StopIteration | |
return bytes | |
def unpack(self, fmt, byteorder = None): | |
fmt = _boc(byteorder or self.byteorder) + fmt | |
ret = struct.unpack(fmt, self.read(struct.calcsize(fmt))) | |
return ret[0] if len(ret) is 1 else ret | |
# Fixed length IEEE 754 number decoding methods. | |
# TODO: move logic to a separate IEEE 754 class. | |
# TODO: fully test if works. | |
def read_ieee754(self, size = 4, expbits = None, byteorder = None, *, | |
signed = True, expbias = None): | |
if not expbits: | |
expbits = max(0, round(4 * math.log(size, 2)) - 1) | |
if not expbias: | |
expbias = 2**(expbits - 1) - 1 | |
fracbits = size * 8 - expbits - signed | |
raw, frac = _rbitshift(self.read_uint(size, byteorder), fracbits) | |
raw, exp = _rbitshift(raw, expbits) | |
frac *= 2**-fracbits | |
if exp == 0: # Denormal numbers | |
val = frac * 2**(1 - expbias) | |
elif exp + 1 == 2**expbits: # Infinity / NaN | |
val = float('nan' if frac else 'inf') | |
else: # Normailized value | |
val = (1 + frac) * 2**(exp - expbias) | |
return -val if signed & raw else val | |
read_half = partialmethod(read_ieee754, 2, 5) | |
read_float = partialmethod(read_ieee754, 4, 8) | |
read_double = partialmethod(read_ieee754, 8) | |
# Fixed length base 256 number decoding methods. | |
def read_uint(self, size = 4, byteorder = None, *, signed = False): | |
return int.from_bytes(self.read(size), signed = signed, | |
byteorder = byteorder or self.byteorder) | |
read_ubyte = partialmethod(read_uint, 1) | |
read_ushort = partialmethod(read_uint, 2) | |
read_ulong = partialmethod(read_uint, 8) | |
read_int = partialmethod(read_uint, signed = True) | |
read_byte = partialmethod(read_int, 1) | |
read_short = partialmethod(read_int, 2) | |
read_long = partialmethod(read_int, 8) | |
# String decoding methods. | |
# TODO: generalize into 'read until str' method? | |
def read_cstr(self, byteorder = None): | |
buf = self.read(1) | |
while buf[-1] != 0: | |
buf += self.read(1) | |
return buf[:-1] | |
def read_pstr(self, size = 2, byteorder = None): | |
return self.read(self.read_uint(size, byteorder)) | |
# Variable length base 128 number decoding methods. | |
# TODO: implement maxsize? | |
# Maxsize = if a number reaches maxsize length, | |
# then the last byte is fully read- no need to check high bit. | |
# TODO: implement minsize? | |
# Minsize = like maxsize, the first minsize-1 bytes are fully read. | |
def read_uintvar(self, byteorder = None, *, signed = False): | |
byteorder = byteorder or self.byteorder | |
byte = self.read_ubyte() | |
if signed: | |
val = byte & 63 | |
signed = byte & 64 | |
else: | |
val = byte & 127 | |
if byteorder is 'little': | |
pos = 1 | |
while byte & 128: | |
byte = self.read_ubyte() | |
val += (byte & 127) << (pos * 7) | |
pos += 1 | |
elif byteorder is 'big': | |
while byte & 128: | |
byte = self.read_ubyte() | |
val = (val << 7) + (byte & 127) | |
return val * -1 if signed else val | |
read_intvar = partialmethod(read_uintvar, signed = True) | |
read_leb128 = partialmethod(read_intvar, 'little') | |
read_uleb128 = partialmethod(read_uintvar, 'little') | |
read_uvlq = partialmethod(read_uintvar, 'big') | |
read_vlq = partialmethod(read_uvlq, signed = True) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A fairly general purpose data reader for python. It's what I currently use for various things. I plan on making it more generalized eventually!
See https://docs.google.com/document/d/1qRY02Hoj1xAS-12rW5FTAh0UMRMHf-v3QYlvRsBI5ig for my current thoughts.