Last active
September 21, 2024 06:10
-
-
Save herrcore/0649d85a6838972db5da71bed6ed676b to your computer and use it in GitHub Desktop.
Lumma Stealer Deobfuscation (IDA Python)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import idautils | |
import idc | |
import ida_bytes | |
import ida_ua | |
import ida_funcs | |
import ida_idp | |
from idautils import DecodeInstruction | |
import struct | |
# Mnemonics of the x86 jump family (unconditional and conditional).
# Membership in this list is used to decide when tracked memory moves must be
# discarded because control flow has changed.
jump_instructions = (
    "jmp ja jae jb jbe jc jcxz jecxz jrcxz je "
    "jg jge jl jle jna jnae jnb jnbe jnc jne "
    "jng jnge jnl jnle jno jnp jns jnz jo jp "
    "jpe jpo js jz"
).split()
def _resolve_direct_jump(file_data, mov_addr, jmp_addr, reg, label):
    """Resolve a ``jmp reg`` fed by a direct memory move and patch it.

    Emulates from the memory move at ``mov_addr`` up to the jump at
    ``jmp_addr``. On success the resolved target is written as a comment on
    the jump and ``patch_direct`` rewrites the range.

    Args:
        file_data: Raw bytes of the sample, handed to the emulator.
        mov_addr: Address of the tracked ``mov reg, [mem]``.
        jmp_addr: Address of the ``jmp reg`` instruction.
        reg: Name of the jump register (as returned by ``get_reg_name``).
        label: Word used in the log line ("Closest" or "Further").
    """
    print(f"\t{label} mem move {hex(mov_addr)}")
    unicorn_reg = get_unicorn_reg_name(reg)
    new_addr = emulate_jmp_table(file_data, mov_addr, jmp_addr, unicorn_reg)
    if new_addr is None:
        print("\tERROR emulation failed")
        return
    print(f"\tEmulated jump to {hex(new_addr)}")
    ida_bytes.set_cmt(jmp_addr, hex(new_addr), 0)
    patch_direct(mov_addr, jmp_addr, new_addr)


def _resolve_indexed_jump(file_data, mov_addr, jmp_addr, reg, index_reg_name, label):
    """Resolve a ``jmp reg`` fed by an indexed memory move and patch it.

    Emulates the range twice, once with the index register set to 0 and once
    with it set to 1. When both runs yield a value, both targets are written
    as a comment on the jump and ``patch_indexed`` rewrites the range.

    Args:
        file_data: Raw bytes of the sample, handed to the emulator.
        mov_addr: Address of the tracked indexed ``mov``.
        jmp_addr: Address of the ``jmp reg`` instruction.
        reg: Name of the jump register.
        index_reg_name: Name of the index register used by the memory move.
        label: Word used in the log line ("Closest" or "Further").
    """
    print(f"\t{label} mem move {hex(mov_addr)}")
    unicorn_reg = get_unicorn_reg_name(reg)
    index_reg = get_unicorn_reg_name(index_reg_name)
    new_addr_0 = emulate_jmp_table_condition(file_data, mov_addr, jmp_addr, unicorn_reg, index_reg, 0)
    new_addr_1 = emulate_jmp_table_condition(file_data, mov_addr, jmp_addr, unicorn_reg, index_reg, 1)
    if new_addr_0 is None:
        print("\tERROR emulation failed for new_addr_0")
    elif new_addr_1 is None:
        print("\tERROR emulation failed for new_addr_1")
    else:
        print(f"\tEmulated jump [{index_reg_name} = 0]to {hex(new_addr_0)}")
        print(f"\tEmulated jump [{index_reg_name} = 1]to {hex(new_addr_1)}")
        ida_bytes.set_cmt(jmp_addr, f"0={hex(new_addr_0)}\n1={hex(new_addr_1)}", 0)
        patch_indexed(mov_addr, jmp_addr, reg, index_reg_name, new_addr_1, new_addr_0)


def scan_binary(start_addr, end_addr, file_data):
    """Walk a code range and resolve register jumps fed by memory moves.

    The scan remembers the most recent direct ``mov reg, [mem]`` and the most
    recent indexed ``mov reg, [mem + index]``. When a ``jmp reg`` is reached,
    one of the remembered moves is chosen, the range from that move to the
    jump is emulated, and the code is patched with the resolved target(s).
    Any jump or call clears the remembered moves.

    Args:
        start_addr: First address to inspect.
        end_addr: Upper bound of the scan (also passed to ``next_head``).
        file_data: Raw bytes of the sample, forwarded to the emulator.
    """
    mem_mov_addr = None
    mem_mov_reg = None
    mem_mov_indexed_addr = None
    mem_mov_indexed_reg = None
    mem_mov_indexed_reg_index = None
    curr_addr = start_addr
    while curr_addr <= end_addr:
        # Create an insn_t object at the current address
        insn = ida_ua.insn_t()
        ida_ua.decode_insn(insn, curr_addr)
        # Get the mnemonic of the current instruction
        disasm = ida_ua.print_insn_mnem(insn.ea)
        if disasm is None:
            print(f"{hex(curr_addr)}: Skipping non code")
        else:
            print(f"{hex(curr_addr)}: {disasm}")
            reset_tracking = False
            if 'jmp' in disasm:
                if insn.Op1.type == ida_ua.o_reg:
                    reg = ida_idp.get_reg_name(insn.Op1.reg, 4)
                    print(f"\tJump to reg {reg}")
                    # Decide which tracked move feeds this jump:
                    # "direct", "indexed", or None when nothing is tracked.
                    choice = None
                    label = "Closest"
                    if mem_mov_reg == reg and mem_mov_indexed_reg == reg:
                        # Both moves load the jump register: use the one
                        # closest to the jump (the higher address).
                        if mem_mov_addr < mem_mov_indexed_addr:
                            choice = "indexed"
                        else:
                            choice = "direct"
                    elif mem_mov_reg == reg:
                        choice = "direct"
                    elif mem_mov_indexed_reg == reg:
                        choice = "indexed"
                    else:
                        # No tracked move loads the jump register itself.
                        # Fall back to whatever is tracked; when both are,
                        # take the one further from the jump (lower address).
                        label = "Further"
                        if mem_mov_indexed_addr is not None and mem_mov_addr is not None:
                            if mem_mov_addr > mem_mov_indexed_addr:
                                choice = "indexed"
                            else:
                                choice = "direct"
                        elif mem_mov_addr is not None:
                            choice = "direct"
                        elif mem_mov_indexed_addr is not None:
                            choice = "indexed"
                        else:
                            print("\tNo mem move found")
                    if choice == "direct":
                        _resolve_direct_jump(file_data, mem_mov_addr, curr_addr, reg, label)
                    elif choice == "indexed":
                        _resolve_indexed_jump(
                            file_data,
                            mem_mov_indexed_addr,
                            curr_addr,
                            reg,
                            mem_mov_indexed_reg_index,
                            label,
                        )
                # Reset the memory tracking for any jmp
                reset_tracking = True
            elif 'mov' in disasm and insn.Op2.type == ida_ua.o_mem:
                reg = ida_idp.get_reg_name(insn.Op1.reg, 4)
                jump_table_addr = insn.Op2.addr
                # NOTE(review): specflag1/specflag2 look like IDA's x86
                # "has SIB" flag and raw SIB byte - confirm against the
                # processor module documentation.
                if insn.Op2.specflag1 == 0:
                    # Direct move
                    print(f"\tMove from memory directly {hex(jump_table_addr)}")
                    mem_mov_addr = curr_addr
                    mem_mov_reg = reg
                else:
                    # Indexed move: bits 3..5 of specflag2 give the index register
                    index_reg = (insn.Op2.specflag2 >> 3) & 0x7
                    index_reg_name = ida_idp.get_reg_name(index_reg, 4)
                    print(f"\tMove from indexed memory {hex(jump_table_addr)} index register {index_reg_name}")
                    mem_mov_indexed_addr = curr_addr
                    mem_mov_indexed_reg = reg
                    mem_mov_indexed_reg_index = index_reg_name
            elif 'call' in disasm or disasm in jump_instructions:
                # Reset the memory tracking for any control flow
                reset_tracking = True
            if reset_tracking:
                mem_mov_addr = None
                mem_mov_reg = None
                mem_mov_indexed_addr = None
                mem_mov_indexed_reg = None
                mem_mov_indexed_reg_index = None
        # Get next address
        curr_addr = ida_bytes.next_head(curr_addr, end_addr)
import unicorn | |
from unicorn import * | |
from unicorn.x86_const import * | |
import struct | |
import pefile | |
def get_unicorn_reg_name(reg):
    """Translate a register name into the matching Unicorn register constant.

    Only "eax", "ebx", "ecx", "edx", "esi" and "edi" are recognised; any
    other value yields None.
    """
    if reg == "eax":
        return UC_X86_REG_EAX
    if reg == "ebx":
        return UC_X86_REG_EBX
    if reg == "ecx":
        return UC_X86_REG_ECX
    if reg == "edx":
        return UC_X86_REG_EDX
    if reg == "esi":
        return UC_X86_REG_ESI
    if reg == "edi":
        return UC_X86_REG_EDI
    # Unknown or unsupported register name
    return None
def map_pe_sections(file_data):
    """Create a 32-bit x86 Unicorn instance with the PE sections mapped.

    Each section is mapped at ``ImageBase + VirtualAddress`` and filled with
    its data. A 1 MiB stack is mapped at 0x1000000 and ESP/EBP are pointed at
    its middle.

    Args:
        file_data: Raw bytes of the PE file.

    Returns:
        The initialised ``Uc`` instance.
    """
    page_size = 0x1000
    # Parse the PE file
    pe = pefile.PE(data=file_data)
    # Initialize the Unicorn engine
    mu = Uc(UC_ARCH_X86, UC_MODE_32)
    # PE base
    pe_base = pe.OPTIONAL_HEADER.ImageBase
    # Map each section into the Unicorn engine
    for section in pe.sections:
        base_address = section.VirtualAddress + pe_base
        section_size = max(section.SizeOfRawData, section.Misc_VirtualSize)
        if section_size == 0:
            # Nothing to map; a zero-length mapping is rejected by Unicorn
            continue
        # Unicorn only accepts sizes that are a multiple of 4KB, so round
        # the section size up to the next page boundary.
        mapped_size = (section_size + page_size - 1) & ~(page_size - 1)
        mu.mem_map(base_address, mapped_size)
        # Write the section's data into the Unicorn engine's memory
        mu.mem_write(base_address, section.get_data())
    # Add a stack for the Unicorn engine
    stack_base = 0x1000000
    stack_size = 1024 * 1024
    mu.mem_map(stack_base, stack_size)
    # Point ESP/EBP at the middle of the mapped stack. The previous
    # "(stack_base + stack_size) // 2" evaluated to 0x880000, which lies
    # outside the stack mapping.
    stack_middle = stack_base + stack_size // 2
    mu.reg_write(UC_X86_REG_ESP, stack_middle)
    mu.reg_write(UC_X86_REG_EBP, stack_middle)
    return mu
def hook_unmapped_mem(uc, access, address, size, value, user_data):
    """Memory hook that maps the page containing an unmapped access.

    Logs the faulting address, maps the single 0x1000-byte page that contains
    it, and returns True so that execution continues.
    """
    print("HOOK: Fix unmapped memory at address: 0x{0:X}".format(address))
    page_size = 0x1000
    # Start of the page that contains the faulting address
    containing_page = (address // page_size) * page_size
    uc.mem_map(containing_page, page_size)
    return True
def emulate_jmp_table(file_data, start, end, jmp_reg):
    """Emulate ``start``..``end`` and return the final value of ``jmp_reg``.

    A fresh emulator is built from ``file_data`` for every call, with hooks
    that map pages on unmapped reads and writes.

    Args:
        file_data: Raw bytes of the PE file to map.
        start: Address at which emulation begins.
        end: Address at which emulation stops.
        jmp_reg: Unicorn register constant to read afterwards, or None when
            the register name could not be translated.

    Returns:
        The register value, or None when ``jmp_reg`` is None or the
        emulation raised a Unicorn error.
    """
    print(f"Emulation start:{hex(start)} end:{hex(end)} jmp_reg{jmp_reg}")
    if jmp_reg is None:
        # No usable register to read back: report failure to the caller
        return None
    mu = map_pe_sections(file_data)
    mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_unmapped_mem)
    mu.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, hook_unmapped_mem)
    try:
        # Start emulation
        mu.emu_start(start, end)
    except UcError as e:
        # Callers treat None as "emulation failed" and keep scanning
        print(e)
        return None
    # Read the value of the jump register
    return mu.reg_read(jmp_reg)
def emulate_jmp_table_condition(file_data, start, end, jmp_reg, condition_reg, condition_value):
    """Emulate ``start``..``end`` with a preset register and read ``jmp_reg``.

    A fresh emulator is built from ``file_data``, ``condition_reg`` is set to
    ``condition_value`` before execution, and unmapped reads/writes are
    mapped on the fly by hooks.

    Args:
        file_data: Raw bytes of the PE file to map.
        start: Address at which emulation begins.
        end: Address at which emulation stops.
        jmp_reg: Unicorn register constant to read afterwards, or None.
        condition_reg: Unicorn register constant to preset, or None.
        condition_value: Value written to ``condition_reg``.

    Returns:
        The value of ``jmp_reg`` after emulation, or None when either
        register is None or the emulation raised a Unicorn error.
    """
    if jmp_reg is None or condition_reg is None:
        # A register name could not be translated: report failure
        return None
    mu = map_pe_sections(file_data)
    # Set the condition register to the desired value
    mu.reg_write(condition_reg, condition_value)
    mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED, hook_unmapped_mem)
    mu.hook_add(UC_HOOK_MEM_WRITE_UNMAPPED, hook_unmapped_mem)
    try:
        # Start emulation
        mu.emu_start(start, end)
    except UcError as e:
        # Callers treat None as "emulation failed" and keep scanning
        print(e)
        return None
    # Read the value of the jump register
    return mu.reg_read(jmp_reg)
def get_preserve_bytes(start, end):
    """Return the bytes between the last ``inc`` before ``end`` and ``end``.

    Walks instruction heads backwards from ``end`` towards ``start``. At the
    first mnemonic containing "inc", the bytes from the following head up to
    (not including) ``end`` are returned.

    Args:
        start: Lowest address to inspect.
        end: Address to start the backward walk from.

    Returns:
        The bytes to preserve, or ``b""`` when no "inc" is found, a
        non-code address is reached, or no head follows the "inc".
    """
    preserve_bytes = b""
    curr_addr = end
    # prev_head yields BADADDR once there is no earlier head; stop explicitly
    # rather than relying on the mnemonic lookup failing at that address.
    while curr_addr != idc.BADADDR and curr_addr >= start:
        disasm = ida_ua.print_insn_mnem(curr_addr)
        if disasm is None:
            break
        if 'inc' in disasm:
            # Preserve from the instruction after the inc up to end
            bytes_start = ida_bytes.next_head(curr_addr, end)
            if bytes_start != idc.BADADDR:
                preserve_bytes = ida_bytes.get_bytes(bytes_start, end - bytes_start)
            break
        curr_addr = ida_bytes.prev_head(curr_addr, start)
    return preserve_bytes
def build_near_jump(address, destination):
    """Encode a 5-byte near ``jmp`` (opcode E9) located at ``address``.

    The 32-bit displacement is relative to the end of the instruction, i.e.
    ``destination - (address + 5)``, stored little-endian.
    """
    instruction_length = 5
    displacement = destination - address - instruction_length
    # Opcode byte followed by the signed little-endian displacement
    return struct.pack("<Bi", 0xE9, displacement)
def build_near_je(address, destination):
    """Encode a 6-byte near ``je`` (opcode 0F 84) located at ``address``.

    The 32-bit displacement is relative to the end of the instruction, i.e.
    ``destination - (address + 6)``, stored little-endian.
    """
    instruction_length = 6
    displacement = destination - address - instruction_length
    # Two opcode bytes followed by the signed little-endian displacement
    return struct.pack("<BBi", 0x0F, 0x84, displacement)
def patch_direct(start, end, destination):
    """Overwrite ``start``..``end`` + 2 with a near jump to ``destination``.

    The patch is laid out as: preserved bytes, a near ``jmp``, then NOP
    padding. The extra 2 bytes cover the ``jmp reg`` located at ``end``. If
    the assembled patch does not have the expected length an error is printed
    and nothing is written.
    """
    # Size of the region being replaced, including the 2-byte jmp reg
    region_len = end - start + 2
    kept = get_preserve_bytes(start, end)
    jump_at = start + len(kept)
    jump = build_near_jump(jump_at, destination)
    # Whatever space remains after the kept bytes and the jump becomes NOPs
    padding = b"\x90" * (region_len - len(kept) - len(jump))
    patch = b"".join((kept, jump, padding))
    if len(patch) == region_len:
        # Apply the patch
        ida_bytes.patch_bytes(start, patch)
    else:
        print(f"ERROR: Patch bytes length {len(patch)} does not match {region_len}")
# Register numbers as used in the ModRM byte.
_MODRM_REG_NUMBER = {"eax": 0, "ecx": 1, "edx": 2, "ebx": 3}

# Machine code for "mov dst, src" keyed by "<dst>_<src>": opcode 0x89 followed
# by a register-direct ModRM byte (source in bits 3..5, destination in 0..2).
reg_mov = {
    f"{dst}_{src}": bytes(
        (0x89, 0xC0 | (_MODRM_REG_NUMBER[src] << 3) | _MODRM_REG_NUMBER[dst])
    )
    for dst in ("eax", "ebx", "ecx", "edx")
    for src in ("eax", "ebx", "ecx", "edx")
}
# Machine code comparing the low byte of each register with 1
# (cmp al/bl/cl/dl, 1), keyed by the 32-bit register name.
reg_cmp = dict(
    eax=b"\x3c\x01",
    ebx=b"\x80\xfb\x01",
    ecx=b"\x80\xf9\x01",
    edx=b"\x80\xfa\x01",
)
def patch_indexed(start, end, reg, index_reg, dest_1, dest_0):
    """Overwrite ``start``..``end`` + 2 with an explicit two-way branch.

    The patch is laid out as: ``mov reg, index_reg``, preserved bytes,
    a compare of ``reg``'s low byte with 1, ``je dest_1``, ``jmp dest_0``,
    then NOP padding. The extra 2 bytes cover the ``jmp reg`` located at
    ``end``.

    Nothing is written, and an error is printed, when no encoding is known
    for the register pair or when the assembled patch does not have the
    expected length.

    Args:
        start: Address of the indexed memory move.
        end: Address of the ``jmp reg`` instruction.
        reg: Name of the jump register.
        index_reg: Name of the index register holding the condition.
        dest_1: Target taken when the condition equals 1.
        dest_0: Target taken otherwise.
    """
    # Only eax/ebx/ecx/edx have encodings in the lookup tables; bail out
    # cleanly for anything else instead of raising KeyError mid-scan.
    reg_mov_bytes = reg_mov.get(f"{reg}_{index_reg}")
    cmp_bytes = reg_cmp.get(reg)
    if reg_mov_bytes is None or cmp_bytes is None:
        print(f"ERROR: No patch encoding for jump register {reg} with index register {index_reg}")
        return
    # Save the condition into the jump register, which is free to clobber
    patch_bytes = reg_mov_bytes
    # Get the bytes to preserve
    patch_bytes += get_preserve_bytes(start, end)
    # Compare the saved condition with 1
    patch_bytes += cmp_bytes
    # Build the near je instruction
    je_address = start + len(patch_bytes)
    patch_bytes += build_near_je(je_address, dest_1)
    # Build the near jmp instruction
    jmp_address = start + len(patch_bytes)
    jmp_instr = build_near_jump(jmp_address, dest_0)
    patch_bytes += jmp_instr
    # Fill the rest of the region with NOPs
    # We add 2 for the jmp reg at the end
    patch_bytes += b"\x90" * (end - jmp_address - len(jmp_instr) + 2)
    if len(patch_bytes) != end - start + 2:
        print(f"ERROR: Patch bytes length {len(patch_bytes)} does not match {end - start + 2}")
    else:
        # Apply the patch
        ida_bytes.patch_bytes(start, patch_bytes)
# 0x434a30 we need to pick the further one
# Read the sample through a context manager so the file handle is closed.
# (On non-Windows hosts the sample path used was "/tmp/sample.bin".)
with open("Z:\\tmp\\sample.bin", "rb") as sample_file:
    file_data = sample_file.read()
scan_binary(0x00433240, 0x00435149, file_data)
scan_binary(0x00435870, 0x00435A00, file_data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment