Created
December 18, 2024 00:32
-
-
Save xusheng6/1fc2a8a42a31ca587105b8d48d2c1131 to your computer and use it in GitHub Desktop.
Helper script to remove the obfuscation and clean up the code in the Serpentine challenge from Flare-On.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
@dataclass
class Instr:
    """One record of the recorded execution trace.

    The script distinguishes records by ``instrType`` and only ever tests
    for the values "Execute" and "Write".
    """

    # Raw first column of the trace record, kept as text.
    startTime: str
    # Record kind; the script compares it with "Execute" and "Write".
    instrType: str
    # Compared with the address of an earlier write to detect that the
    # freshly written bytes are being executed.
    ip: int
    # Address attached to the record: the destination of a write, and the
    # location used for patching when the record is an execution.
    addr: int
    # Used as the instruction length for executed records.
    size: int
    # For writes, the value that was stored; for executions, the value whose
    # low byte is checked against opcode bytes (0xe8, 0xe9, 0xc3, 0xf4).
    value: int
@dataclass
class PathInfo:
    """Description of one self-modifying stub found in the trace.

    Every field has a zero/False default so that an empty ``PathInfo()`` can
    be constructed as a placeholder; positional construction with all eight
    values works exactly as before.
    """

    # Address of the initial call (opcode 0xe8) that enters the stub.
    callAddr: int = 0
    # Address at which the written instruction is executed.
    instrAddr: int = 0
    # Value stored by the write, i.e. the real instruction bytes.
    bytesWritten: int = 0
    # Address at which execution continues once the stub is done.
    returnAddr: int = 0
    # True when the written instruction is a jmp (0xe9) followed by 0xf4.
    isJumpToHalt: bool = False
    # Number of trace records the caller may skip after this match.
    instrsToSkip: int = 0
    # Size recorded for the executed instruction.
    actualInstrSize: int = 0
    # Address of the 0xf4 record; 0 when isJumpToHalt is False.
    haltInstrAddr: int = 0
def assemble_jmp(target, addr):
    """Encode a 5-byte x86 near jump (opcode 0xe9) placed at *addr*.

    Args:
        target: Address the jump should land on.
        addr: Address at which the jump instruction itself will be written.

    Returns:
        ``bytes`` of length 5: 0xe9 followed by the little-endian 32-bit
        displacement.
    """
    # Imported here so the helper does not depend on a module-level import
    # that appears much later in the file.
    import struct

    # The displacement is relative to the end of the 5-byte instruction; the
    # mask turns a negative displacement into its two's-complement form so it
    # can be packed as an unsigned 32-bit value.
    displacement = (target - (addr + 5)) & 0xffffffff
    return b'\xe9' + struct.pack('<I', displacement)
# Load the recorded trace: one tab-separated record per line, with columns
# 3-6 written in hexadecimal.
trace = []
with open('serpentine03.run.shellcode.txt') as trace_file:
    # The context manager closes the file as soon as it has been read.
    trace_lines = trace_file.read().splitlines()
for line in trace_lines:
    if not line.strip():
        # Tolerate blank lines (e.g. a trailing newline at the end of file).
        continue
    start_time, instr_type, ip, addr, size, value = line.split('\t')[:6]
    trace.append(Instr(start_time, instr_type, int(ip, 16), int(addr, 16), int(size, 16), int(value, 16)))
def _trace_entry(index):
    """Return ``trace[index]``, or None when *index* lies outside the trace."""
    if 0 <= index < len(trace):
        return trace[index]
    return None


def _find_initial_call_addr(idx, max_look_ahead):
    """Return the addr of the closest call (0xe8) executed before *idx*, or None."""
    for i in range(1, max_look_ahead):
        entry = _trace_entry(idx - i)
        if entry is None:
            # Reached the start of the trace; do not wrap to negative indices.
            break
        if entry.instrType == "Execute" and entry.value & 0xff == 0xe8:
            return entry.addr
    return None


def find_matching_write(idx):
    """Decide whether the write at ``trace[idx]`` belongs to a patchable stub.

    The write qualifies when, within the next few records, the address it
    wrote to is executed. Two shapes are recognised:

    * the executed instruction is a jmp (0xe9) immediately followed by a 0xf4
      record ("jump to halt");
    * an ordinary instruction, which is followed by a second write to the same
      address and then a ``ret`` (0xc3).

    Args:
        idx: Index into the module-level ``trace`` of a "Write" record.

    Returns:
        ``(True, PathInfo)`` on success, ``(False, 0)`` otherwise. Every
        failure path returns a tuple, so callers can always unpack the result.
    """
    addr = trace[idx].addr
    max_look_ahead = 10

    # Look for the execution of the address that was just written. There is
    # deliberately no break: as in the original, the last match in the window
    # is the one that is used.
    call_instr_idx = None
    for i in range(1, max_look_ahead):
        entry = _trace_entry(idx + i)
        if entry is None:
            break
        if entry.instrType == "Execute" and entry.ip == addr:
            call_instr_idx = idx + i
    if call_instr_idx is None:
        # Not the write we are looking for
        return (False, 0)

    call_entry = trace[call_instr_idx]
    actual_instr_size = call_entry.size
    next_entry = _trace_entry(call_instr_idx + 1)

    # This is a jump to a halt instruction
    if call_entry.value & 0xff == 0xe9 and next_entry is not None and next_entry.value == 0xf4:
        after_halt = _trace_entry(call_instr_idx + 2)
        return_instr_idx = call_instr_idx + 3
        return_entry = _trace_entry(return_instr_idx)
        if after_halt is None or after_halt.instrType != 'Execute' or return_entry is None:
            # Without a return record there is nothing sensible to patch;
            # continuing would yield a non-positive instrsToSkip.
            print('failed2')
            return (False, 0)

        initial_call_addr = _find_initial_call_addr(idx, max_look_ahead)
        if initial_call_addr is None:
            print('failed3')
            return (False, 0)
        return (True, PathInfo(initial_call_addr, call_entry.addr, trace[idx].value, return_entry.addr, True, return_instr_idx - idx, actual_instr_size, next_entry.addr))

    # This is an ordinary instruction: find the second write to the same
    # address that follows its execution.
    second_write_instr_idx = None
    for i in range(1, max_look_ahead):
        entry = _trace_entry(call_instr_idx + i)
        if entry is None:
            break
        if entry.instrType == "Write" and entry.addr == addr:
            second_write_instr_idx = call_instr_idx + i
            break
    if second_write_instr_idx is None:
        print('failed4')
        return (False, 0)

    # looking for the returning 0xc3; the record after it gives the address
    # at which execution resumes.
    return_entry = None
    return_instr_idx = 0
    for i in range(1, max_look_ahead):
        entry = _trace_entry(second_write_instr_idx + i)
        if entry is None:
            break
        if entry.instrType == "Execute" and entry.value == 0xc3:
            return_instr_idx = second_write_instr_idx + i + 1
            return_entry = _trace_entry(return_instr_idx)
            break
    if return_entry is None:
        # No ret (or nothing after it) inside the window: previously this
        # produced returnAddr 0 and a non-positive instrsToSkip.
        print('failed: ret not found')
        return (False, 0)

    initial_call_addr = _find_initial_call_addr(idx, max_look_ahead)
    if initial_call_addr is None:
        print('failed5')
        return (False, 0)
    return (True, PathInfo(initial_call_addr, call_entry.addr, trace[idx].value, return_entry.addr, False, return_instr_idx - idx, actual_instr_size, 0))
# print(trace) | |
# Walk the trace and collect one PathInfo per recognised write.
idx = 0
patches = []
while idx < len(trace):
    if trace[idx].instrType != 'Write':
        idx += 1
        continue
    found, info = find_matching_write(idx)
    if found:
        patches.append(info)
        # Skip the records that belong to this stub, but always advance by at
        # least one record: a zero or negative skip would otherwise revisit
        # the same write forever.
        idx += max(1, info.instrsToSkip)
    else:
        idx += 1
import sys | |
# sys.exit(0) | |
import struct | |
from binaryninja import * | |
# Open the analysis database, apply every collected patch inside a single
# undo group, and save the result as a new database.
bv = BinaryView.load(r'shellcode.bin.bndb')
arch = bv.arch
print(hex(bv.start))

undo = bv.begin_undo_actions()
for patch in patches:
    # Both kinds of patch start the same way: replace the call with a jump
    # straight to the real instruction ...
    entry_jmp = assemble_jmp(patch.instrAddr, patch.callAddr)
    bv.write(patch.callAddr, entry_jmp)
    # ... and write the actual instruction bytes at its address.
    bv.write(patch.instrAddr, struct.pack('<I', patch.bytesWritten))

    if patch.isJumpToHalt:
        # Write NOP at the HALT instruction; the jump back goes right after it.
        bv.write(patch.haltInstrAddr, b'\x90')
        resume_at = patch.haltInstrAddr + 1
    else:
        # The jump back goes right after the real instruction.
        resume_at = patch.instrAddr + patch.actualInstrSize

    # Write another jmp instruction to the next valid instruction.
    exit_jmp = assemble_jmp(patch.returnAddr, resume_at)
    bv.write(resume_at, exit_jmp)
bv.commit_undo_actions(undo)

bv.create_database(r'shellcode_patched.bndb')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment