Created
March 19, 2026 14:30
-
-
Save pierrehpezier/35c90c38b2c579d612d5970b04702df8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright (c) 2026 Nextron Systems | |
| # Author: Pierre-Henri Pezier | |
| import idaapi | |
| import idautils | |
| import idc | |
| import re | |
| import struct | |
| from unicorn import * | |
| from unicorn.x86_const import * | |
# Granularity (in bytes) used when mapping code and global pages into Unicorn.
PAGE_SIZE = 0x1000
# Mask that rounds an address down to the start of its PAGE_SIZE-sized page.
PAGE_MASK = ~(PAGE_SIZE - 1)
def get_block_insns(start_ea, end_ea):
    """Collect the items IDA defines in the half-open range ``[start_ea, end_ea)``.

    Returns a list of ``(ea, mnemonic, disassembly_text)`` tuples in address
    order. The walk ends at ``end_ea`` or as soon as ``idc.next_head`` yields
    ``BADADDR``.
    """
    collected = []
    cursor = start_ea
    while True:
        if cursor >= end_ea or cursor == idaapi.BADADDR:
            break
        collected.append((cursor, idc.print_insn_mnem(cursor), idc.GetDisasm(cursor)))
        cursor = idc.next_head(cursor, end_ea)
    return collected
def detect_dispatcher(func):
    """Locate the dispatcher block and the two stack variables it uses.

    The second basic block (by address) is taken as the candidate; up to five
    chained ``jmp`` instructions at its start are followed first. Inside that
    block the function looks for ``mov / mov / sub / jz`` where both ``mov``
    lines mention ``eax`` and an ``[rbp+...var_X]`` operand naming two
    different variables.

    Returns ``(staging_var, dispatcher_var, dispatcher_ea)``, or
    ``(None, None, None)`` when the function has fewer than two blocks or the
    pattern is not found.
    """
    blocks = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )
    if len(blocks) < 2:
        return None, None, None

    # Skip over at most five jmp trampolines in front of the dispatcher.
    dispatcher_ea = blocks[1][0]
    hops = 0
    while hops < 5 and idc.print_insn_mnem(dispatcher_ea) == "jmp":
        dispatcher_ea = idc.get_operand_value(dispatcher_ea, 0)
        hops += 1

    # Use the real block end when the address starts a known block; otherwise
    # fall back to a fixed 0x100-byte window.
    block_end = next(
        (end for start, end in blocks if start == dispatcher_ea),
        dispatcher_ea + 0x100,
    )

    insns = get_block_insns(dispatcher_ea, block_end)
    var_ref = re.compile(r'\[rbp\+(?:\w+\+)?(var_\w+)\]')
    for idx in range(len(insns) - 3):
        window = insns[idx:idx + 4]
        if [mnem for _, mnem, _ in window] != ["mov", "mov", "sub", "jz"]:
            continue
        first_dis = window[0][2]
        second_dis = window[1][2]
        if not ("eax" in first_dis and "rbp" in first_dis):
            continue
        if not ("eax" in second_dis and "rbp" in second_dis):
            continue
        staging = var_ref.search(first_dis)
        dispatcher = var_ref.search(second_dis)
        if staging and dispatcher and staging.group(1) != dispatcher.group(1):
            # staging, dispatcher, dispatcher_ea
            return staging.group(1), dispatcher.group(1), dispatcher_ea
    return None, None, None
def detect_leaves(func, staging_var, dispatcher_var):
    """Classify the blocks that return or jump back to the function's last block.

    The highest-addressed block is treated as the dispatcher loop-back. A
    block ending in ``ret``/``retn`` is always a real leaf. A block ending in
    ``jmp <loop-back>`` is a real leaf when it references any ``var_*`` other
    than the staging/dispatcher variables, and a routing block otherwise.

    Returns ``(real_leaves, routing_blocks, loopback_ea)`` where both lists
    hold ``(start_ea, end_ea)`` tuples.
    """
    blocks = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )
    loopback_ea = blocks[-1][0]
    control_vars = {staging_var, dispatcher_var}

    real_leaves, routing_blocks = [], []
    for block in blocks:
        start_ea, end_ea = block
        insns = get_block_insns(start_ea, end_ea)
        if not insns:
            continue

        tail_ea, tail_mnem, _ = insns[-1]
        if tail_mnem in ("retn", "ret"):
            # Returning blocks never feed the dispatcher: always real.
            real_leaves.append(block)
            continue
        if tail_mnem != "jmp" or idc.get_operand_value(tail_ea, 0) != loopback_ea:
            continue

        # Every stack variable name mentioned anywhere in the block.
        referenced = {
            match.group(0)
            for _, _, dis in insns
            for match in re.finditer(r'var_\w+', dis)
        }
        if referenced - control_vars:
            real_leaves.append(block)
        else:
            routing_blocks.append(block)
    return real_leaves, routing_blocks, loopback_ea
def get_var_displacement(func, var_name):
    """Return the signed displacement IDA reports for stack variable ``var_name``.

    Scans every instruction of ``func`` whose disassembly mentions
    ``var_name`` as a whole identifier and returns the value of its first
    ``o_displ``/``o_phrase`` operand (operands 0 and 1 are checked), converted
    from unsigned 64-bit to signed. Returns ``None`` when no such operand
    exists.

    NOTE(review): the operand is assumed to be the rbp-based reference to
    ``var_name``; the base register is not verified here.
    """
    # Whole-identifier match: a plain substring test would let "var_4" match
    # "var_40" and return the wrong variable's displacement.
    name_re = re.compile(rf"(?<!\w){re.escape(var_name)}(?!\w)")
    for block in idaapi.FlowChart(func):
        for ea, _mnem, disasm in get_block_insns(block.start_ea, block.end_ea):
            if not name_re.search(disasm):
                continue
            for op_idx in (0, 1):
                if idc.get_operand_type(ea, op_idx) not in (idc.o_displ, idc.o_phrase):
                    continue
                val = idc.get_operand_value(ea, op_idx)
                # Reinterpret the unsigned 64-bit value as signed.
                if val > 0x7FFFFFFFFFFFFFFF:
                    val -= 0x10000000000000000
                return val
    return None
def build_dispatch_map(func, dispatcher_var):
    """Build ``{sub_immediate: jz_target}`` from the dispatcher's compare chain.

    Every ``mov / sub / jz`` triple in ``func`` whose ``mov`` mentions ``eax``
    and ``dispatcher_var`` contributes one entry: the second operand of the
    ``sub`` (masked to 32 bits) maps to the ``jz`` destination. A later triple
    with the same key overwrites an earlier one.
    """
    # Whole-identifier match so that e.g. "var_8" does not also accept
    # "var_80" and pull unrelated compares into the map.
    var_re = re.compile(rf"(?<!\w){re.escape(dispatcher_var)}(?!\w)")
    dispatch_map = {}
    for block in idaapi.FlowChart(func):
        insns = get_block_insns(block.start_ea, block.end_ea)
        # zip() yields nothing for blocks shorter than three instructions.
        for (_, m0, d0), (ea1, m1, _), (ea2, m2, _) in zip(insns, insns[1:], insns[2:]):
            if (m0, m1, m2) != ("mov", "sub", "jz"):
                continue
            if "eax" not in d0 or not var_re.search(d0):
                continue
            sub_val = idc.get_operand_value(ea1, 1) & 0xFFFFFFFF
            jz_target = idc.get_operand_value(ea2, 0)
            dispatch_map[sub_val] = jz_target
    return dispatch_map
def collect_global_addrs(insns):
    """Return the set of direct-memory operand addresses used by ``cs:`` instructions.

    ``insns`` is a sequence of ``(ea, mnemonic, disassembly)`` tuples. Only
    instructions whose text contains ``cs:`` are inspected; operands 0 and 1
    of type ``o_mem`` contribute their address unless it is zero or
    ``BADADDR``.
    """
    found = set()
    for ea, _mnem, dis in insns:
        if "cs:" in dis:
            for op_idx in (0, 1):
                if idc.get_operand_type(ea, op_idx) != idc.o_mem:
                    continue
                target = idc.get_operand_value(ea, op_idx)
                if target and target != idaapi.BADADDR:
                    found.add(target)
    return found
def emulate_blocks(block_start, block_end, staging_disp, func, dispatcher_ea,
                   rbp_val=None, stack_snapshot=None):
    """Emulate basic blocks from ``block_start`` until the dispatcher is reached.

    A fresh Unicorn x86-64 instance is created, the whole function containing
    ``block_start`` is mapped, and a 0x10000-byte stack is mapped at 0x80000
    (optionally pre-filled from ``stack_snapshot``). Blocks are then walked:
    a trailing ``jmp`` is not emulated but followed, any other ending falls
    through to the block end. The walk stops at ``dispatcher_ea``, at an
    address already visited, at an address outside every block, or at a block
    with no instructions. Instructions are emulated one at a time; one that
    raises ``UcError`` is logged and skipped.

    Args:
        block_start: address where emulation begins (may be mid-block).
        block_end: accepted for call-site symmetry; not used by the walk.
        staging_disp: signed displacement of the staging variable from RBP.
        func: IDA function object whose flow chart supplies the block map.
        dispatcher_ea: address at which the walk stops.
        rbp_val: initial RBP; defaults to the middle of the emulated stack.
        stack_snapshot: optional bytes written at the stack base beforehand.

    Returns:
        ``(staging_value, rbp, stack_bytes)`` after the walk, where the value
        is the 32-bit little-endian dword at ``rbp + staging_disp``, or
        ``(None, None, None)`` if set-up or the final read raises ``UcError``
        or no function contains ``block_start``.
    """
    # Index every basic block of ``func`` by its start address.
    block_map = {b.start_ea: (b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)}

    stack_base = 0x80000
    stack_size = 0x10000
    if rbp_val is None:
        rbp_val = stack_base + stack_size // 2

    # get_func() returns None outside any function; fail like an emulation
    # error instead of letting an AttributeError escape to the caller.
    func_obj = idaapi.get_func(block_start)
    if func_obj is None:
        print(f" [!] No function contains {block_start:#x}, cannot emulate")
        return None, None, None

    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_64)

        # Map the entire function's code region, rounded out to whole pages.
        func_start = func_obj.start_ea
        func_end = func_obj.end_ea
        func_page = func_start & PAGE_MASK
        func_map_size = ((func_end - func_page + PAGE_SIZE) & PAGE_MASK) or PAGE_SIZE
        mu.mem_map(func_page, func_map_size)
        func_bytes = idaapi.get_bytes(func_start, func_end - func_start)
        if func_bytes:
            mu.mem_write(func_start, func_bytes)
        mapped_pages = set(range(func_page, func_page + func_map_size, PAGE_SIZE))

        # Map the stack unless the code mapping already covers its first page.
        if (stack_base & PAGE_MASK) not in mapped_pages:
            mu.mem_map(stack_base, stack_size)
        if stack_snapshot:
            mu.mem_write(stack_base, stack_snapshot)
        mu.reg_write(UC_X86_REG_RBP, rbp_val)
        mu.reg_write(UC_X86_REG_RSP, rbp_val - 0x200)

        # Walk blocks.
        current_start = block_start
        visited = set()
        while current_start is not None and current_start not in visited:
            # Stop if we reached the dispatcher.
            if current_start == dispatcher_ea:
                print(f" [emu] Reached dispatcher at {dispatcher_ea:#x}, stopping")
                break

            blk = block_map.get(current_start)
            if not blk:
                # current_start may be mid-block; emulate from there to the
                # end of the containing block.
                for bs, be in block_map.values():
                    if bs <= current_start < be:
                        blk = (current_start, be)
                        break
                if not blk:
                    print(f" [emu] No block at {current_start:#x}, stopping")
                    break

            visited.add(current_start)
            b_start, b_end = blk
            insns = get_block_insns(b_start, b_end)
            if not insns:
                break

            # Map the pages of globals referenced by this block and seed each
            # global with the dword stored in the IDA database.
            for addr in collect_global_addrs(insns):
                page = addr & PAGE_MASK
                if page not in mapped_pages:
                    try:
                        mu.mem_map(page, PAGE_SIZE)
                        mapped_pages.add(page)
                    except UcError:
                        # Best effort: a failed mapping surfaces on the write
                        # below and is handled by the outer handler.
                        pass
                val = idaapi.get_dword(addr)
                mu.mem_write(addr, struct.pack("<I", val & 0xFFFFFFFF))

            # Determine what to emulate and where to go next.
            last_ea, last_mnem, _ = insns[-1]
            if last_mnem == "jmp":
                emu_insns = insns[:-1]
                current_start = idc.get_operand_value(last_ea, 0)
            else:
                emu_insns = insns
                current_start = b_end  # fallthrough

            # NOTE(review): each instruction is started with only an ``until``
            # address, so a taken branch or a call leaves Unicorn running
            # until it faults. Confirm this is acceptable for blocks that end
            # in a conditional branch.
            for ea, _mnem, disasm in emu_insns:
                insn_size = idc.get_item_size(ea)
                try:
                    mu.emu_start(ea, ea + insn_size)
                except UcError as e:
                    print(f" {ea:#x}: {disasm:50s} [SKIP: {e}]")

        # Read the staging variable via the current RBP, which the emulated
        # code may have changed (e.g. lea rbp, [rsp+X]).
        actual_rbp = mu.reg_read(UC_X86_REG_RBP)
        result_bytes = mu.mem_read(actual_rbp + staging_disp, 4)
        result = struct.unpack("<I", bytes(result_bytes))[0]
        snap = bytes(mu.mem_read(stack_base, stack_size))
        return result, actual_rbp, snap
    except UcError as e:
        print(f" [!] Unicorn error at {block_start:#x}: {e}")
        return None, None, None
def resolve_to_real_leaf(initial_value, dispatch_map, real_leaf_set, routing_map,
                         staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot):
    """Chase a dispatch value through routing blocks until a real leaf is hit.

    Each value is looked up in ``dispatch_map``. If the target is in
    ``real_leaf_set`` it is returned. If it is a routing block, that block is
    emulated to obtain the next value. ``None`` is returned for an unknown
    value, a target that is neither kind of block, a failed emulation, or a
    value seen before (cycle).
    """
    seen_values = set()
    value = initial_value
    while not (value is None or value in seen_values):
        if value not in dispatch_map:
            return None
        seen_values.add(value)

        target_ea = dispatch_map[value]
        if target_ea in real_leaf_set:
            # Landed on real code: resolution complete.
            return target_ea

        routing_block = routing_map.get(target_ea)
        if not routing_block:
            return None

        # Routing block: run it to learn which value it stages next, carrying
        # the resulting RBP and stack forward.
        routing_start, routing_end = routing_block
        value, rbp_val, stack_snapshot = emulate_blocks(
            routing_start, routing_end, staging_disp, func, dispatcher_ea,
            rbp_val, stack_snapshot)
    return None
def resolve_value_by_emulation(staging_value, staging_disp, func, real_leaf_set,
                               loopback_ea, rbp_val, stack_snapshot):
    """Resolve a staging value to a real leaf by running the dispatcher itself.

    A fresh Unicorn x86-64 instance gets the function code, every global
    referenced by a ``cs:`` instruction in ``func``, and a stack (optionally
    pre-filled from ``stack_snapshot``). ``staging_value`` is written to
    ``rbp_val + staging_disp`` and execution starts at ``loopback_ea`` with a
    budget of 2000 instructions. A code hook stops the run at the first
    address found in ``real_leaf_set``.

    Returns:
        The real-leaf address reached, or ``None`` if the budget ran out
        first, Unicorn raised ``UcError``, or no function contains
        ``loopback_ea``.
    """
    # get_func() returns None outside any function; report that as "not
    # resolved" rather than raising AttributeError.
    func_obj = idaapi.get_func(loopback_ea)
    if func_obj is None:
        print(f" [!] No function contains {loopback_ea:#x}, cannot resolve {staging_value:#x}")
        return None
    func_start = func_obj.start_ea
    func_end = func_obj.end_ea

    stack_base = 0x80000
    stack_size = 0x10000
    if rbp_val is None:
        rbp_val = stack_base + stack_size // 2

    try:
        mu = Uc(UC_ARCH_X86, UC_MODE_64)

        # Map function code, rounded out to whole pages.
        func_page = func_start & PAGE_MASK
        func_map_size = ((func_end - func_page + PAGE_SIZE) & PAGE_MASK) or PAGE_SIZE
        mu.mem_map(func_page, func_map_size)
        func_bytes = idaapi.get_bytes(func_start, func_end - func_start)
        if func_bytes:
            mu.mem_write(func_start, func_bytes)
        mapped_pages = set(range(func_page, func_page + func_map_size, PAGE_SIZE))

        # Map globals used by the function and seed them from the database.
        for block in idaapi.FlowChart(func):
            insns = get_block_insns(block.start_ea, block.end_ea)
            for addr in collect_global_addrs(insns):
                page = addr & PAGE_MASK
                if page not in mapped_pages:
                    try:
                        mu.mem_map(page, PAGE_SIZE)
                        mapped_pages.add(page)
                    except UcError:
                        # Best effort: a failed mapping surfaces on the write
                        # below and is handled by the outer handler.
                        pass
                val = idaapi.get_dword(addr)
                mu.mem_write(addr, struct.pack("<I", val & 0xFFFFFFFF))

        # Map the stack unless the code mapping already covers its first page.
        if (stack_base & PAGE_MASK) not in mapped_pages:
            mu.mem_map(stack_base, stack_size)
        if stack_snapshot:
            mu.mem_write(stack_base, stack_snapshot)
        mu.reg_write(UC_X86_REG_RBP, rbp_val)
        mu.reg_write(UC_X86_REG_RSP, rbp_val - 0x200)

        # Write the staging value the dispatcher is about to consume.
        mu.mem_write(rbp_val + staging_disp, struct.pack("<I", staging_value & 0xFFFFFFFF))

        # One-element list so the hook closure can hand back the address.
        result = [None]

        def hook_code(uc, address, size, user_data):
            # Stop as soon as execution enters a real leaf.
            if address in real_leaf_set:
                result[0] = address
                uc.emu_stop()

        mu.hook_add(UC_HOOK_CODE, hook_code)
        mu.emu_start(loopback_ea, 0, timeout=0, count=2000)
        return result[0]
    except UcError as e:
        print(f" [!] Emulation resolve error for {staging_value:#x}: {e}")
        return None
def reanalyze_range(start_ea, end_ea):
    """Undefine ``[start_ea, end_ea)`` and ask IDA to recreate instructions in it."""
    idc.del_items(start_ea, 0, end_ea - start_ea)
    cursor = start_ea
    while cursor < end_ea:
        # create_insn() yields the instruction length, or 0 when nothing
        # could be created here; in that case advance by a single byte.
        cursor += idc.create_insn(cursor) or 1
def patch_jmp(patch_ea, target_ea):
    """Patch a 5-byte ``jmp rel32`` (``E9 rel32``) to ``target_ea`` at ``patch_ea``.

    Returns ``True`` after patching, or ``False`` (leaving the database
    untouched) when the displacement does not fit in a signed 32-bit value.
    """
    displacement = target_ea - (patch_ea + 5)
    if displacement < -0x80000000 or displacement > 0x7FFFFFFF:
        print(f" [!] Jump out of range at {patch_ea:#x}")
        return False
    encoded = b"\xE9" + struct.pack("<i", displacement)
    for offset, byte in enumerate(encoded):
        idaapi.patch_byte(patch_ea + offset, byte)
    return True
def patch_jnz(patch_ea, target_ea):
    """Patch a 6-byte ``jnz rel32`` (``0F 85 rel32``) to ``target_ea`` at ``patch_ea``.

    Returns ``True`` after patching, or ``False`` (leaving the database
    untouched) when the displacement does not fit in a signed 32-bit value.
    """
    displacement = target_ea - (patch_ea + 6)
    if displacement < -0x80000000 or displacement > 0x7FFFFFFF:
        print(f" [!] Jnz out of range at {patch_ea:#x}")
        return False
    encoded = b"\x0F\x85" + struct.pack("<i", displacement)
    for offset, byte in enumerate(encoded):
        idaapi.patch_byte(patch_ea + offset, byte)
    return True
def patch_jz(patch_ea, target_ea):
    """Patch a 6-byte ``jz rel32`` (``0F 84 rel32``) to ``target_ea`` at ``patch_ea``.

    Returns ``True`` after patching, or ``False`` (leaving the database
    untouched) when the displacement does not fit in a signed 32-bit value.
    """
    displacement = target_ea - (patch_ea + 6)
    if displacement < -0x80000000 or displacement > 0x7FFFFFFF:
        print(f" [!] Jz out of range at {patch_ea:#x}")
        return False
    encoded = b"\x0F\x84" + struct.pack("<i", displacement)
    for offset, byte in enumerate(encoded):
        idaapi.patch_byte(patch_ea + offset, byte)
    return True
def find_staging_mov(insns, staging_var):
    """Return the address of the last ``mov [rbp+staging_var], <imm>`` in ``insns``.

    ``insns`` is a sequence of ``(ea, mnemonic, disassembly)`` tuples, scanned
    from the end. A candidate must be a ``mov`` whose text contains ``[rbp``
    and ``staging_var`` as a whole identifier, and whose second operand is an
    immediate. Returns ``None`` when there is no such instruction.
    """
    # Whole-identifier match: a substring test would treat a store to
    # "var_40" as a store to "var_4".
    var_re = re.compile(rf"(?<!\w){re.escape(staging_var)}(?!\w)")
    for ea, mnem, dis in reversed(insns):
        if mnem != "mov" or "[rbp" not in dis or not var_re.search(dis):
            continue
        if idc.get_operand_type(ea, 1) == idc.o_imm:
            return ea
    return None
def simplify_first_bbl(func, staging_var, dispatcher_var, staging_disp,
                       dispatch_map, real_leaf_set, routing_map, dispatcher_ea):
    """Replace the entry block's dispatch set-up with a direct jump to its real leaf.

    The lowest-addressed block is emulated (following jumps up to the
    dispatcher) to learn the initial dispatch value, which is resolved to a
    real leaf. The last ``mov [rbp+staging_var], imm`` in the entry block is
    overwritten with ``jmp <leaf>``, the rest of the block is filled with
    0xCC, and the range is re-analyzed.

    Returns ``(rbp_val, stack_snapshot)`` from the emulation, both ``None``
    when it failed, whether or not a patch was applied. ``dispatcher_var`` is
    accepted but not used.
    """
    entry_start, entry_end = sorted(
        ((b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)),
        key=lambda blk: blk[0],
    )[0]

    # Emulate from the entry block, following jmps until the dispatcher.
    initial_val, rbp_val, stack_snap = emulate_blocks(
        entry_start, entry_end, staging_disp, func, dispatcher_ea)
    emu_state = (rbp_val, stack_snap)
    if initial_val is None:
        print(f"[!] Failed to emulate first BBL")
        return emu_state
    print(f"[+] First BBL initial dispatch value: {initial_val:#x}")

    # Follow routing blocks until real code is reached.
    real_target = resolve_to_real_leaf(
        initial_val, dispatch_map, real_leaf_set, routing_map,
        staging_disp, func, dispatcher_ea, rbp_val, stack_snap)
    if real_target is None:
        print(f"[!] Could not resolve to a real leaf")
        return emu_state
    target_name = idc.get_name(real_target) or f"{real_target:#x}"
    print(f"[+] Resolves to: {target_name} ({real_target:#x})")

    patch_ea = find_staging_mov(get_block_insns(entry_start, entry_end), staging_var)
    if patch_ea is None:
        print(f"[!] Could not find mov [rbp+{staging_var}], imm in first BBL")
        return emu_state

    if patch_jmp(patch_ea, real_target):
        # Fill what is left of the block after the 5-byte jmp with int3 (0xCC).
        for addr in range(patch_ea + 5, entry_end):
            idaapi.patch_byte(addr, 0xcc)
        reanalyze_range(patch_ea, entry_end)
        print(f"[+] Patched {patch_ea:#x}: jmp {target_name}")
    return emu_state
def simplify_leaves(func, staging_var, staging_disp, dispatch_map,
                    real_leaves, real_leaf_set, routing_map,
                    dispatcher_ea, rbp_val, stack_snapshot):
    """Rewrite leaves that end in ``mov [rbp+staging], <val>`` / ``jmp`` as direct jumps.

    For every ``(start_ea, end_ea)`` in ``real_leaves`` that ends with ``jmp``
    and contains a ``mov`` referencing ``[rbp+staging_var]``, the block is
    emulated to obtain the next dispatch value, which is resolved to a real
    leaf. A ``jmp`` to it is then patched over the staging mov, or over the
    start of a ``mov ecx, cs:.. / mov eax, cs:.. / mov edx, ecx`` opaque
    predicate when one is present. The rest of the block is filled with 0xCC.

    Blocks matching the conditional pattern recognised by
    ``detect_conditional_opaque`` without such a preamble are skipped here.

    Returns the number of blocks patched.
    """
    # Whole-identifier match: a substring test would treat a mov touching
    # "var_40" as the staging store for "var_4" and patch the wrong spot.
    var_re = re.compile(rf"(?<!\w){re.escape(staging_var)}(?!\w)")
    patched = 0
    for leaf_start, leaf_end in real_leaves:
        insns = get_block_insns(leaf_start, leaf_end)
        if not insns:
            continue
        # Must end with jmp (not ret).
        if insns[-1][1] != "jmp":
            continue

        # Last mov referencing the staging var before the final jmp
        # (immediate or register source).
        staging_mov_ea = next(
            (ea for ea, mnem, dis in reversed(insns[:-1])
             if mnem == "mov" and "[rbp" in dis and var_re.search(dis)),
            None,
        )
        if staging_mov_ea is None:
            continue

        # Opaque predicate preamble:
        #   mov ecx, cs:dword / mov eax, cs:dword / mov edx, ecx
        opaque_start_ea = None
        for (ea0, m0, d0), (_, m1, d1), (_, m2, d2) in zip(insns, insns[1:], insns[2:]):
            if (m0 == "mov" and "ecx" in d0 and "cs:" in d0
                    and m1 == "mov" and "eax" in d1 and "cs:" in d1
                    and m2 == "mov" and "edx" in d2 and "ecx" in d2):
                opaque_start_ea = ea0
                break

        # Conditional blocks without the cs:dword preamble are genuinely
        # conditional and belong to simplify_conditional_opaques.
        if opaque_start_ea is None and detect_conditional_opaque(insns, staging_var) is not None:
            continue

        # Patch at (and emulate from) the opaque predicate start when there
        # is one; otherwise patch the staging mov and emulate the whole block.
        patch_ea = opaque_start_ea if opaque_start_ea else staging_mov_ea
        emu_start = opaque_start_ea if opaque_start_ea else leaf_start
        leaf_name = idc.get_name(leaf_start) or f"{leaf_start:#x}"

        next_val, _, _ = emulate_blocks(
            emu_start, leaf_end, staging_disp, func, dispatcher_ea,
            rbp_val, stack_snapshot)
        if next_val is None:
            print(f" [!] {leaf_name}: emulation failed")
            continue

        # Resolve through routing blocks to a real leaf.
        real_target = resolve_to_real_leaf(
            next_val, dispatch_map, real_leaf_set, routing_map,
            staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
        if real_target is None:
            print(f" [!] {leaf_name}: could not resolve {next_val:#x} to real leaf")
            continue
        target_name = idc.get_name(real_target) or f"{real_target:#x}"

        if patch_jmp(patch_ea, real_target):
            # Fill the rest of the block after the 5-byte jmp with int3.
            for addr in range(patch_ea + 5, leaf_end):
                idaapi.patch_byte(addr, 0xcc)
            reanalyze_range(patch_ea, leaf_end)
            patched += 1
            print(f" Patched {leaf_start:#x}: jmp {target_name}")
    print(f"\n[+] Patched {patched}/{len(real_leaves)} real leaves")
    return patched
def find_opaque_computation_start(insns, tail_idx):
    """Find where the opaque computation in front of ``insns[tail_idx]`` starts.

    Walks backwards from ``tail_idx - 1`` and keeps extending the range while
    each instruction has a mnemonic from a fixed arithmetic/logic/mov/setcc
    set, does not reference ``[rbp`` (unless it also mentions ``cs:``), and is
    not a ``mov`` storing through a non-rbp ``[r..]`` operand.

    Returns the index of the earliest accepted instruction, or ``tail_idx``
    when the instruction just before the tail is already rejected.
    """
    allowed = frozenset((
        "mov", "xor", "and", "or", "test", "cmp", "not",
        "shr", "shl", "sub", "add", "imul", "setz", "setnz",
        "setl", "setg", "setle", "setge", "setnl", "setng",
        "setb", "setnb", "seta", "setna",
    ))
    first_idx = tail_idx
    for idx in reversed(range(tail_idx)):
        _, mnem, dis = insns[idx]
        # Stack-variable access marks real code; opaque computations only use
        # cs:dword globals and register-only arithmetic.
        touches_stack = "[rbp" in dis and "cs:" not in dis
        # A store through a register pointer (e.g. mov [rax], ecx) is real code too.
        stores_via_reg = (mnem == "mov" and re.match(r'mov\s+\[r', dis)
                          and 'rbp' not in dis)
        if mnem not in allowed or touches_stack or stores_via_reg:
            break
        first_idx = idx
    return first_idx
def detect_conditional_opaque(insns, staging_var):
    """Detect the conditional dispatch pattern at the end of a block.

    The last six instructions must be::

        mov eax, IMM_A
        mov ecx, IMM_B
        test <something>, 1
        cmovnz eax, ecx
        mov [rbp+staging_var], eax
        jmp <dispatcher>

    Returns ``(pred_start_ea, test_ea, test_size, imm_a, imm_b,
    comp_start_ea)`` or ``None`` when the block does not match.
    ``pred_start_ea`` is the address of ``mov eax, IMM_A``. ``comp_start_ea``
    is where the opaque computation begins, which is earlier than
    ``pred_start_ea`` when a cs:dword arithmetic preamble precedes the tail.
    """
    if len(insns) < 6:
        return None
    ((ea_eax, m_eax, d_eax), (ea_ecx, m_ecx, d_ecx), (ea_test, m_test, d_test),
     (_, m_cmov, _), (_, m_store, d_store), (_, m_last, _)) = insns[-6:]

    if m_last != "jmp":
        return None

    # The store must name the staging variable as a whole identifier; a
    # substring test would accept "var_40" when the staging var is "var_4".
    var_re = re.compile(rf"(?<!\w){re.escape(staging_var)}(?!\w)")
    if (m_store != "mov" or "[rbp" not in d_store or "eax" not in d_store
            or not var_re.search(d_store)):
        return None

    if not m_cmov.startswith("cmovnz"):
        return None

    # Must be testing something against 1 (textual check on the disassembly).
    if m_test != "test" or "1" not in d_test:
        return None

    if m_ecx != "mov" or "ecx" not in d_ecx:
        return None
    if idc.get_operand_type(ea_ecx, 1) != idc.o_imm:
        return None
    imm_b = idc.get_operand_value(ea_ecx, 1) & 0xFFFFFFFF

    if m_eax != "mov" or "eax" not in d_eax:
        return None
    if idc.get_operand_type(ea_eax, 1) != idc.o_imm:
        return None
    imm_a = idc.get_operand_value(ea_eax, 1) & 0xFFFFFFFF

    test_size = idc.get_item_size(ea_test)

    # Extend backwards over any opaque computation in front of the tail.
    comp_start_idx = find_opaque_computation_start(insns, len(insns) - 6)
    comp_start_ea = insns[comp_start_idx][0]
    return ea_eax, ea_test, test_size, imm_a, imm_b, comp_start_ea
def simplify_conditional_opaques(func, staging_var, staging_disp, dispatch_map,
                                 real_leaves, real_leaf_set, routing_map,
                                 dispatcher_ea, loopback_ea,
                                 rbp_val, stack_snapshot):
    """Find leaves with conditional opaque predicates (mov eax,imm / mov ecx,imm /
    test / cmovnz / mov staging / jmp).
    If an opaque computation preamble (cs:dword arithmetic) precedes the tail,
    emulate from there to resolve the predicate. If it always takes one branch,
    patch with a single jmp from the computation start. Otherwise fall back to
    test / jz target_A / jmp target_B.

    ``real_leaves`` is the list of ``(start_ea, end_ea)`` blocks to inspect.
    Both immediates are resolved to real-leaf addresses first (dispatch map,
    then full emulation from ``loopback_ea``); a block is skipped when either
    cannot be resolved. Returns the number of blocks patched.

    NOTE(review): nesting was recovered from a listing that had lost its
    indentation; the ``continue`` after the single-jmp patch is assumed to
    belong to the successful ``patch_jmp`` branch - confirm against the
    original file.
    """
    patched = 0
    for leaf_start, leaf_end in real_leaves:
        insns = get_block_insns(leaf_start, leaf_end)
        if not insns:
            continue
        result = detect_conditional_opaque(insns, staging_var)
        if result is None:
            continue
        pred_start_ea, test_ea, test_size, imm_a, imm_b, comp_start_ea = result
        leaf_name = idc.get_name(leaf_start) or f"{leaf_start:#x}"
        # Resolve both dispatch values to real leaves
        # Try dispatch_map first, fall back to full emulation through dispatcher
        target_a = resolve_to_real_leaf(
            imm_a, dispatch_map, real_leaf_set, routing_map,
            staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
        if target_a is None:
            target_a = resolve_value_by_emulation(
                imm_a, staging_disp, func, real_leaf_set,
                loopback_ea, rbp_val, stack_snapshot)
        target_b = resolve_to_real_leaf(
            imm_b, dispatch_map, real_leaf_set, routing_map,
            staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
        if target_b is None:
            target_b = resolve_value_by_emulation(
                imm_b, staging_disp, func, real_leaf_set,
                loopback_ea, rbp_val, stack_snapshot)
        if target_a is None:
            print(f" [!] {leaf_name}: could not resolve imm_a {imm_a:#x}")
            continue
        if target_b is None:
            print(f" [!] {leaf_name}: could not resolve imm_b {imm_b:#x}")
            continue
        name_a = idc.get_name(target_a) or f"{target_a:#x}"
        name_b = idc.get_name(target_b) or f"{target_b:#x}"
        # If there is an opaque computation preamble before the tail,
        # emulate from its start to resolve the predicate deterministically
        if comp_start_ea < pred_start_ea:
            resolved, _, _ = emulate_blocks(
                comp_start_ea, leaf_end, staging_disp, func, dispatcher_ea,
                rbp_val, stack_snapshot)
            if resolved is not None:
                # Determine which branch was taken
                if resolved == imm_a:
                    winner_target, winner_name = target_a, name_a
                elif resolved == imm_b:
                    winner_target, winner_name = target_b, name_b
                else:
                    # Resolved to something else, try full resolution
                    winner_target = resolve_to_real_leaf(
                        resolved, dispatch_map, real_leaf_set, routing_map,
                        staging_disp, func, dispatcher_ea, rbp_val, stack_snapshot)
                    if winner_target is None:
                        winner_target = resolve_value_by_emulation(
                            resolved, staging_disp, func, real_leaf_set,
                            loopback_ea, rbp_val, stack_snapshot)
                    # Parses as (name or hex) if winner_target else None.
                    winner_name = idc.get_name(winner_target) or f"{winner_target:#x}" if winner_target else None
                if winner_target is not None:
                    # Patch from computation start with single jmp
                    if patch_jmp(comp_start_ea, winner_target):
                        # The bytes after the jmp are deliberately left in
                        # place (the fill below is disabled).
                        #for addr in range(comp_start_ea + 5, leaf_end):
                        #    idaapi.patch_byte(addr, 0x90)
                        reanalyze_range(comp_start_ea, leaf_end)
                        patched += 1
                        print(f" Patched {leaf_start:#x}: jmp {winner_name} (opaque always resolved to {resolved:#x})")
                        continue
        # Genuinely conditional — rewrite as test / jz target_a / jmp target_b
        # cmovnz: when Z, eax=imm_a; when NZ, eax=imm_b
        # So jz → target_a (zero case), fallthrough jmp → target_b (non-zero case)
        # Read the test encoding before any byte of the block is overwritten.
        test_bytes = idaapi.get_bytes(test_ea, test_size)
        if not test_bytes:
            continue
        write_ea = pred_start_ea
        # Write the test instruction
        for i, b in enumerate(test_bytes):
            idaapi.patch_byte(write_ea + i, b)
        write_ea += test_size
        # Write jz target_a (zero → cmovnz didn't fire → eax = imm_a)
        if not patch_jz(write_ea, target_a):
            continue
        write_ea += 6
        # Write jmp target_b (non-zero → cmovnz fired → eax = imm_b)
        if not patch_jmp(write_ea, target_b):
            continue
        # Fill the rest of the block with int3 (0xCC)
        patch_end_ea = write_ea + 5
        for addr in range(patch_end_ea, leaf_end):
            idaapi.patch_byte(addr, 0xcc)
        reanalyze_range(pred_start_ea, leaf_end)
        patched += 1
        print(f" Patched {leaf_start:#x}: test / jz {name_a} / jmp {name_b}")
    print(f"\n[+] Patched {patched} conditional opaque predicate(s)")
    return patched
def wipe_non_leaf_blocks(all_blocks, leaf_set, first_bbl_start):
    """Overwrite every block that is neither the entry block nor in ``leaf_set``.

    ``all_blocks`` is the list of ``(start_ea, end_ea)`` collected BEFORE any
    patching. Each wiped block is filled with 0xCC bytes, undefined, and
    re-created as byte data so IDA does not try to disassemble it. The number
    of wiped blocks is printed; nothing is returned.
    """
    wiped = 0
    for start_ea, end_ea in all_blocks:
        if start_ea == first_bbl_start or start_ea in leaf_set:
            continue
        size = end_ea - start_ea
        for offset in range(size):
            idaapi.patch_byte(start_ea + offset, 0xcc)
        # Undefine, then mark as data bytes.
        idc.del_items(start_ea, 0, size)
        idaapi.create_data(start_ea, idaapi.FF_BYTE, size, idaapi.BADNODE)
        wiped += 1
    print(f"\n[+] Wiped {wiped} non-leaf block(s)")
def analyze(ea=None):
    """Unflatten the function containing ``ea`` (default: the cursor position).

    Steps, each reported on the console:
      1. detect the dispatcher block and the staging/dispatcher variables;
      2. resolve the staging variable's displacement;
      3. classify blocks into real leaves and routing blocks;
      4. build the dispatch map;
      5. patch the entry block, the real leaves (repeated until a round
         patches nothing), the routing blocks and the conditional opaque
         predicates with direct jumps;
      6. wipe every block that is neither the entry block nor a real leaf
         and re-analyze the span covered by the real leaves.

    Returns ``None``. The function stops early, before modifying the
    database, when there is no function at ``ea``, the dispatcher or the
    displacement cannot be determined, or no real leaf was found.
    """
    if ea is None:
        ea = idc.here()
    func = idaapi.get_func(ea)
    if not func:
        print(f"[!] No function at {ea:#x}")
        return

    func_name = idc.get_func_name(func.start_ea)
    print(f"\n{'='*60}")
    print(f"Analyzing {func_name}")
    print(f"{'='*60}")

    # Detect dispatcher
    staging, dispatcher, dispatcher_ea = detect_dispatcher(func)
    if not staging:
        print("[!] Could not detect dispatcher vars")
        return
    print(f"[+] Dispatcher block at {dispatcher_ea:#x}")
    print(f"[+] Staging var: {staging}")
    print(f"[+] Dispatcher var: {dispatcher}")

    # Get staging displacement
    staging_disp = get_var_displacement(func, staging)
    if staging_disp is None:
        print(f"[!] Could not resolve displacement for {staging}")
        return
    print(f"[+] {staging} displacement: {staging_disp:#x}")

    # Detect leaves
    real_leaves, routing_blocks, loopback_ea = detect_leaves(func, staging, dispatcher)
    print(f"\n[+] Loop-back block: {loopback_ea:#x}")
    print(f"[+] Real leaves: {len(real_leaves)} | Routing blocks: {len(routing_blocks)}\n")
    print(f" Real leaves:")
    for start_ea, end_ea in real_leaves:
        name = idc.get_name(start_ea) or f"{start_ea:#x}"
        print(f" {name:30s} {start_ea:#x} - {end_ea:#x}")
    print(f"\n Routing blocks:")
    for start_ea, end_ea in routing_blocks:
        name = idc.get_name(start_ea) or f"{start_ea:#x}"
        print(f" {name:30s} {start_ea:#x} - {end_ea:#x}")

    # Without a real leaf nothing can be resolved. Carrying on would wipe
    # every block except the entry block and then fail on min() of an empty
    # sequence, so stop before touching the database.
    if not real_leaves:
        print("[!] No real leaves found, nothing to simplify")
        return

    # Build dispatch map and lookup tables
    dispatch_map = build_dispatch_map(func, dispatcher)
    print(f"\n[+] Dispatch map: {len(dispatch_map)} entries")
    real_leaf_set = {s for s, _ in real_leaves}
    routing_map = {s: (s, e) for s, e in routing_blocks}

    # Snapshot all blocks BEFORE any patching (FlowChart will change after patches)
    all_blocks = [(b.start_ea, b.end_ea) for b in idaapi.FlowChart(func)]

    # Simplify first BBL
    print(f"\n[+] Simplifying first BBL...")
    rbp_val, stack_snap = simplify_first_bbl(
        func, staging, dispatcher, staging_disp,
        dispatch_map, real_leaf_set, routing_map, dispatcher_ea)

    # Simplify all real leaves (loop until no more patches)
    round_num = 0
    while True:
        round_num += 1
        print(f"\n[+] Simplifying real leaves (round {round_num})...")
        count = simplify_leaves(func, staging, staging_disp, dispatch_map,
                                real_leaves, real_leaf_set, routing_map,
                                dispatcher_ea, rbp_val, stack_snap)
        if count == 0:
            break

    # Simplify routing blocks (they can also contain opaque predicates)
    print(f"\n[+] Simplifying routing blocks...")
    simplify_leaves(func, staging, staging_disp, dispatch_map,
                    routing_blocks, real_leaf_set, routing_map,
                    dispatcher_ea, rbp_val, stack_snap)

    # Simplify conditional opaque predicates (mov/mov/test/cmovnz pattern)
    # Run on both real leaves and routing blocks
    all_leaves = real_leaves + routing_blocks
    print(f"\n[+] Simplifying conditional opaque predicates...")
    simplify_conditional_opaques(func, staging, staging_disp, dispatch_map,
                                 all_leaves, real_leaf_set, routing_map,
                                 dispatcher_ea, loopback_ea,
                                 rbp_val, stack_snap)

    # Wipe every BBL that is not a real leaf (routing blocks, dispatcher, etc.)
    first_bbl_start = min(s for s, _ in all_blocks)
    wipe_non_leaf_blocks(all_blocks, real_leaf_set, first_bbl_start)
    reanalyze_range(
        min(s for s, _ in real_leaves),
        max(e for _, e in real_leaves)
    )
# When run as a script, analyze the function at the current cursor position
# (analyze() falls back to idc.here() when called without an address).
if __name__ == "__main__":
    analyze()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment