Skip to content

Instantly share code, notes, and snippets.

@pierrehpezier
Created March 19, 2026 14:30
Show Gist options
  • Select an option

  • Save pierrehpezier/35c90c38b2c579d612d5970b04702df8 to your computer and use it in GitHub Desktop.

Select an option

Save pierrehpezier/35c90c38b2c579d612d5970b04702df8 to your computer and use it in GitHub Desktop.
# Copyright (c) 2026 Nextron Systems
# Author: Pierre-Henri Pezier
import idaapi
import idautils
import idc
import re
import struct
from unicorn import *
from unicorn.x86_const import *
# Granularity used when aligning and sizing regions passed to Unicorn's mem_map.
PAGE_SIZE = 0x1000
# AND-ing an address with this mask rounds it down to the start of its page.
PAGE_MASK = ~(PAGE_SIZE - 1)
def get_block_insns(start_ea, end_ea):
    """Return [(ea, mnemonic, disassembly), ...] for each head in [start_ea, end_ea).

    Iteration stops as soon as the cursor reaches end_ea or IDA reports
    BADADDR for the next head.
    """
    collected = []
    cursor = start_ea
    while True:
        if cursor == idaapi.BADADDR or cursor >= end_ea:
            break
        collected.append(
            (cursor, idc.print_insn_mnem(cursor), idc.GetDisasm(cursor)))
        cursor = idc.next_head(cursor, end_ea)
    return collected
def detect_dispatcher(func):
    """Locate the dispatcher block and the two stack variables that drive it.

    The second basic block (by address) is taken as the candidate; up to five
    leading ``jmp`` instructions are followed to reach the real dispatcher.
    Inside it, the sequence ``mov / mov / sub / jz`` is searched, where both
    ``mov`` instructions mention ``eax`` and an ``[rbp+...var_X]`` operand.

    Returns:
        (staging_var, dispatcher_var, dispatcher_ea), or (None, None, None)
        when the function has fewer than two blocks or no match is found.
    """
    no_match = (None, None, None)
    blocks = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )
    if len(blocks) < 2:
        return no_match

    # Skip over jmp thunks (bounded so a jmp cycle cannot loop forever).
    second_start = blocks[1][0]
    hops = 0
    while hops < 5 and idc.print_insn_mnem(second_start) == "jmp":
        second_start = idc.get_operand_value(second_start, 0)
        hops += 1

    # Use the real block end when the address is a known block start,
    # otherwise scan a fixed 0x100-byte window.
    block_end = next(
        (end for start, end in blocks if start == second_start),
        second_start + 0x100,
    )

    insns = get_block_insns(second_start, block_end)
    var_pattern = re.compile(r'\[rbp\+(?:\w+\+)?(var_\w+)\]')
    for idx in range(len(insns) - 3):
        window = insns[idx:idx + 4]
        if [mnem for _, mnem, _ in window] != ["mov", "mov", "sub", "jz"]:
            continue
        first_dis = window[0][2]
        second_dis = window[1][2]
        if not ("eax" in first_dis and "rbp" in first_dis):
            continue
        if not ("eax" in second_dis and "rbp" in second_dis):
            continue
        staging = var_pattern.search(first_dis)
        dispatcher = var_pattern.search(second_dis)
        if staging and dispatcher and staging.group(1) != dispatcher.group(1):
            # staging, dispatcher, dispatcher_ea
            return staging.group(1), dispatcher.group(1), second_start
    return no_match
def detect_leaves(func, staging_var, dispatcher_var):
    """Classify blocks that end the flattened loop body.

    A block is considered when it ends in ``ret``/``retn`` or in a ``jmp``
    whose target is the start of the function's last block (the loop-back).
    ``ret`` blocks are always real leaves.  A loop-back block is a real leaf
    when it mentions any ``var_*`` other than the staging/dispatcher vars,
    and a routing block otherwise.

    Returns:
        (real_leaves, routing_blocks, last_block_start) where the two lists
        hold (start_ea, end_ea) tuples in ascending address order.
    """
    blocks = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )
    last_block_start = blocks[-1][0]
    dispatch_vars = {staging_var, dispatcher_var}
    real_leaves, routing_blocks = [], []

    for start_ea, end_ea in blocks:
        insns = get_block_insns(start_ea, end_ea)
        if not insns:
            continue
        last_ea, last_mnem, _ = insns[-1]

        if last_mnem in ("retn", "ret"):
            real_leaves.append((start_ea, end_ea))
            continue
        if last_mnem != "jmp" or idc.get_operand_value(last_ea, 0) != last_block_start:
            continue

        # Every stack variable name mentioned anywhere in the block.
        vars_used = {
            match.group(0)
            for _, _, dis in insns
            for match in re.finditer(r'var_\w+', dis)
        }
        bucket = real_leaves if vars_used - dispatch_vars else routing_blocks
        bucket.append((start_ea, end_ea))

    return real_leaves, routing_blocks, last_block_start
def get_var_displacement(func, var_name):
    """Return the signed displacement of the stack variable ``var_name``.

    The first instruction in the function whose disassembly mentions
    ``var_name`` as a whole word and has an ``o_displ``/``o_phrase`` operand
    (operand 0 or 1) supplies the value.  Values above the signed 64-bit
    range are converted to their negative two's-complement equivalent.

    Returns:
        The displacement as an int, or None when no such operand is found.
    """
    # Whole-word match: a plain substring test would let "var_4" match
    # "var_48" and return the displacement of the wrong variable.
    name_re = re.compile(r'\b' + re.escape(var_name) + r'\b')
    for block in idaapi.FlowChart(func):
        for ea, _mnem, disasm in get_block_insns(block.start_ea, block.end_ea):
            if not name_re.search(disasm):
                continue
            for op_idx in (0, 1):
                op_type = idc.get_operand_type(ea, op_idx)
                if op_type not in (idc.o_displ, idc.o_phrase):
                    continue
                val = idc.get_operand_value(ea, op_idx)
                if val > 0x7FFFFFFFFFFFFFFF:
                    val -= 0x10000000000000000
                return val
    return None
def build_dispatch_map(func, dispatcher_var):
    """Build ``{sub_immediate: jz_target}`` from the dispatcher's compare chain.

    Every consecutive ``mov eax, [..dispatcher_var..] / sub <reg>, imm / jz``
    triple found in any block of ``func`` contributes one entry.  The
    immediate is masked to 32 bits.  A later triple with the same immediate
    overwrites an earlier one.
    """
    # Whole-word match so that e.g. "var_4" does not also match "var_48".
    var_re = re.compile(r'\b' + re.escape(dispatcher_var) + r'\b')
    dispatch_map = {}
    for block in idaapi.FlowChart(func):
        insns = get_block_insns(block.start_ea, block.end_ea)
        for (_, m0, d0), (ea1, m1, _), (ea2, m2, _) in zip(insns, insns[1:], insns[2:]):
            if (m0, m1, m2) != ("mov", "sub", "jz"):
                continue
            if "eax" not in d0 or not var_re.search(d0):
                continue
            # Only an immediate subtrahend is a dispatch constant; for a
            # register operand get_operand_value would not be a state value.
            if idc.get_operand_type(ea1, 1) != idc.o_imm:
                continue
            sub_val = idc.get_operand_value(ea1, 1) & 0xFFFFFFFF
            dispatch_map[sub_val] = idc.get_operand_value(ea2, 0)
    return dispatch_map
def collect_global_addrs(insns):
    """Return the set of direct-memory operand addresses of ``cs:`` references.

    Only instructions whose disassembly contains ``cs:`` are inspected, and
    only operands 0 and 1 of type ``o_mem``.  Zero and BADADDR are discarded.
    """
    found = set()
    for ea, _mnem, dis in insns:
        if "cs:" in dis:
            for op_idx in (0, 1):
                if idc.get_operand_type(ea, op_idx) != idc.o_mem:
                    continue
                target = idc.get_operand_value(ea, op_idx)
                if target and target != idaapi.BADADDR:
                    found.add(target)
    return found
def emulate_blocks(block_start, block_end, staging_disp, func, dispatcher_ea,
                   rbp_val=None, stack_snapshot=None):
    """Emulate from ``block_start`` across blocks until the dispatcher is reached.

    Each visited block is executed one instruction at a time in a fresh
    Unicorn x86-64 instance.  A trailing ``jmp`` is not executed; its target
    becomes the next block.  Any other block falls through to its end
    address.  Walking stops at ``dispatcher_ea``, at an already visited
    address, or when no block contains the current address.  Instructions
    that raise ``UcError`` are reported and skipped.

    Args:
        block_start: Address where emulation begins (may be mid-block).
        block_end: Unused; kept for call compatibility.
        staging_disp: Signed RBP displacement of the staging variable.
        func: IDA function object whose flow chart defines the blocks.
        dispatcher_ea: Address of the dispatcher block (stop condition).
        rbp_val: Initial RBP; defaults to the middle of the emulated stack.
        stack_snapshot: Optional bytes written to the stack region first.

    Returns:
        (staging_value, final_rbp, stack_bytes), or (None, None, None) when
        Unicorn raises outside the per-instruction handler.
    """
    block_map = {b.start_ea: (b.start_ea, b.end_ea)
                 for b in idaapi.FlowChart(func)}
    stack_base = 0x80000
    stack_size = 0x10000
    if rbp_val is None:
        rbp_val = stack_base + stack_size // 2
    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_64)

        # Map the page-aligned region covering the function and copy its bytes.
        func_obj = idaapi.get_func(block_start)
        func_start = func_obj.start_ea
        func_end = func_obj.end_ea
        func_page = func_start & PAGE_MASK
        func_map_size = ((func_end - func_page + PAGE_SIZE) & PAGE_MASK) or PAGE_SIZE
        mu.mem_map(func_page, func_map_size)
        func_bytes = idaapi.get_bytes(func_start, func_end - func_start)
        if func_bytes:
            mu.mem_write(func_start, func_bytes)
        mapped_pages = set(range(func_page, func_page + func_map_size, PAGE_SIZE))

        # Map the stack and restore the caller-provided contents, if any.
        if (stack_base & PAGE_MASK) not in mapped_pages:
            mu.mem_map(stack_base, stack_size)
        if stack_snapshot:
            mu.mem_write(stack_base, stack_snapshot)
        mu.reg_write(UC_X86_REG_RBP, rbp_val)
        mu.reg_write(UC_X86_REG_RSP, rbp_val - 0x200)

        current_start = block_start
        visited = set()
        while current_start is not None and current_start not in visited:
            if current_start == dispatcher_ea:
                print(f" [emu] Reached dispatcher at {dispatcher_ea:#x}, stopping")
                break

            blk = block_map.get(current_start)
            if not blk:
                # current_start may be mid-block: emulate the tail of the
                # block that contains it.
                containing = None
                for bs, be in block_map.values():
                    if bs <= current_start < be:
                        containing = (current_start, be)
                        break
                if not containing:
                    print(f" [emu] No block at {current_start:#x}, stopping")
                    break
                blk = containing
            visited.add(current_start)
            b_start, b_end = blk
            insns = get_block_insns(b_start, b_end)
            if not insns:
                break

            # Make every cs:dword global referenced by this block readable.
            for addr in collect_global_addrs(insns):
                # A dword may straddle a page boundary, so cover the page of
                # its last byte as well; otherwise the write below faults.
                for page in (addr & PAGE_MASK, (addr + 3) & PAGE_MASK):
                    if page in mapped_pages:
                        continue
                    try:
                        mu.mem_map(page, PAGE_SIZE)
                        mapped_pages.add(page)
                    except UcError:
                        # Best effort: the page may overlap an existing mapping.
                        pass
                val = idaapi.get_dword(addr)
                mu.mem_write(addr, struct.pack("<I", val & 0xFFFFFFFF))

            # Decide what to execute and where to continue afterwards.
            last_ea, last_mnem, _ = insns[-1]
            if last_mnem == "jmp":
                emu_insns = insns[:-1]
                current_start = idc.get_operand_value(last_ea, 0)
            else:
                emu_insns = insns
                current_start = b_end  # fallthrough

            for ea, mnem, disasm in emu_insns:
                insn_size = idc.get_item_size(ea)
                # A taken jump never reaches ea + insn_size, so without an
                # instruction bound Unicorn would keep running from the
                # branch target.  Limit jumps to a single step.
                step_limit = 1 if mnem.startswith("j") else 0
                try:
                    mu.emu_start(ea, ea + insn_size, count=step_limit)
                except UcError as e:
                    print(f" {ea:#x}: {disasm:50s} [SKIP: {e}]")

        # Read the staging var through the current RBP, which the emulated
        # code may have changed (e.g. lea rbp, [rsp+X]).
        actual_rbp = mu.reg_read(UC_X86_REG_RBP)
        result_bytes = mu.mem_read(actual_rbp + staging_disp, 4)
        result = struct.unpack("<I", bytes(result_bytes))[0]
        snap = bytes(mu.mem_read(stack_base, stack_size))
        return result, actual_rbp, snap
    except UcError as e:
        print(f" [!] Unicorn error at {block_start:#x}: {e}")
        return None, None, None
def resolve_to_real_leaf(initial_value, dispatch_map, real_leaf_set, routing_map,
                         staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot):
    """Chase a dispatch value through routing blocks until a real leaf is hit.

    Each value is looked up in ``dispatch_map``.  A target in
    ``real_leaf_set`` is returned directly; a target in ``routing_map`` is
    emulated to obtain the next dispatch value.

    Returns:
        The real leaf address, or None when the value is unknown, the target
        is neither a real leaf nor a routing block, emulation fails, or a
        dispatch value repeats.
    """
    seen = set()
    value = initial_value
    while True:
        if value is None or value in seen:
            return None
        if value not in dispatch_map:
            return None
        seen.add(value)

        target_ea = dispatch_map[value]
        if target_ea in real_leaf_set:
            return target_ea

        routed = routing_map.get(target_ea)
        if not routed:
            return None

        # Routing block: run it to learn which value it stages next, and
        # carry the resulting RBP/stack into any further hop.
        route_start, route_end = routed
        value, rbp_val, stack_snapshot = emulate_blocks(
            route_start, route_end, staging_disp, func, dispatcher_ea,
            rbp_val, stack_snapshot)
def resolve_value_by_emulation(staging_value, staging_disp, func, real_leaf_set,
                               loopback_ea, rbp_val, stack_snapshot):
    """Resolve a staging value by running the dispatcher itself in Unicorn.

    The staging variable is set to ``staging_value`` and execution starts at
    ``loopback_ea``.  A code hook stops the run at the first executed address
    that is in ``real_leaf_set``.  The run is capped at 2000 instructions.

    Args:
        staging_value: Dispatch value to resolve (written as a 32-bit dword).
        staging_disp: Signed RBP displacement of the staging variable.
        func: IDA function object; its blocks are scanned for globals.
        real_leaf_set: Start addresses of real leaves.
        loopback_ea: Address where emulation starts.
        rbp_val: RBP to use; defaults to the middle of the emulated stack.
        stack_snapshot: Optional bytes written to the stack region first.

    Returns:
        The real leaf address reached, or None when none was reached or
        Unicorn raised an error.
    """
    func_obj = idaapi.get_func(loopback_ea)
    func_start = func_obj.start_ea
    func_end = func_obj.end_ea
    stack_base = 0x80000
    stack_size = 0x10000
    if rbp_val is None:
        rbp_val = stack_base + stack_size // 2
    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_64)

        # Map the page-aligned region covering the function and copy its bytes.
        func_page = func_start & PAGE_MASK
        func_map_size = ((func_end - func_page + PAGE_SIZE) & PAGE_MASK) or PAGE_SIZE
        mu.mem_map(func_page, func_map_size)
        func_bytes = idaapi.get_bytes(func_start, func_end - func_start)
        if func_bytes:
            mu.mem_write(func_start, func_bytes)
        mapped_pages = set(range(func_page, func_page + func_map_size, PAGE_SIZE))

        # Make every cs:dword global referenced by the function readable.
        for block in idaapi.FlowChart(func):
            insns = get_block_insns(block.start_ea, block.end_ea)
            for addr in collect_global_addrs(insns):
                # A dword may straddle a page boundary, so cover the page of
                # its last byte as well; otherwise the write below faults.
                for page in (addr & PAGE_MASK, (addr + 3) & PAGE_MASK):
                    if page in mapped_pages:
                        continue
                    try:
                        mu.mem_map(page, PAGE_SIZE)
                        mapped_pages.add(page)
                    except UcError:
                        # Best effort: the page may overlap an existing mapping.
                        pass
                val = idaapi.get_dword(addr)
                mu.mem_write(addr, struct.pack("<I", val & 0xFFFFFFFF))

        # Map the stack and restore the caller-provided contents, if any.
        if (stack_base & PAGE_MASK) not in mapped_pages:
            mu.mem_map(stack_base, stack_size)
        if stack_snapshot:
            mu.mem_write(stack_base, stack_snapshot)
        mu.reg_write(UC_X86_REG_RBP, rbp_val)
        mu.reg_write(UC_X86_REG_RSP, rbp_val - 0x200)

        # Seed the staging variable with the value being resolved.
        mu.mem_write(rbp_val + staging_disp,
                     struct.pack("<I", staging_value & 0xFFFFFFFF))

        # One-element list so the hook can hand its result back.
        result = [None]

        def hook_code(uc, address, size, user_data):
            if address in real_leaf_set:
                result[0] = address
                uc.emu_stop()

        mu.hook_add(UC_HOOK_CODE, hook_code)
        mu.emu_start(loopback_ea, 0, timeout=0, count=2000)
        return result[0]
    except UcError as e:
        print(f" [!] Emulation resolve error for {staging_value:#x}: {e}")
        return None
def reanalyze_range(start_ea, end_ea):
    """Undefine [start_ea, end_ea) and ask IDA to decode it again.

    Instructions are created back to back; where creation fails the cursor
    advances by a single byte and tries again.
    """
    idc.del_items(start_ea, 0, end_ea - start_ea)
    cursor = start_ea
    while cursor < end_ea:
        created = idc.create_insn(cursor)
        cursor += created if created != 0 else 1
def _patch_rel32_branch(patch_ea, target_ea, opcode, label):
    """Write ``opcode`` followed by a little-endian rel32 to ``target_ea``.

    The displacement is relative to the end of the emitted instruction.
    Prints a message and returns False, without patching anything, when the
    displacement does not fit in a signed 32-bit value.
    """
    rel32 = target_ea - (patch_ea + len(opcode) + 4)
    if not (-0x80000000 <= rel32 <= 0x7FFFFFFF):
        print(f" [!] {label} out of range at {patch_ea:#x}")
        return False
    for offset, value in enumerate(opcode + struct.pack("<i", rel32)):
        idaapi.patch_byte(patch_ea + offset, value)
    return True


def patch_jmp(patch_ea, target_ea):
    """Write jmp <target> at patch_ea (5 bytes: E9 rel32). Returns success."""
    return _patch_rel32_branch(patch_ea, target_ea, b"\xE9", "Jump")


def patch_jnz(patch_ea, target_ea):
    """Write jnz <target> at patch_ea (6 bytes: 0F 85 rel32). Returns success."""
    return _patch_rel32_branch(patch_ea, target_ea, b"\x0F\x85", "Jnz")


def patch_jz(patch_ea, target_ea):
    """Write jz <target> at patch_ea (6 bytes: 0F 84 rel32). Returns success."""
    return _patch_rel32_branch(patch_ea, target_ea, b"\x0F\x84", "Jz")
def find_staging_mov(insns, staging_var):
    """Return the address of the last ``mov [rbp+staging_var], <imm>`` in insns.

    The list is scanned from the end.  A candidate must be a ``mov`` whose
    disassembly contains ``[rbp`` and ``staging_var`` as a whole word, and
    whose second operand is an immediate.  Returns None when there is none.
    """
    # Whole-word match so that e.g. "var_4" does not also match "var_48".
    name_re = re.compile(r'\b' + re.escape(staging_var) + r'\b')
    for ea, mnem, dis in reversed(insns):
        if mnem != "mov" or "[rbp" not in dis or not name_re.search(dis):
            continue
        if idc.get_operand_type(ea, 1) == idc.o_imm:
            return ea
    return None
def simplify_first_bbl(func, staging_var, dispatcher_var, staging_disp,
                       dispatch_map, real_leaf_set, routing_map, dispatcher_ea):
    """Redirect the function's entry block straight to its first real leaf.

    The entry block is emulated up to the dispatcher to learn the initial
    dispatch value, which is then resolved to a real leaf.  The last
    ``mov [rbp+staging], imm`` of the entry block is overwritten with a
    ``jmp`` to that leaf and the remaining bytes of the block are filled
    with 0xCC.

    Returns:
        (rbp_val, stack_snapshot) as produced by the entry-block emulation,
        regardless of whether a patch was applied.
    """
    ordered = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )
    entry_start, entry_end = ordered[0]

    # Run the entry block (following jmps) until the dispatcher is reached.
    initial_val, rbp_val, stack_snap = emulate_blocks(
        entry_start, entry_end, staging_disp, func, dispatcher_ea)
    state = (rbp_val, stack_snap)
    if initial_val is None:
        print("[!] Failed to emulate first BBL")
        return state
    print(f"[+] First BBL initial dispatch value: {initial_val:#x}")

    # Follow routing blocks until a real leaf is found.
    real_target = resolve_to_real_leaf(
        initial_val, dispatch_map, real_leaf_set, routing_map,
        staging_disp, func, dispatcher_ea, rbp_val, stack_snap)
    if real_target is None:
        print("[!] Could not resolve to a real leaf")
        return state
    target_name = idc.get_name(real_target) or f"{real_target:#x}"
    print(f"[+] Resolves to: {target_name} ({real_target:#x})")

    patch_ea = find_staging_mov(get_block_insns(entry_start, entry_end), staging_var)
    if patch_ea is None:
        print(f"[!] Could not find mov [rbp+{staging_var}], imm in first BBL")
        return state

    if patch_jmp(patch_ea, real_target):
        # Fill whatever follows the 5-byte jmp with int3 (0xCC).
        filler_ea = patch_ea + 5
        while filler_ea < entry_end:
            idaapi.patch_byte(filler_ea, 0xcc)
            filler_ea += 1
        reanalyze_range(patch_ea, entry_end)
        print(f"[+] Patched {patch_ea:#x}: jmp {target_name}")
    return state
def simplify_leaves(func, staging_var, staging_disp, dispatch_map,
                    real_leaves, real_leaf_set, routing_map,
                    dispatcher_ea, rbp_val, stack_snapshot):
    """Replace each leaf's "stage next value / jmp dispatcher" tail with a direct jmp.

    For every block in ``real_leaves`` that ends in ``jmp`` and contains a
    ``mov`` to ``[rbp+staging_var]``, the block is emulated to obtain the
    next dispatch value, that value is resolved to a real leaf, and a
    ``jmp`` to it is written.  The patch goes at the start of the
    ``mov ecx, cs: / mov eax, cs: / mov edx, ecx`` opaque preamble when
    present, otherwise at the staging ``mov``.  Bytes after the jmp up to
    the block end are filled with 0xCC.

    Blocks matching the conditional-opaque tail without that preamble are
    skipped here.

    Returns:
        Number of blocks patched.
    """
    # Whole-word match so that e.g. "var_4" does not also match "var_48".
    staging_re = re.compile(r'\b' + re.escape(staging_var) + r'\b')
    patched = 0
    for leaf_start, leaf_end in real_leaves:
        insns = get_block_insns(leaf_start, leaf_end)
        if not insns:
            continue
        # Must end with jmp (not ret).
        if insns[-1][1] != "jmp":
            continue

        # Last mov touching the staging var before the final jmp
        # (immediate or register source).
        staging_mov_ea = None
        for ea, mnem, dis in reversed(insns[:-1]):
            if mnem == "mov" and "[rbp" in dis and staging_re.search(dis):
                staging_mov_ea = ea
                break
        if staging_mov_ea is None:
            continue

        # Opaque preamble: mov ecx, cs:dword / mov eax, cs:dword / mov edx, ecx
        opaque_start_ea = None
        for (ea0, m0, d0), (_, m1, d1), (_, m2, d2) in zip(insns, insns[1:], insns[2:]):
            if (m0 == "mov" and "ecx" in d0 and "cs:" in d0
                    and m1 == "mov" and "eax" in d1 and "cs:" in d1
                    and m2 == "mov" and "edx" in d2 and "ecx" in d2):
                opaque_start_ea = ea0
                break

        # Conditional tails without the cs:dword preamble are genuinely
        # conditional and are handled by simplify_conditional_opaques.
        if opaque_start_ea is None and detect_conditional_opaque(insns, staging_var) is not None:
            continue

        # Patch at the preamble when present, else at the staging mov.
        patch_ea = opaque_start_ea if opaque_start_ea else staging_mov_ea
        leaf_name = idc.get_name(leaf_start) or f"{leaf_start:#x}"
        # The preamble is self-contained, so emulation can start there;
        # otherwise start from the top of the block.
        emu_start = opaque_start_ea if opaque_start_ea else leaf_start
        next_val, _, _ = emulate_blocks(
            emu_start, leaf_end, staging_disp, func, dispatcher_ea,
            rbp_val, stack_snapshot)
        if next_val is None:
            print(f" [!] {leaf_name}: emulation failed")
            continue

        real_target = resolve_to_real_leaf(
            next_val, dispatch_map, real_leaf_set, routing_map,
            staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
        if real_target is None:
            print(f" [!] {leaf_name}: could not resolve {next_val:#x} to real leaf")
            continue

        target_name = idc.get_name(real_target) or f"{real_target:#x}"
        if patch_jmp(patch_ea, real_target):
            for addr in range(patch_ea + 5, leaf_end):
                idaapi.patch_byte(addr, 0xcc)
            reanalyze_range(patch_ea, leaf_end)
            patched += 1
            print(f" Patched {leaf_start:#x}: jmp {target_name}")
    print(f"\n[+] Patched {patched}/{len(real_leaves)} real leaves")
    return patched
def find_opaque_computation_start(insns, tail_idx):
    """Find where the opaque computation preceding ``insns[tail_idx]`` begins.

    Walks backwards from ``tail_idx - 1`` while instructions look like
    opaque arithmetic and stops at the first one that does not:

    * its mnemonic is outside the allowed arithmetic/flag set,
    * it references ``[rbp...]`` without also referencing ``cs:``, or
    * it is a ``mov`` storing through a non-rbp register pointer, with or
      without a size prefix (``mov [rax], ecx`` / ``mov dword ptr [rax], 5``).

    Returns:
        Index of the earliest instruction that belongs to the computation,
        or ``tail_idx`` when nothing before the tail qualifies.
    """
    opaque_mnems = {"mov", "xor", "and", "or", "test", "cmp", "not",
                    "shr", "shl", "sub", "add", "imul", "setz", "setnz",
                    "setl", "setg", "setle", "setge", "setnl", "setng",
                    "setb", "setnb", "seta", "setna"}
    # The optional "<size> ptr" prefix covers stores of immediates, which
    # IDA prints as e.g. "mov dword ptr [rax], 5".
    reg_store_re = re.compile(r'mov\s+(?:\w+\s+ptr\s+)?\[r')
    start_idx = tail_idx
    for i in range(tail_idx - 1, -1, -1):
        _, mnem_i, dis_i = insns[i]
        if mnem_i not in opaque_mnems:
            break
        # Stack variable access means real code: opaque computations only
        # use cs:dword globals and register-only arithmetic.
        if "[rbp" in dis_i and "cs:" not in dis_i:
            break
        # Memory write through a register pointer is real code as well.
        if mnem_i == "mov" and reg_store_re.match(dis_i) and 'rbp' not in dis_i:
            break
        start_idx = i
    return start_idx
def detect_conditional_opaque(insns, staging_var):
    """Detect the conditional dispatch tail at the end of a block.

    The last six instructions must be::

        mov eax, IMM_A
        mov ecx, IMM_B
        test <something>, 1
        cmovnz eax, ecx
        mov [rbp+staging_var], eax
        jmp <dispatcher>

    Returns:
        (pred_start_ea, test_ea, test_size, imm_a, imm_b, comp_start_ea),
        or None when the block does not match.  ``pred_start_ea`` is the
        address of ``mov eax, IMM_A``.  ``comp_start_ea`` is where the
        opaque computation begins; it is earlier than ``pred_start_ea``
        when an arithmetic preamble precedes the tail.  Immediates are
        masked to 32 bits.
    """
    if len(insns) < 6:
        return None
    ((ea_eax, m_eax, d_eax), (ea_ecx, m_ecx, d_ecx), (ea_test, m_test, _),
     (_, m_cmov, _), (_, m_store, d_store), (_, m_last, _)) = insns[-6:]

    if m_last != "jmp":
        return None
    # Whole-word match so that e.g. "var_4" does not also match "var_48".
    if (m_store != "mov" or "[rbp" not in d_store or "eax" not in d_store
            or not re.search(r'\b' + re.escape(staging_var) + r'\b', d_store)):
        return None
    if not m_cmov.startswith("cmovnz"):
        return None
    if m_test != "test":
        return None
    # The second operand must be the immediate 1.  Searching the text for
    # the character "1" would also accept e.g. "10h", "r11" or "var_1C".
    if (idc.get_operand_type(ea_test, 1) != idc.o_imm
            or idc.get_operand_value(ea_test, 1) != 1):
        return None

    if m_ecx != "mov" or "ecx" not in d_ecx:
        return None
    if idc.get_operand_type(ea_ecx, 1) != idc.o_imm:
        return None
    imm_b = idc.get_operand_value(ea_ecx, 1) & 0xFFFFFFFF

    if m_eax != "mov" or "eax" not in d_eax:
        return None
    if idc.get_operand_type(ea_eax, 1) != idc.o_imm:
        return None
    imm_a = idc.get_operand_value(ea_eax, 1) & 0xFFFFFFFF

    test_size = idc.get_item_size(ea_test)
    # Extend backwards over any cs:dword arithmetic feeding the test.
    tail_idx = len(insns) - 6
    comp_start_idx = find_opaque_computation_start(insns, tail_idx)
    comp_start_ea = insns[comp_start_idx][0]
    return ea_eax, ea_test, test_size, imm_a, imm_b, comp_start_ea
def simplify_conditional_opaques(func, staging_var, staging_disp, dispatch_map,
                                 real_leaves, real_leaf_set, routing_map,
                                 dispatcher_ea, loopback_ea,
                                 rbp_val, stack_snapshot):
    """Rewrite blocks ending in the mov/mov/test/cmovnz/mov-staging/jmp tail.

    Both candidate dispatch values are resolved to real leaves, first via the
    dispatch map and then, if that fails, by emulating the dispatcher.  When
    an opaque computation precedes the tail, the block is emulated from that
    computation; if the staged value can be resolved, a single ``jmp`` to the
    winning leaf is written at the computation start.  Otherwise the tail is
    rewritten as ``test / jz target_a / jmp target_b`` and the rest of the
    block is filled with 0xCC.

    Returns:
        Number of blocks patched.
    """
    def resolve(value):
        # Dispatch-map walk first, full dispatcher emulation as fallback.
        leaf = resolve_to_real_leaf(
            value, dispatch_map, real_leaf_set, routing_map,
            staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
        if leaf is None:
            leaf = resolve_value_by_emulation(
                value, staging_disp, func, real_leaf_set,
                loopback_ea, rbp_val, stack_snapshot)
        return leaf

    def label(ea):
        return idc.get_name(ea) or f"{ea:#x}"

    patched = 0
    for leaf_start, leaf_end in real_leaves:
        insns = get_block_insns(leaf_start, leaf_end)
        if not insns:
            continue
        detected = detect_conditional_opaque(insns, staging_var)
        if detected is None:
            continue
        pred_start_ea, test_ea, test_size, imm_a, imm_b, comp_start_ea = detected
        leaf_name = label(leaf_start)

        target_a = resolve(imm_a)
        target_b = resolve(imm_b)
        if target_a is None:
            print(f" [!] {leaf_name}: could not resolve imm_a {imm_a:#x}")
            continue
        if target_b is None:
            print(f" [!] {leaf_name}: could not resolve imm_b {imm_b:#x}")
            continue
        name_a = label(target_a)
        name_b = label(target_b)

        # An opaque preamble makes the predicate constant: emulate it to
        # find out which value is actually staged.
        if comp_start_ea < pred_start_ea:
            resolved, _, _ = emulate_blocks(
                comp_start_ea, leaf_end, staging_disp, func, dispatcher_ea,
                rbp_val, stack_snapshot)
            if resolved is not None:
                if resolved == imm_a:
                    winner_target, winner_name = target_a, name_a
                elif resolved == imm_b:
                    winner_target, winner_name = target_b, name_b
                else:
                    # Staged value is neither immediate: resolve it fully.
                    winner_target = resolve(resolved)
                    winner_name = label(winner_target) if winner_target else None
                if winner_target is not None:
                    # Collapse the whole computation into one jmp.
                    if patch_jmp(comp_start_ea, winner_target):
                        reanalyze_range(comp_start_ea, leaf_end)
                        patched += 1
                        print(f" Patched {leaf_start:#x}: jmp {winner_name} (opaque always resolved to {resolved:#x})")
                    continue

        # Genuinely conditional: emit test / jz target_a / jmp target_b.
        # cmovnz leaves eax = imm_a when ZF is set and loads imm_b otherwise,
        # so jz goes to target_a and the following jmp to target_b.
        # NOTE(review): the test bytes are copied verbatim to a new address;
        # a RIP-relative operand would then point elsewhere - confirm.
        test_bytes = idaapi.get_bytes(test_ea, test_size)
        if not test_bytes:
            continue
        cursor = pred_start_ea
        for offset, value in enumerate(test_bytes):
            idaapi.patch_byte(cursor + offset, value)
        cursor += test_size
        if not patch_jz(cursor, target_a):
            continue
        cursor += 6
        if not patch_jmp(cursor, target_b):
            continue
        cursor += 5
        # Fill the unused remainder of the block with int3 (0xCC).
        for addr in range(cursor, leaf_end):
            idaapi.patch_byte(addr, 0xcc)
        reanalyze_range(pred_start_ea, leaf_end)
        patched += 1
        print(f" Patched {leaf_start:#x}: test / jz {name_a} / jmp {name_b}")
    print(f"\n[+] Patched {patched} conditional opaque predicate(s)")
    return patched
def wipe_non_leaf_blocks(all_blocks, leaf_set, first_bbl_start):
    """Overwrite every block that is neither the entry block nor in leaf_set.

    ``all_blocks`` is the list of (start_ea, end_ea) captured before any
    patching.  Each wiped block is filled with 0xCC, undefined, and then
    turned into byte data so IDA does not disassemble it again.
    """
    wiped = 0
    for start_ea, end_ea in all_blocks:
        if start_ea == first_bbl_start or start_ea in leaf_set:
            continue
        size = end_ea - start_ea
        for offset in range(size):
            idaapi.patch_byte(start_ea + offset, 0xcc)
        idc.del_items(start_ea, 0, size)
        idaapi.create_data(start_ea, idaapi.FF_BYTE, size, idaapi.BADNODE)
        wiped += 1
    print(f"\n[+] Wiped {wiped} non-leaf block(s)")
def analyze(ea=None):
    """Unflatten the function containing ``ea`` (default: the cursor address).

    Steps: detect the dispatcher and its staging/dispatcher variables,
    classify leaves and routing blocks, build the dispatch map, patch the
    entry block, patch real leaves repeatedly until a round patches nothing,
    patch routing blocks, rewrite conditional opaque predicates, wipe every
    block that is not a real leaf or the entry block, and re-analyze the
    range spanned by the real leaves.

    Progress is printed; nothing is returned.  The function returns early
    when no function, dispatcher, or staging displacement is found.
    """
    if ea is None:
        ea = idc.here()
    func = idaapi.get_func(ea)
    if not func:
        print(f"[!] No function at {ea:#x}")
        return
    func_name = idc.get_func_name(func.start_ea)
    print(f"\n{'='*60}")
    print(f"Analyzing {func_name}")
    print(f"{'='*60}")

    # Detect dispatcher
    staging, dispatcher, dispatcher_ea = detect_dispatcher(func)
    if not staging:
        print("[!] Could not detect dispatcher vars")
        return
    print(f"[+] Dispatcher block at {dispatcher_ea:#x}")
    print(f"[+] Staging var: {staging}")
    print(f"[+] Dispatcher var: {dispatcher}")

    # Get staging displacement
    staging_disp = get_var_displacement(func, staging)
    if staging_disp is None:
        print(f"[!] Could not resolve displacement for {staging}")
        return
    print(f"[+] {staging} displacement: {staging_disp:#x}")

    # Detect leaves
    real_leaves, routing_blocks, loopback_ea = detect_leaves(func, staging, dispatcher)
    print(f"\n[+] Loop-back block: {loopback_ea:#x}")
    print(f"[+] Real leaves: {len(real_leaves)} | Routing blocks: {len(routing_blocks)}\n")
    print(f" Real leaves:")
    for start_ea, end_ea in real_leaves:
        name = idc.get_name(start_ea) or f"{start_ea:#x}"
        print(f" {name:30s} {start_ea:#x} - {end_ea:#x}")
    print(f"\n Routing blocks:")
    for start_ea, end_ea in routing_blocks:
        name = idc.get_name(start_ea) or f"{start_ea:#x}"
        print(f" {name:30s} {start_ea:#x} - {end_ea:#x}")

    # Build dispatch map and lookup tables
    dispatch_map = build_dispatch_map(func, dispatcher)
    print(f"\n[+] Dispatch map: {len(dispatch_map)} entries")
    real_leaf_set = {s for s, _ in real_leaves}
    routing_map = {s: (s, e) for s, e in routing_blocks}

    # Snapshot all blocks BEFORE any patching (FlowChart will change after patches)
    all_blocks = [(b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)]

    # Simplify first BBL
    print(f"\n[+] Simplifying first BBL...")
    rbp_val, stack_snap = simplify_first_bbl(
        func, staging, dispatcher, staging_disp,
        dispatch_map, real_leaf_set, routing_map, dispatcher_ea)

    # Simplify all real leaves (loop until no more patches)
    round_num = 0
    while True:
        round_num += 1
        print(f"\n[+] Simplifying real leaves (round {round_num})...")
        count = simplify_leaves(func, staging, staging_disp, dispatch_map,
                                real_leaves, real_leaf_set, routing_map,
                                dispatcher_ea, rbp_val, stack_snap)
        if count == 0:
            break

    # Simplify routing blocks (they can also contain opaque predicates)
    print(f"\n[+] Simplifying routing blocks...")
    simplify_leaves(func, staging, staging_disp, dispatch_map,
                    routing_blocks, real_leaf_set, routing_map,
                    dispatcher_ea, rbp_val, stack_snap)

    # Simplify conditional opaque predicates (mov/mov/test/cmovnz pattern)
    # Run on both real leaves and routing blocks
    all_leaves = real_leaves + routing_blocks
    print(f"\n[+] Simplifying conditional opaque predicates...")
    simplify_conditional_opaques(func, staging, staging_disp, dispatch_map,
                                 all_leaves, real_leaf_set, routing_map,
                                 dispatcher_ea, loopback_ea,
                                 rbp_val, stack_snap)

    # Wipe every BBL that is not a real leaf (routing blocks, dispatcher, etc.)
    first_bbl_start = min(s for s, _ in all_blocks)
    wipe_non_leaf_blocks(all_blocks, real_leaf_set, first_bbl_start)

    # min()/max() raise ValueError on an empty sequence, so only re-analyze
    # when at least one real leaf was detected.
    if real_leaves:
        reanalyze_range(
            min(s for s, _ in real_leaves),
            max(e for _, e in real_leaves)
        )


if __name__ == "__main__":
    analyze()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment