|
# inspired by http://hooked-on-mnemonics.blogspot.com/2012/10/simple-deobfuscation-of-code.html |
|
|
|
from keystone import Ks, KS_ARCH_X86, KS_MODE_32, KS_MODE_64 |
|
|
|
import idc |
|
import ida_bytes |
|
import ida_kernwin |
|
import ida_ua |
|
|
|
class Insn:
    """Wrapper around a single instruction read from the IDA database.

    All IDA API access for instructions lives in this class; the
    deobfuscation logic only uses the attributes and methods below.

    Attributes:
        ea: address the instruction was read from.
        insn: the ``ida_ua.insn_t`` filled in by ``decode_insn``.
        mnem: mnemonic text reported by IDA for ``ea``.
        disasm: disassembly line reported by IDA for ``ea``.
        size: ``insn.size`` after decoding.
        first_byte: the byte stored at ``ea``.
        new_ea: relocated address; ``None`` until assigned by the caller.
        bytes: replacement encoding; ``None`` means "use the database bytes".
        line: replacement listing line; ``None`` means "format on demand".
    """

    def __init__(self, ea):
        decoded = ida_ua.insn_t()
        ida_ua.decode_insn(decoded, ea)

        self.ea = ea
        self.insn = decoded
        self.mnem = idc.print_insn_mnem(ea)
        self.disasm = idc.generate_disasm_line(ea, 0)
        self.size = decoded.size
        self.first_byte = ida_bytes.get_byte(ea)

        # Filled in later by the relocation / re-assembly steps.
        self.new_ea = None
        self.bytes = None
        self.line = None

    def get_next(self):
        """Return an ``Insn`` for the address given by ``idc.next_head``."""
        following_ea = idc.next_head(self.ea)
        return Insn(following_ea)

    def get_line(self):
        """Return the listing line: the stored one, or a freshly formatted one."""
        if self.line is not None:
            return self.line
        return f"{hex(self.new_ea)}\t{self.disasm: <32} {self.get_bytes().hex(' ')}"

    def get_op_value(self, n):
        """Return IDA's value for operand ``n`` of this instruction."""
        return idc.get_operand_value(self.ea, n)

    def follow_jmp(self):
        """Return an ``Insn`` at the address held in operand 0."""
        target_ea = self.get_op_value(0)
        return Insn(target_ea)

    def get_bytes(self):
        """Return the stored encoding, or the database bytes if none is stored."""
        if self.bytes is not None:
            return self.bytes
        return ida_bytes.get_bytes(self.ea, self.size)
|
|
|
|
|
class Jmp:
    """Placeholder for a jmp that does not exist yet and is assembled later.

    Attributes:
        ea: always -1, since there is no original address.
        tgt: the target value handed to the constructor.
        new_ea: relocated address; ``None`` until assigned.
        size: starts at 5.
        bytes: encoding; ``None`` until assembled.
        line: listing text; empty until assembled.
        mnem: always ``'jmp'``.
    """

    def __init__(self, tgt):
        self.mnem = 'jmp'
        self.tgt = tgt
        self.ea = -1
        self.size = 5
        # Nothing below is known before relocation and assembly.
        self.new_ea = None
        self.bytes = None
        self.line = ""

    def get_line(self):
        """Return the stored listing line."""
        return self.line

    def get_bytes(self):
        """Return the stored encoding (``None`` before assembly)."""
        return self.bytes
|
|
|
|
|
class JmpDeobfuscate:
    """Rebuild jmp-obfuscated code as contiguous, re-assembled blocks.

    Starting at ``ea`` the code is walked instruction by instruction.
    Unconditional jumps are followed and dropped; conditional jumps and
    direct calls (first byte 0xE8) start further blocks.  The blocks are
    then laid out back to back from address 0, each padded up to the next
    ``padding`` boundary, and every relative branch is re-assembled with
    Keystone against that layout.

    Attributes:
        ea: address the walk starts at.
        blocks: block start address -> ``{insn ea: insn}``.  A loop-closing
            placeholder ``Jmp`` is stored under key -1.
        jmp_inserts: original address targeted by a placeholder ``Jmp`` ->
            relocated address of that instruction (``None`` before layout).
        relocated_blocks: block start address -> layout record, see
            ``relocate_blocks``.
        visited: addresses of followed jmps and of built block starts.
        padding: block alignment in bytes.
        ks: Keystone assembler used to re-encode branches.
    """

    # Upper bound on assemble/re-layout rounds in fix_relative_addresses().
    MAX_LAYOUT_PASSES = 32

    def __init__(self, ea):
        self.ea = ea
        self.blocks = {}
        self.jmp_inserts = {}
        self.relocated_blocks = {}
        self.visited = set()
        self.padding = 0x20

        # Pick the assembler mode from the segment's addressing mode
        # (SEGATTR_BITNESS: 0 = 16-bit, 1 = 32-bit, 2 = 64-bit).  The size of
        # the item at ``ea`` says nothing about the bitness of the code.
        if idc.get_segm_attr(ea, idc.SEGATTR_BITNESS) == 2:
            self.ks = Ks(KS_ARCH_X86, KS_MODE_64)
        else:
            self.ks = Ks(KS_ARCH_X86, KS_MODE_32)

    def _follow_jmp_chain(self, insn):
        """Follow operand 0 of ``insn`` through any chain of ``jmp``s.

        Every intermediate ``jmp`` is added to ``self.visited``.  Returns the
        first instruction reached whose mnemonic is not ``jmp``.

        NOTE: a chain of jmps that leads back into itself never terminates.
        """
        insn = insn.follow_jmp()
        while insn.mnem == 'jmp':
            self.visited.add(insn.ea)
            insn = insn.follow_jmp()
        return insn

    def _build_block(self, ea, branches):
        """Walk the code from ``ea`` and store it as ``self.blocks[ea]``.

        Targets of conditional jumps and direct calls met on the way are
        appended to ``branches``.
        """
        curr_block = {}
        insn = Insn(ea)
        while insn.ea != idc.BADADDR:
            if insn.ea in curr_block:
                # Control flow came back into this block, either by falling
                # through or through a followed jmp.  Close the loop with a
                # placeholder jmp; it is assembled once addresses are known.
                curr_block[-1] = Jmp(insn.ea)
                self.jmp_inserts[insn.ea] = None
                break

            if 'jmp' in insn.mnem:
                # Unconditional jumps are not kept: resolve the final target
                # of the chain and carry on from there.
                self.visited.add(insn.ea)
                insn = self._follow_jmp_chain(insn)
                continue

            # Direct calls and conditional jumps lead to blocks built later.
            if ('call' in insn.mnem and insn.first_byte == 0xe8) or 'j' in insn.mnem:
                branches.append(self._follow_jmp_chain(insn).ea)

            # Mark the block start as visited so that build_blocks() does not
            # build it a second time from the branch list.
            self.visited.add(ea)
            curr_block[insn.ea] = insn

            # A ret ends the block.
            if 'ret' in insn.mnem:
                break

            insn = insn.get_next()

        self.blocks[ea] = curr_block

    def build_blocks(self, ea=None, branches=None):
        """Build the block at ``ea`` and every block reachable from it.

        Args:
            ea: start address; defaults to ``self.ea``.
            branches: list of pending block start addresses; it is extended
                in place as new targets are discovered.
        """
        if ea is None:
            ea = self.ea
        if branches is None:
            branches = []

        self._build_block(ea, branches)

        # ``branches`` grows while it is processed, so walk it by index.
        # Each entry is looked at once, which also guarantees termination
        # when a target yields an empty block and never becomes "visited".
        idx = 0
        while idx < len(branches):
            tgt = branches[idx]
            idx += 1
            if tgt not in self.visited:
                self._build_block(tgt, branches)

    def relocate_blocks(self):
        """Assign new addresses to all instructions, block after block.

        Layout starts at address 0.  After each block the address is advanced
        to the next multiple of ``self.padding``; a block that already ends
        on a boundary still gets a full ``self.padding`` bytes.  Uses the
        current ``insn.size`` values, so it can be called again after sizes
        changed.  Fills ``self.relocated_blocks`` and ``self.jmp_inserts``.
        """
        new_ea = 0
        for key, block in self.blocks.items():
            block_addr = new_ea
            for insn in block.values():
                # Remember where placeholder-jmp targets ended up.
                if insn.ea in self.jmp_inserts:
                    self.jmp_inserts[insn.ea] = new_ea
                insn.new_ea = new_ea
                new_ea += insn.size
            padding = self.padding - (new_ea % self.padding)
            new_ea += padding
            self.relocated_blocks[key] = {
                'block': block,            # dict of ea:insn
                'size': new_ea,            # address just past this block's padding
                'padding': padding,        # filler bytes appended after the block
                'block_addr': block_addr,  # block start address
            }

    @staticmethod
    def _build_new_line(insn, ea):
        """Store a listing line for a re-assembled branch in ``insn.line``."""
        insn.line = f"{hex(insn.new_ea)}\t{insn.mnem:<8}{ea:X}{'h': <22} {insn.get_bytes().hex(' ')}"

    def _assemble_new_relative(self, insn, tgt):
        """Re-encode ``insn`` so that it branches to relocated address ``tgt``.

        Updates ``insn.bytes``, ``insn.size`` and ``insn.line``.

        Returns:
            True if the encoding has the size the instruction had before,
            False if the size changed (the layout is then out of date).
        """
        old_size = insn.size

        rel_addr = tgt - insn.new_ea
        encoding, _ = self.ks.asm(f"{insn.mnem} {rel_addr}")

        insn.bytes = bytes(encoding)
        insn.size = len(insn.bytes)
        self._build_new_line(insn, rel_addr)

        return insn.size == old_size

    def _new_branch_target(self, insn):
        """Return the relocated target of ``insn``, or None if it is no branch."""
        if isinstance(insn, Jmp):
            # Placeholder: original target address -> relocated address.
            return self.jmp_inserts[insn.tgt]
        if insn.mnem.startswith('j') or (insn.mnem == 'call' and insn.first_byte == 0xe8):
            # The original binary tells us which block is targeted; relocated
            # blocks are keyed by original block address.
            tgt_insn = self._follow_jmp_chain(insn)
            return self.relocated_blocks[tgt_insn.ea]['block_addr']
        return None

    def fix_relative_addresses(self):
        """Re-assemble every relative branch for the relocated layout.

        Re-encoding can change an instruction's length, which moves what
        follows it and invalidates displacements computed earlier.  So all
        branches are assembled, and if any size changed the whole layout is
        recomputed and the branches are assembled again, until a pass
        changes nothing.

        Raises:
            RuntimeError: if sizes still change after ``MAX_LAYOUT_PASSES``.
        """
        for _ in range(self.MAX_LAYOUT_PASSES):
            stable = True
            for block in self.relocated_blocks.values():
                for insn in block['block'].values():
                    tgt = self._new_branch_target(insn)
                    if tgt is None:
                        continue
                    if not self._assemble_new_relative(insn, tgt):
                        stable = False
            if stable:
                return
            self.relocate_blocks()
        raise RuntimeError(
            f"branch sizes did not settle after {self.MAX_LAYOUT_PASSES} layout passes"
        )

    def print_blocks(self):
        """Print the listing of every relocated block."""
        for key, block in self.relocated_blocks.items():
            print("=========================================================")
            print(f"Block at {hex(key)}")
            print("=========================================================")
            for insn in block['block'].values():
                print(insn.get_line())
            print("\n")

    def create_buffer(self):
        """Return the relocated code as a ``bytearray``.

        Blocks are emitted in layout order, each followed by its padding
        filled with 0x90 bytes.
        """
        buffer = bytearray()
        for block in self.relocated_blocks.values():
            for insn in block['block'].values():
                buffer += insn.get_bytes()
            buffer += b'\x90' * block['padding']
        return buffer
|
|
|
|
|
# Deobfuscate the code at the cursor: build the blocks, lay them out,
# re-assemble the branches, then print the resulting listing.
jd = JmpDeobfuscate(ida_kernwin.get_screen_ea())
for _stage in (
    jd.build_blocks,
    jd.relocate_blocks,
    jd.fix_relative_addresses,
    jd.print_blocks,
):
    _stage()
del _stage

# The relocated code as one flat bytearray.
deobfuscated_buffer = jd.create_buffer()
|
|
|
# You can extend this to save the buffer to a file or to patch it into the IDB.
|
|
|
# print buffer |
|
#print(f"buf = {bytes(deobfuscated_buffer)}") |