Last active
March 16, 2018 09:11
-
-
Save llllllllll/8cb443125c6e6db97e8b1af7291547a3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Example usage: | |
In [1]: !cat ayy.s | |
.text | |
.global _start | |
.type _start, @function | |
_start: | |
movq $10, %rdx | |
pushw $10 | |
pushw $28513 | |
pushw $28012 | |
pushw $8313 | |
pushw $31073 | |
movq %rsp, %rsi | |
movq $1, %rdi | |
movq $1, %rbx | |
movq $1, %rax | |
syscall | |
addq $10, %rsp | |
movq $0, %rax | |
retq | |
In [2]: !as -o ayy.o ayy.s | |
In [3]: with open('ayy.o', 'rb') as f: | |
...: f.seek(64) | |
...: payload = f.read() | |
...: | |
In [4]: from they_should_have_written_cpython_in_rust_lol import execute_binary_payload | |
In [5]: execute_binary_payload(payload) | |
ayy lmao | |
""" | |
import dis | |
import mmap | |
import sys | |
import types | |
# byte offsets for structures | |
if hasattr(sys, 'gettotalrefcount'): | |
# under PyDEBUG builds | |
_f_localsplus_offset = 392 | |
_ob_item_offset = 40 | |
_view_offset = 72 | |
_sizeof_pyobject = 32 | |
_mbuf_offset = 40 | |
_master_offset = 48 | |
_view_offset = 72 | |
_tp_repr_offset = 104 | |
else: | |
# under normal builds | |
_f_localsplus_offset = 376 | |
_ob_item_offset = 24 | |
_view_offset = 56 | |
_sizeof_pyobject = 16 | |
_mbuf_offset = 24 | |
_master_offset = 32 | |
_view_offset = 56 | |
_tp_repr_offset = 88 | |
# the maximum allowed offset from the address of the frame's locals | |
# to the target address measured in pointer counts. | |
_write_range = 2 ** 22 | |
def _make_write_bytecode(range_): | |
"""Create bytecode that emits one branch for all values in | |
[-range_, range_] like: | |
if this_branch(): | |
_n = value | |
where ``this_branch`` is a callable that checks if we should assign to | |
the given index. ``value`` is constant 0. ``this_branch`` is not stored | |
in the locals, instead we just store it on the stack. | |
To do negative values, we overflow oparg with man EXTENDED_ARG calls. | |
""" | |
try: | |
return _make_write_bytecode._cache[range_] | |
except KeyError: | |
pass | |
cached_file_name = f'_make_write_bytecode.{range_}' | |
try: | |
with open(cached_file_name, 'rb') as f: | |
_make_write_bytecode._cache[range_] = ret = f.read() | |
return ret | |
except FileNotFoundError: | |
pass | |
code = bytearray(( | |
dis.opmap['LOAD_CONST'], 0, | |
dis.opmap['YIELD_VALUE'], 0, | |
)) | |
def check(n): | |
*extension, index = (n % 2 ** 32).to_bytes(4, 'big') | |
extension = [*filter(bool, extension)] | |
code.extend(( | |
dis.opmap['DUP_TOP'], 0, | |
dis.opmap['CALL_FUNCTION'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['POP_JUMP_IF_FALSE'], 0, | |
dis.opmap['LOAD_CONST'], 0, | |
)) | |
ix = len(code) - 3 | |
for ext in extension: | |
code.extend((dis.opmap['EXTENDED_ARG'], ext)) | |
code.extend(( | |
dis.opmap['STORE_FAST'], index, | |
)) | |
jump_index = len(code) | |
*jump_extension, jump_index = jump_index.to_bytes(4, 'big') | |
code[ix - 6] = jump_extension[0] | |
code[ix - 4] = jump_extension[1] | |
code[ix - 2] = jump_extension[2] | |
code[ix] = jump_index | |
for n in range(range_): | |
check(n) | |
check(-n) | |
code.extend(( | |
dis.opmap['LOAD_CONST'], 0, | |
dis.opmap['RETURN_VALUE'], 0 | |
)) | |
ret = _make_write_bytecode._cache[range_] = bytes(code) | |
with open(cached_file_name, 'wb') as f: | |
f.write(ret) | |
return ret | |
_make_write_bytecode._cache = {} | |
def _make_write_codeobject(range_, value): | |
return types.CodeType( | |
0, | |
0, | |
0, | |
2, | |
99, | |
_make_write_bytecode(range_), | |
(value,), # make co_consts[0] ``value`` to replace None | |
(), | |
(), | |
'<string>', | |
'<write-target-helper>', | |
1, | |
b'', | |
) | |
def _write_target_address(target_address, value, *, verbose=False): | |
# collection of frames to prevent deallocation | |
frames = [] | |
while True: | |
# spray the heap with generators looking for a location where the | |
# frame's local variables are withing ``range_ * 8`` bytes of the | |
# target address | |
code = _make_write_codeobject(_write_range, value) | |
generator = types.FunctionType(code, {})() | |
locals_address = id(generator.gi_frame) + _f_localsplus_offset | |
offset_bytes = target_address - locals_address | |
offset_index = offset_bytes // 8 | |
if -_write_range <= offset_index <= _write_range: | |
# we found a frame that is close enough to the target address; | |
# clear the saved frames and continue | |
frames.clear() | |
if verbose: | |
print( | |
f'found write frame within range:' | |
f' offset={offset_index};' | |
f' locals=0x{locals_address:x}' | |
f' target=0x{target_address:x}', | |
) | |
break | |
else: | |
if verbose: | |
print( | |
f'write frame too far away:' | |
f' offset={offset_index};' | |
f' locals=0x{locals_address:x}' | |
f' target=0x{target_address:x}', | |
) | |
# this frame isn't close enough; save it in memory so we don't try | |
# here again | |
frames.append(generator.gi_frame) | |
# make sure we set None to ``value`` and prime the generator | |
assert next(generator) is value, 'yielded the wrong value' | |
def branch_selector(): | |
for n in range(_write_range): | |
if n == offset_index: | |
yield True | |
break | |
yield False | |
if -n == offset_index: | |
yield True | |
break | |
yield False | |
try: | |
# send the selector into the generator; this gets bound as | |
# ``this_branch`` | |
generator.send(branch_selector().__next__) | |
except StopIteration: | |
# the branch after ``branch_selector`` returns True will raise a | |
# ``StopIteration`` | |
pass | |
# the maximum allowed offset from the address of the frame's locals | |
# to the target address measured in pointer counts. | |
_load_range = 2 ** 22 | |
def _make_load_bytecode(range_): | |
"""Create bytecode that emits one branch for all values in | |
[-range_, range_] like: | |
if this_branch(): | |
if False: | |
_n = False # create a new local | |
yield _n | |
where ``this_branch`` is a callable that checks if we should assign to | |
the given index. ``this_branch`` is not stored | |
in the locals, instead we just store it on the stack. | |
To do negative values, we overflow oparg with man EXTENDED_ARG calls. | |
""" | |
try: | |
return _make_load_bytecode._cache[range_] | |
except KeyError: | |
pass | |
cached_file_name = f'_make_load_bytecode.{range_}' | |
try: | |
with open(cached_file_name, 'rb') as f: | |
_make_load_bytecode._cache[range_] = ret = f.read() | |
return ret | |
except FileNotFoundError: | |
pass | |
code = bytearray(( | |
dis.opmap['LOAD_CONST'], 0, | |
dis.opmap['YIELD_VALUE'], 0, | |
)) | |
def check(n): | |
*extension, index = (n % 2 ** 32).to_bytes(4, 'big') | |
extension = [*filter(bool, extension)] | |
code.extend(( | |
dis.opmap['DUP_TOP'], 0, | |
dis.opmap['CALL_FUNCTION'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['EXTENDED_ARG'], 0, | |
dis.opmap['POP_JUMP_IF_FALSE'], 0, | |
)) | |
ix = len(code) - 1 | |
for ext in extension: | |
code.extend((dis.opmap['EXTENDED_ARG'], ext)) | |
code.extend(( | |
dis.opmap['LOAD_FAST'], index, | |
dis.opmap['YIELD_VALUE'], 0, | |
)) | |
jump_index = len(code) | |
*jump_extension, jump_index = jump_index.to_bytes(4, 'big') | |
code[ix - 6] = jump_extension[0] | |
code[ix - 4] = jump_extension[1] | |
code[ix - 2] = jump_extension[2] | |
code[ix] = jump_index | |
for n in range(range_): | |
check(n) | |
check(-n) | |
code.extend(( | |
dis.opmap['LOAD_CONST'], 0, | |
dis.opmap['RETURN_VALUE'], 0 | |
)) | |
ret = _make_load_bytecode._cache[range_] = bytes(code) | |
with open(cached_file_name, 'wb') as f: | |
f.write(ret) | |
return ret | |
_make_load_bytecode._cache = {} | |
def _make_load_codeobject(range_): | |
return types.CodeType( | |
0, | |
0, | |
0, | |
2, | |
99, | |
_make_load_bytecode(range_), | |
(None,), | |
(), | |
(), | |
'<string>', | |
'<load-target-helper>', | |
1, | |
b'', | |
) | |
def _load_target_address(target_address, *, verbose=False): | |
# collection of frames to prevent deallocation | |
frames = [] | |
while True: | |
# spray the heap with generators looking for a location where the | |
# frame's local variables are withing ``range_ * 8`` bytes of the | |
# target address | |
code = _make_load_codeobject(_load_range) | |
generator = types.FunctionType(code, {})() | |
locals_address = id(generator.gi_frame) + _f_localsplus_offset | |
offset_bytes = target_address - locals_address | |
offset_index = offset_bytes // 8 | |
if -_load_range <= offset_index <= _load_range: | |
# we found a frame that is close enough to the target address; | |
# clear the saved frames and continue | |
frames.clear() | |
if verbose: | |
print( | |
f'found load frame within range:' | |
f' offset={offset_index};' | |
f' locals=0x{locals_address:x}' | |
f' target=0x{target_address:x}', | |
) | |
break | |
else: | |
if verbose: | |
print( | |
f'load frame too far away:' | |
f' offset={offset_index};' | |
f' locals=0x{locals_address:x}' | |
f' target=0x{target_address:x}', | |
) | |
# this frame isn't close enough; save it in memory so we don't try | |
# here again | |
frames.append(generator.gi_frame) | |
# make sure we set None to ``value`` and prime the generator | |
assert next(generator) is None, 'yielded the wrong value' | |
def branch_selector(): | |
for n in range(_write_range): | |
if n == offset_index: | |
yield True | |
break | |
yield False | |
if -n == offset_index: | |
yield True | |
break | |
yield False | |
# send the selector into the generator; this gets bound as | |
# ``this_branch`` | |
return generator.send(branch_selector().__next__) | |
def tuple_setitem(t, ix, value, *, verbose=False): | |
"""``setitem`` for tuples. | |
Parameters | |
---------- | |
t : tuple | |
The tuple to assign into. | |
ix : int | |
The index to assign to (without bounds checking). | |
value : any | |
The value to store at index ``ix``. | |
verbose : bool, optional | |
Print debugging information? | |
""" | |
# the target address is the id of the tuple + the offset of the first | |
# element + the index * the size of a pointer | |
target_address = id(t) + _ob_item_offset + ix * 8 | |
_write_target_address(target_address, value, verbose=verbose) | |
def access_memory(ob, size, *, verbose=False): | |
"""Access the underlying memory for an object as a mutable memory view. | |
Parameters | |
---------- | |
ob : object | |
The object to access | |
size : int | |
The number of bytes to access. | |
verbose : bool, optional | |
Print debugging information? | |
Returns | |
------- | |
data : memoryview | |
A view of ``size`` bytes starting at the address of ``ob``. | |
""" | |
underlying_memory = bytearray(size) | |
view = memoryview(underlying_memory) | |
managed_buffer = _load_target_address( | |
id(view) + _mbuf_offset, | |
verbose=verbose, | |
) | |
_write_target_address( | |
id(managed_buffer) + _master_offset, | |
ob, | |
verbose=verbose, | |
) | |
_write_target_address( | |
id(view) + _view_offset, | |
ob, | |
verbose=verbose, | |
) | |
return view | |
def execute_binary_payload(payload, *, verbose=False): | |
"""Execute an arbitrary compiled payload. | |
Parameters | |
---------- | |
payload : bytes | |
The compiled payload to execute. | |
verbose : bool, optional | |
Print debugging information? | |
""" | |
buffer = mmap.mmap(-1, len(payload), prot=mmap.PROT_EXEC | mmap.PROT_WRITE) | |
buffer[:] = payload | |
target_address = access_memory(buffer, 64, verbose=verbose)[ | |
_sizeof_pyobject:_sizeof_pyobject + 8 | |
] | |
class C: | |
pass | |
mem = access_memory(C, 124, verbose=verbose) | |
mem[_tp_repr_offset:_tp_repr_offset + 8] = target_address | |
try: | |
repr(C()) | |
except SystemError: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment