Last active
February 24, 2020 18:00
-
-
Save andreberg/6032619 to your computer and use it in GitHub Desktop.
[PyLong from ByteArray] Methods for reading and converting PyLong objects as defined in longobject.c. Made this to help me understand how Python's marshal interface deals with these objects when it commits the bytes making up the PyLong object to a binary pyc file. #cython #python #python3 #marshall #cpython #synalyzeitpro
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/bin/python3.3 | |
'''
pylong_from_bytearray -- methods for reading and converting PyLong objects.

Ported from Python's C source code, specifically from marshal.c and
longobject.c.

Serialization of PyLong objects to marshal's bytecode format has the following
layout:

    +--+--+--+--+--+--+--+--+--+
    |6C|XX|XX|XX|XX|NN| ... |NN|
    +--+--+--+--+--+--+--+--+--+
     \_/ \_________/ \_________/
      l     size      digit arr.

The size determines the length of the variable digit array portion.
It is exactly size * 2 bytes long.

:author:    | André Berg
            |
:copyright: | 2013 Berg Media. All rights reserved.
            |
:license:   | Licensed under the Apache License, Version 2.0 (the "License");
            | you may not use this file except in compliance with the License.
            | You may obtain a copy of the License at
            |
            | http://www.apache.org/licenses/LICENSE-2.0
            |
            | Unless required by applicable law or agreed to in writing, software
            | distributed under the License is distributed on an **"AS IS"** **BASIS**,
            | **WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND**, either express or implied.
            | See the License for the specific language governing permissions and
            | limitations under the License.
            |
:contact:   | [email protected]
'''
import sys | |
import marshal | |
# Find out as much as possible for current system limits | |
def sysbits():
    """Return the word size of the running interpreter as 64 or 32.

    The answer is derived from ``sys.maxsize``: a value above ``2**32`` is
    reported as 64, anything else as 32.
    """
    return 64 if sys.maxsize > 2**32 else 32
def is_64bit():
    """Return True when ``sysbits()`` reports more than 32 bits."""
    return sysbits() > 32
def is_32bit():
    """Return True when ``sysbits()`` reports 32 bits or fewer."""
    return sysbits() <= 32
# Byte width used for size_t / long in this port: 8 on a 64-bit interpreter,
# 4 otherwise (same sys.maxsize threshold that sysbits() applies).
size_t = 8 if sys.maxsize > 2**32 else 4

# Number of value bits stored in one internal PyLong digit.  Ask the
# interpreter when it exposes sys.int_info, otherwise fall back on the
# word size.
if hasattr(sys, 'int_info'):
    plshift = sys.int_info.bits_per_digit
else:
    plshift = 30 if size_t > 4 else 15

SIZEOF_LONG = size_t

PY_LLONG_MAX = sys.maxsize            # 9223372036854775807 on a 64bit system
# The minimum lies one *below* -PY_LLONG_MAX (two's complement), i.e.
# -9223372036854775808 on a 64bit system.
PY_LLONG_MIN = -sys.maxsize - 1
PY_ABS_LLONG_MIN = -PY_LLONG_MIN      # magnitude of PY_LLONG_MIN
PY_SSIZE_T_MAX = PY_LLONG_MAX

# Internal digit geometry.
PyLong_SHIFT = plshift
PyLong_BASE = 1 << PyLong_SHIFT
PyLong_MASK = PyLong_BASE - 1

# Marshal always serializes longs as 15-bit digits.
PyLong_MARSHAL_SHIFT = 15
PyLong_MARSHAL_BASE = 1 << PyLong_MARSHAL_SHIFT
PyLong_MARSHAL_MASK = PyLong_MARSHAL_BASE - 1

# How many marshal digits make up one internal digit.  Integer division
# keeps this an exact int instead of a rounded float quotient.
PyLong_MARSHAL_RATIO = PyLong_SHIFT // PyLong_MARSHAL_SHIFT
def array(n, default=0):
    """Return a new list holding ``n`` references to ``default``.

    A non-positive ``n`` yields an empty list.
    """
    return [default] * n
def bytesarray(s):
    """Return ``s`` as a sequence of integer byte values.

    If the first element of ``s`` is already an ``int`` (e.g. a ``bytes``
    object or a list of ints) ``s`` itself is returned unchanged.  Otherwise
    every element is passed through ``ord()`` and a new list is returned.

    An empty ``s`` yields an empty list instead of failing on ``s[0]``.
    """
    if not s:
        return []
    if isinstance(s[0], int):
        return s
    return [ord(ch) for ch in s]
def hexstring(b):
    """Format a sequence of byte values as a single ``0x``-prefixed hex string.

    The first byte is rendered with ``%#x`` (prefix, no padding); every
    following byte is rendered as exactly two hex digits so that distinct
    inputs cannot collapse into the same string (``[1, 2]`` is ``"0x102"``,
    not ``"0x12"``).  An empty sequence gives ``"0x0"``.
    """
    if len(b) == 0:
        return "0x0"
    return "%#x" % b[0] + "".join("%02x" % c for c in b[1:])
def long_normalize(v):
    """Return a copy of ``v`` without most-significant zero digits.

    ``v.ob_digit`` stores the least significant digit first, so only zeros
    at the *end* of the list are redundant.  Zero digits in the middle carry
    positional weight and must stay, otherwise the value changes.

    :param v: object with ``ob_size`` (signed digit count) and ``ob_digit``.
    :returns: a new ``PyLong`` whose ``ob_size`` has the same sign as
        ``v.ob_size`` and whose magnitude is the number of digits kept.
    """
    # Only the digits covered by ob_size belong to the number.
    digits = list(v.ob_digit[:abs(v.ob_size)])
    while digits and digits[-1] == 0:
        digits.pop()
    new_pl = PyLong.from_list(digits)
    if v.ob_size < 0:
        new_pl.ob_size = -new_pl.ob_size
    return new_pl
def Py_SIZE(ob):
    """Return the ``ob_size`` attribute of ``ob``."""
    size = ob.ob_size
    return size
class PyLong(object):
    """Python-level model of a PyLong: a signed size plus a digit list.

    ``ob_size`` is the signed digit count and ``ob_digit`` the list of
    digits, least significant first.
    """

    def __init__(self, ob_size):
        """Create a PyLong with ``abs(ob_size)`` digits, all set to zero."""
        super(PyLong, self).__init__()
        self.ob_size = ob_size
        self.ob_digit = [0] * abs(ob_size)

    def __str__(self):
        """Return the default object string followed by size and digits."""
        return "{0}, size = {1!s}, digit = {2!r}".format(
            super(PyLong, self).__str__(), self.ob_size, self.ob_digit)

    @classmethod
    def from_list(cls, lst):
        """Build an instance whose digits are a copy of ``lst``.

        ``ob_size`` is set to ``len(lst)`` (non-negative).  The instance is
        created through ``cls`` so subclasses get their own type, and the
        digits are copied so later changes to ``lst`` do not leak into the
        new object.
        """
        new_pl = cls(len(lst))
        new_pl.ob_digit = list(lst)
        return new_pl
def PyLong_FromLong(ival):
    """Build a ``PyLong`` from the Python integer ``ival``.

    The magnitude is split into ``PyLong_SHIFT``-bit digits, least
    significant first.  ``ob_size`` is the digit count, negated when
    ``ival`` is negative; zero is represented by an empty digit list and
    ``ob_size == 0``.

    (This used to be a stub that returned ``-1`` for every input.)
    """
    magnitude = abs(ival)
    digits = []
    while magnitude:
        digits.append(magnitude & PyLong_MASK)
        magnitude >>= PyLong_SHIFT
    v = PyLong(len(digits))
    v.ob_digit = digits
    if ival < 0:
        v.ob_size = -v.ob_size
    return v
def PyLong_FromUnsignedLongLong(ival):
    """Build a ``PyLong`` from the non-negative integer ``ival``.

    The value is split into ``PyLong_SHIFT``-bit digits, least significant
    first, and ``ob_size`` is set to the number of digits (0 for zero).
    Every non-negative value is converted here directly, including those
    below ``PyLong_BASE``.

    A negative ``ival`` is outside the unsigned range; it is handed to
    ``PyLong_FromLong`` and whatever that returns is passed through.
    """
    if ival < 0:
        return PyLong_FromLong(ival)

    digits = []
    while ival > 0:
        digits.append(ival & PyLong_MASK)
        ival >>= PyLong_SHIFT

    v = PyLong(len(digits))
    v.ob_digit = digits
    return v
def PyLong_AsLongLongAndOverflow(vv):
    """Convert the PyLong ``vv`` to an integer, reporting overflow.

    :param vv: object with ``ob_size`` (signed digit count) and ``ob_digit``
        (digits, least significant first).
    :returns: a tuple ``(res, overflow)``.  ``overflow`` is ``0`` when the
        value lies within ``PY_LLONG_MIN .. PY_LLONG_MAX`` and ``res`` is
        that value.  Otherwise ``res`` is ``-1`` and ``overflow`` is ``1``
        or ``-1`` depending on the sign of the value.
    """
    size = Py_SIZE(vv)

    # Fast paths for zero and single-digit values.
    if size == 0:
        return (0, 0)
    if size == 1:
        return (vv.ob_digit[0], 0)
    if size == -1:
        return (-vv.ob_digit[0], 0)

    sign = -1 if size < 0 else 1
    x = 0
    # Accumulate from the most significant digit down.
    for idx in range(abs(size) - 1, -1, -1):
        prev = x
        x = (x << PyLong_SHIFT) + vv.ob_digit[idx]
        # Shifting back must give the previous value.  Python ints never
        # wrap, so this only fails for a digit outside 0 .. PyLong_MASK;
        # treat that as overflow and stop right away.
        if (x >> PyLong_SHIFT) != prev:
            return (-1, sign)

    # No bits were lost; now check the signed range.
    if x <= PY_LLONG_MAX:
        return (x * sign, 0)
    if sign < 0 and x == PY_ABS_LLONG_MIN:
        return (PY_LLONG_MIN, 0)
    return (-1, sign)
def _PyLong_AsByteArray(v, little_endian=True, is_signed=True):
    """Serialize the PyLong ``v`` into a two's-complement ``bytearray``.

    The result is ``abs(v.ob_size) * 4`` bytes long when ``PyLong_SHIFT`` is
    30 and ``abs(v.ob_size) * 2`` bytes long otherwise.  Bytes above the
    significant part are filled with the sign byte (``0xff`` for negative
    values, ``0x00`` otherwise).

    :param v: object with ``ob_size`` (signed digit count) and ``ob_digit``
        (digits, least significant first); the top digit must be non-zero.
    :param little_endian: if true the least significant byte is stored
        first, otherwise last.
    :param is_signed: if false, a negative ``v`` is rejected.
    :returns: a ``bytearray`` with the encoded value.
    :raises OverflowError: if ``v`` is negative and ``is_signed`` is false,
        or if the output buffer is too small for the value.
    """
    assert v is not None

    size = Py_SIZE(v)
    ndigits = abs(size)
    do_twos_comp = size < 0          # negative values are stored 2's-comp
    if do_twos_comp and not is_signed:
        raise OverflowError("PyExc_OverflowError: can't convert negative int to unsigned")

    # Output size in bytes: 4 per 30-bit digit, 2 per digit otherwise.
    n = ndigits << (2 if PyLong_SHIFT == 30 else 1)
    p = array(n)

    # Bytes are produced least significant first; pick where they go.
    if little_endian:
        pos, step = 0, 1
    else:
        pos, step = n - 1, -1        # start on the LAST valid index

    # It's crucial that every digit except the most significant one
    # contributes exactly PyLong_SHIFT bits, so the long must be normalized.
    assert ndigits == 0 or v.ob_digit[ndigits - 1] != 0

    accum = 0                        # sliding bit register
    accumbits = 0                    # number of valid bits in accum
    j = 0                            # number of bytes written so far
    carry = 1 if do_twos_comp else 0

    for i in range(ndigits):
        thisdigit = v.ob_digit[i]
        if do_twos_comp:
            thisdigit = (thisdigit ^ PyLong_MASK) + carry
            carry = thisdigit >> PyLong_SHIFT
            thisdigit &= PyLong_MASK

        # Going LSB to MSB: this digit is more significant than what is
        # already in accum, so it goes in above the existing bits.
        accum |= thisdigit << accumbits

        if i == ndigits - 1:
            # The most significant digit is usually only partly used.
            # Count its significant bits only; sign bits need not be
            # stored here (the code below guarantees one is present).
            s = (thisdigit ^ PyLong_MASK) if do_twos_comp else thisdigit
            while s != 0:
                s >>= 1
                accumbits += 1
        else:
            accumbits += PyLong_SHIFT

        # Store as many whole bytes as possible.
        while accumbits >= 8:
            if j >= n:
                raise OverflowError("PyExc_OverflowError: int too big to convert")
            p[pos] = accum & 0xff
            pos += step
            j += 1
            accumbits -= 8
            accum >>= 8

    assert accumbits < 8
    assert carry == 0                # else do_twos_comp and *every* digit was 0

    if accumbits > 0:
        # Store the straggler bits.
        if j >= n:
            raise OverflowError("PyExc_OverflowError: int too big to convert")
        if do_twos_comp:
            # Fill the leading bits of the byte with sign bits (pretending
            # the long had an infinite supply of them).
            accum |= (~0) << accumbits
        p[pos] = accum & 0xff
        pos += step
        j += 1
    elif j == n and n > 0 and is_signed:
        # The main loop filled the buffer exactly, so nothing above made
        # sure a sign bit exists.  Inspect the last byte written.
        sign_bit_set = p[pos - step] >= 0x80
        if sign_bit_set != do_twos_comp:
            raise OverflowError("PyExc_OverflowError: int too big to convert")

    # Fill the remaining bytes with copies of the sign bit.
    signbyte = 0xff if do_twos_comp else 0
    for _ in range(j, n):
        p[pos] = signbyte
        pos += step

    return bytearray(p)
def _PyLong_FromByteArray(_bytes, little_endian=True, is_signed=True):
    """Build a ``PyLong`` from a sequence of byte values.

    :param _bytes: indexable sequence of ints in ``0..255`` (``bytes``,
        ``bytearray`` or a list).
    :param little_endian: if true ``_bytes[0]`` is the least significant
        byte, otherwise ``_bytes[-1]`` is.
    :param is_signed: if true the data is read as two's complement, so a
        most significant byte ``>= 0x80`` makes the result negative.
    :returns: a normalized ``PyLong`` (via ``long_normalize``); empty input
        gives a zero-sized ``PyLong``.
    :raises OverflowError: if the input has too many significant bytes.
    """
    n = len(_bytes)
    if n == 0:
        return PyLong(0)

    # lsb_index: where reading starts; incr: direction towards the MSB.
    if little_endian:
        lsb_index, msb_index, incr = 0, n - 1, 1
    else:
        lsb_index, msb_index, incr = n - 1, 0, -1

    negative = bool(is_signed) and _bytes[msb_index] >= 0x80

    # Compute numsignificantbytes by skipping insignificant leading bytes:
    # 0x00 for a non-negative number, 0xff for a negative one.
    insignificant = 0xff if negative else 0x00
    skipped = 0
    pos = msb_index
    while skipped < n and _bytes[pos] == insignificant:
        skipped += 1
        pos -= incr
    numsignificantbytes = n - skipped
    # For negative numbers one 0xff byte must be kept so the sign bit
    # survives (e.g. 0xff80 -> keep at least the 0x80 plus its sign).
    if negative and numsignificantbytes < n:
        numsignificantbytes += 1

    if numsignificantbytes > (PY_SSIZE_T_MAX - PyLong_SHIFT) // 8:
        raise OverflowError("PyExc_OverflowError: byte array too long to convert to int")

    # Digits needed to hold all significant bits, rounded up.
    ndigits = (numsignificantbytes * 8 + PyLong_SHIFT - 1) // PyLong_SHIFT
    v = PyLong(ndigits)

    # Copy the bits over.  The tricky parts are computing 2's-comp on the
    # fly for negative numbers and the mismatch between 8-bit bytes and
    # PyLong_SHIFT-bit digits.
    carry = 1                        # for the 2's-comp calculation
    accum = 0                        # sliding bit register
    accumbits = 0                    # number of valid bits in accum
    idigit = 0
    pos = lsb_index
    for _ in range(numsignificantbytes):
        thisbyte = _bytes[pos]
        pos += incr
        if negative:
            thisbyte = (0xff ^ thisbyte) + carry
            carry = thisbyte >> 8
            thisbyte &= 0xff
        # Going LSB to MSB: this byte sits above the bits already in accum.
        accum |= thisbyte << accumbits
        accumbits += 8
        if accumbits >= PyLong_SHIFT:
            # There's enough to fill a digit.
            assert idigit < ndigits
            v.ob_digit[idigit] = accum & PyLong_MASK
            idigit += 1
            accum >>= PyLong_SHIFT
            accumbits -= PyLong_SHIFT
            assert accumbits < PyLong_SHIFT

    assert accumbits < PyLong_SHIFT
    if accumbits > 0:
        assert idigit < ndigits
        v.ob_digit[idigit] = accum
        idigit += 1

    v.ob_size = -idigit if negative else idigit
    return long_normalize(v)
def w_long(x):
    """Split ``x`` into a list of byte values, least significant first.

    Four bytes are produced when ``SIZEOF_LONG`` is 4 or less, eight
    otherwise.  (The eight-byte form used to stop after seven bytes and
    dropped bits 56..63.)
    """
    nbytes = 8 if SIZEOF_LONG > 4 else 4
    return [(x >> shift) & 0xff for shift in range(0, 8 * nbytes, 8)]
def r_short(b):
    """Read a signed 16-bit little-endian value from ``b[0]`` and ``b[1]``."""
    low, high = b[0], b[1]
    value = low | (high << 8)
    # Sign extension: when bit 15 is set, set every bit above it as well.
    return value | -(value & 0x8000)
def r_long(b):
    """Read a signed 32-bit little-endian value from ``b[0]`` .. ``b[3]``.

    Sign extension is applied unconditionally.  Python integers never wrap
    at 32 bits, so without it a value with bit 31 set would come back as a
    large positive number on builds where ``SIZEOF_LONG`` is 4.
    """
    x = b[0] | (b[1] << 8) | (b[2] << 16) | (b[3] << 24)
    # When bit 31 is set, set every bit above it as well.
    x |= -(x & 0x80000000)
    return x
def r_PyLong(p):
    """Decode a marshalled long (type byte ``0x6C``) into a ``PyLong``.

    Layout read from ``p``: one type byte, a signed 32-bit little-endian
    count ``n`` of 15-bit marshal digits (negative for a negative number),
    then ``abs(n)`` little-endian 16-bit marshal digits.
    ``PyLong_MARSHAL_RATIO`` marshal digits are packed into each internal
    digit; the top digit may take fewer.

    :param p: the serialized data (``bytes``, a list of ints, or a string
        whose characters are converted with ``ord()``).
    :returns: a ``PyLong``; a count of zero gives a zero-sized ``PyLong``.
    :raises ValueError: if the type byte is not ``0x6C``, a marshal digit is
        out of range, or the topmost marshal digit is zero.
    """
    _bytes = bytesarray(p)
    if _bytes[0] != 0x6C:
        raise ValueError("PyExc_ValueError: bad marshal data (not long data)")

    n = r_long(_bytes[1:5])
    if n == 0:
        return PyLong(0)

    # Integer arithmetic on purpose: rounding a float quotient gives the
    # wrong digit count whenever the quotient ends in .5.
    nshorts = abs(n)
    size = 1 + (nshorts - 1) // PyLong_MARSHAL_RATIO
    shorts_in_top_digit = 1 + (nshorts - 1) % PyLong_MARSHAL_RATIO

    ob = PyLong(size)
    ob.ob_size = size if n > 0 else -size

    pos = 5                          # first marshal digit follows the count
    for i in range(size):
        is_top = (i == size - 1)
        shorts_here = shorts_in_top_digit if is_top else PyLong_MARSHAL_RATIO
        d = 0
        for j in range(shorts_here):
            md = r_short(_bytes[pos:pos + 2])
            pos += 2
            # r_short sign-extends, so a set bit 15 shows up as md < 0.
            if md < 0 or md > PyLong_MARSHAL_BASE:
                raise ValueError("PyExc_ValueError: bad marshal data (digit out of range in long)")
            # The topmost marshal digit must be nonzero, else the resulting
            # PyLong would not be normalized.
            if is_top and md == 0 and j == shorts_here - 1:
                raise ValueError("PyExc_ValueError: bad marshal data (unnormalized long data)")
            d += md << (j * PyLong_MARSHAL_SHIFT)
        ob.ob_digit[i] = d
    return ob
if __name__ == '__main__':
    # Demo / smoke test for the functions in this module.
    #
    # A long object (PyLong) has a size and a digit array.
    # The size determines the length of the digit array.
    # Some examples of the value portion (a.k.a. the digit
    # array as packed bytes) of a sequence of serialized
    # bytes for a long object.
    # Notice how len(bytes) is equal to 2 * the size.

    # unpacked long = 9999999999999, packed size = 4
    b1_le = b"\x00\x1C\x35\xFA\x49\x8C\x7F\xFF"
    b1_be = b"\xFF\x7F\x8C\x49\xFA\x35\x1C\x00"

    # marshal data for long = 1000000000000, packed size = 3
    b2_le = b"\x6c\x03\x00\x00\x00\x00\x10\x4A\x29\xA3\x03"
    b2_be = b"\x03\xA3\x29\x4A\x10\x00\x00\x00\x00\x03\x6c"

    which = b2_le
    le = (sys.byteorder == 'little')
    sgn = True

    # Shared layout of the "value + overflow flag" report lines.
    overflow_line = "PyLong_AsLongLongAndOverflow({0}) = {1} (overflow: {2})"

    # Let's try and do what marshal does when it reads a
    # serialized PyLong object from a pyc file buffer.
    initial_value = -2000000000000
    serialized_bytes = marshal.dumps(initial_value)
    pl = r_PyLong(serialized_bytes)
    print("serialized_bytes = {0!s}".format(serialized_bytes))
    print("pl = r_PyLong(serialized_bytes) = {0!s}".format(pl))

    ba = _PyLong_AsByteArray(pl, little_endian=le, is_signed=sgn)
    print("ba = _PyLong_AsByteArray(pl) = {0!s}".format(ba))

    pl2 = _PyLong_FromByteArray(ba, little_endian=le, is_signed=sgn)
    print("pl2 = _PyLong_FromByteArray(ba) = {0!s}".format(pl2))

    ll, of = PyLong_AsLongLongAndOverflow(pl)
    print(overflow_line.format("pl", ll, str(bool(of))))
    assert ll == initial_value

    ll, of = PyLong_AsLongLongAndOverflow(pl2)
    print(overflow_line.format("pl2", ll, str(bool(of))))
    assert ll == initial_value

    # Now let's try a roundtrip conversion between longs
    # and PyLongs as does longobject.c.
    pl = PyLong_FromUnsignedLongLong(1000000000000)
    print("pl = PyLong_FromUnsignedLongLong(1000000000000) = {0!s}".format(pl))

    ba = _PyLong_AsByteArray(pl, little_endian=le, is_signed=sgn)
    print("ba = _PyLong_AsByteArray(pl) = {0!s}".format(ba))

    pl = _PyLong_FromByteArray(ba, little_endian=le, is_signed=sgn)
    print("_PyLong_FromByteArray(ba) = {0!s}".format(pl))
    ll, of = PyLong_AsLongLongAndOverflow(pl)
    print(overflow_line.format("pl", ll, str(bool(of))))

    # Feed the raw sample buffer through both readers.
    pl = _PyLong_FromByteArray(which)
    print("pl = _PyLong_FromByteArray(which) = {0!s}".format(pl))
    ll, of = PyLong_AsLongLongAndOverflow(pl)
    print(overflow_line.format("pl", ll, str(bool(of))))

    pl = r_PyLong(which)
    print("pl = r_PyLong(which) = {0!s}".format(pl))
    ll, of = PyLong_AsLongLongAndOverflow(pl)
    print(overflow_line.format("pl", ll, str(bool(of))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment