Skip to content

Instantly share code, notes, and snippets.

@xusheng6
Created December 18, 2024 00:32
Show Gist options
  • Save xusheng6/1fc2a8a42a31ca587105b8d48d2c1131 to your computer and use it in GitHub Desktop.
Save xusheng6/1fc2a8a42a31ca587105b8d48d2c1131 to your computer and use it in GitHub Desktop.
helper script to remove the obfuscation and clean up the code in Serpentine challenge in flare-on
from dataclasses import dataclass
@dataclass
class Instr:
startTime: str
instrType: str
ip: int
addr: int
size: int
value: int
@dataclass
class PathInfo:
callAddr: int
instrAddr: int
bytesWritten: int
returnAddr: int
isJumpToHalt: bool
instrsToSkip: int
actualInstrSize: int
haltInstrAddr: int
def assemble_jmp(target, addr):
instr_bytes = b'\xe9'
diff = (target - (addr + 5)) & 0xffffffff
instr_bytes += struct.pack('<I', diff)
return instr_bytes
trace = []
trace_lines = open('serpentine03.run.shellcode.txt').read().splitlines()
for line in trace_lines:
parts = line.split('\t')
trace.append(Instr(parts[0], parts[1], int(parts[2], 16), int(parts[3], 16), int(parts[4], 16), int(parts[5], 16)))
def find_matching_write(idx):
addr = trace[idx].addr
initial_call_found = False
call_found = False
write_found = False
max_look_ahead = 10
skip_instr = 0
call_instr_idx = 0
second_write_instr_idx = 0
return_addr = 0
return_instr_idx = 0
actual_instr_size = 0
for i in range(1, max_look_ahead):
if trace[idx + i].instrType == "Execute" and trace[idx + i].ip == addr:
call_found = True
call_instr_idx = idx + i
actual_instr_size = trace[idx + i].size
if not call_found:
# Not the write we are looking for
return (False, 0)
halt_instr_addr = 0
# This is a jump to a halt instruction
if trace[call_instr_idx].value & 0xff == 0xe9 and trace[call_instr_idx + 1].value == 0xf4:
if trace[call_instr_idx + 2].instrType == 'Execute':
return_instr_idx = call_instr_idx + 3
return_addr = trace[return_instr_idx].addr
halt_instr_addr = trace[call_instr_idx + 1].addr
else:
# failed
print('failed2')
pass
# find the initial call instruction
for i in range(1, max_look_ahead):
if trace[idx - i].instrType == "Execute" and trace[idx - i].value & 0xff == 0xe8:
initialCallAddr = trace[idx - i].addr
initial_call_found = True
break
if initial_call_found:
return (True, PathInfo(initialCallAddr, trace[call_instr_idx].addr, trace[idx].value, return_addr, True, return_instr_idx - idx, actual_instr_size, halt_instr_addr))
else:
print('failed3')
pass
# failed
# This is an ordinary instruction
else:
for i in range(1, max_look_ahead):
if trace[call_instr_idx + i].instrType == "Write" and trace[call_instr_idx + i].addr == addr:
write_found = True
second_write_instr_idx = call_instr_idx + i
break
if not write_found:
print('failed4')
# failed
pass
# looking for the returning 0xc3
for i in range(1, max_look_ahead):
if trace[second_write_instr_idx + i].instrType == "Execute" and trace[second_write_instr_idx + i].value == 0xc3:
return_instr_idx = second_write_instr_idx + i + 1
return_addr = trace[return_instr_idx].addr
break
# find the initial call instruction
for i in range(1, max_look_ahead):
if trace[idx - i].instrType == "Execute" and trace[idx - i].value & 0xff == 0xe8:
initialCallAddr = trace[idx - i].addr
initial_call_found = True
break
if initial_call_found:
return (True, PathInfo(initialCallAddr, trace[call_instr_idx].addr, trace[idx].value, return_addr, False, return_instr_idx - idx, actual_instr_size, 0))
else:
print('failed5')
return (False, PathInfo())
# failed
# print(trace)
idx = 0
patches = []
while idx < len(trace):
if trace[idx].instrType != 'Write':
idx += 1
continue
found, info = find_matching_write(idx)
if found:
# print(info)
patches.append(info)
# print(hex(info.callAddr), hex(info.instrAddr), hex(info.bytesWritten), hex(info.returnAddr), info.isJumpToHalt, info.instrsToSkip, info.actualInstrSize, hex(info.haltInstrAddr))
idx += info.instrsToSkip
else:
idx += 1
import sys
# sys.exit(0)
import struct
from binaryninja import *
bv = BinaryView.load(r'shellcode.bin.bndb')
arch = bv.arch
print(hex(bv.start))
undo = bv.begin_undo_actions()
for patch in patches:
if patch.isJumpToHalt:
# data = arch.assemble('jmp 0x%x' % (patch.returnAddr), patch.callAddr)
# bv.write(patch.callAddr, data)
# jump to the actual instruction
# data = arch.assemble('jmp 0x%x' % (patch.instrAddr), patch.callAddr)
data = assemble_jmp(patch.instrAddr, patch.callAddr)
bv.write(patch.callAddr, data)
# Write the actual instruction bytes
# print("patching at 0x%x" % patch.instrAddr)
bv.write(patch.instrAddr, struct.pack('<I', patch.bytesWritten))
# Write NOP at the HALT instruction
bv.write(patch.haltInstrAddr, b'\x90')
# Write another jmp instruciton to the next valid instruction
addr_after_instr = patch.haltInstrAddr + 1
# data = arch.assemble('jmp 0x%x' % (patch.returnAddr), addr_after_instr)
data = assemble_jmp(patch.returnAddr, addr_after_instr)
# print("patching at 0x%x" % addr_after_instr)
bv.write(addr_after_instr, data)
else:
# jump to the actual instruction
# data = arch.assemble('jmp 0x%x' % (patch.instrAddr), patch.callAddr)
data = assemble_jmp(patch.instrAddr, patch.callAddr)
# print("patching at 0x%x" % patch.callAddr)
bv.write(patch.callAddr, data)
# Write the actual instruction bytes
# print("patching at 0x%x" % patch.instrAddr)
bv.write(patch.instrAddr, struct.pack('<I', patch.bytesWritten))
# Write another jmp instruciton to the next valid instruction
addr_after_instr = patch.instrAddr + patch.actualInstrSize
# data = arch.assemble('jmp 0x%x' % (patch.returnAddr), addr_after_instr)
data = assemble_jmp(patch.returnAddr, addr_after_instr)
# print("patching at 0x%x" % addr_after_instr)
bv.write(addr_after_instr, data)
bv.commit_undo_actions(undo)
bv.create_database(r'shellcode_patched.bndb')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment