Skip to content

Instantly share code, notes, and snippets.

@tahaconfiant
Created June 3, 2021 17:42
Show Gist options
  • Save tahaconfiant/af07e9194dcfeed838564b700998c14e to your computer and use it in GitHub Desktop.
Save tahaconfiant/af07e9194dcfeed838564b700998c14e to your computer and use it in GitHub Desktop.
HydroMac IDAPython script to decrypt strings
# author : [email protected] aka lordx64
# copyright 2021 - All rights reserved
# tested against macOS/Hydromac sample (aka MapperState) 919d049d5490adaaed70169ddd0537bfa2018a572e93b19801cf245f7fd28408
# compatible with Python 3.8 and IDAPython for IDA 7.6.210319
# this HydroMac string decryptor uses the helper class UEMU_HELPERS taken from the uEmu project: https://github.com/alexhude/uEmu
import idc
import struct
import idautils
from abc import ABC, abstractmethod
import time
# IDA Python SDK
from idaapi import *
from idc import *
from idautils import *
# Bind version-neutral IDAAPI_* aliases to whichever generation of the IDA
# Python API is loaded, so the rest of the script never checks the SDK version.
if IDA_SDK_VERSION < 700:
    # --- pre-7.0 names ---
    # navigation / prompts
    IDAAPI_ScreenEA = ScreenEA
    IDAAPI_Jump = Jump
    IDAAPI_AskYN = AskYN
    IDAAPI_AskFile = AskFile
    IDAAPI_AskLong = AskLong
    # item flags and bytes
    IDAAPI_GetFlags = getFlags
    IDAAPI_IsCode = isCode
    IDAAPI_IsLoaded = isLoaded
    IDAAPI_HasValue = hasValue
    IDAAPI_GetBytes = get_many_bytes
    IDAAPI_NextHead = NextHead
    IDAAPI_NextThat = nextthat
    # database edits
    IDAAPI_DelItems = MakeUnkn
    IDAAPI_MakeCode = MakeCode
    IDAAPI_SetColor = SetColor
    IDAAPI_GetDisasm = GetDisasmEx
    # segments
    IDAAPI_SegStart = SegStart
    IDAAPI_SegEnd = SegEnd
    # breakpoints
    IDAAPI_GetBptQty = GetBptQty
    IDAAPI_GetBptEA = GetBptEA
    IDAAPI_GetBptAttr = GetBptAttr
    # classes
    IDAAPI_Choose = Choose2
else:
    # --- 7.0+ names ---
    # navigation / prompts
    IDAAPI_ScreenEA = get_screen_ea
    IDAAPI_Jump = jumpto
    IDAAPI_AskYN = ask_yn
    IDAAPI_AskFile = ask_file
    IDAAPI_AskLong = ask_long
    # item flags and bytes
    IDAAPI_GetFlags = get_full_flags
    IDAAPI_IsCode = is_code
    IDAAPI_IsLoaded = is_loaded
    IDAAPI_HasValue = has_value
    IDAAPI_GetBytes = get_bytes
    IDAAPI_NextHead = next_head
    IDAAPI_NextThat = next_that
    # database edits
    IDAAPI_DelItems = del_items
    IDAAPI_MakeCode = create_insn
    IDAAPI_SetColor = set_color
    # the generated line carries color tags, which are stripped here
    IDAAPI_GetDisasm = lambda a, b: tag_remove(generate_disasm_line(a, b))
    # segments
    IDAAPI_SegStart = get_segm_start
    IDAAPI_SegEnd = get_segm_end
    # breakpoints
    IDAAPI_GetBptQty = get_bpt_qty
    IDAAPI_GetBptEA = get_bpt_ea
    IDAAPI_GetBptAttr = get_bpt_attr
    # classes
    IDAAPI_Choose = Choose
# Unicorn SDK
from unicorn import *
from unicorn.arm_const import *
from unicorn.arm64_const import *
from unicorn.mips_const import *
from unicorn.x86_const import *
# === Configuration
class UEMU_CONFIG:
    """Constants shared by the emulation helpers."""

    # Granularity used by ALIGN_PAGE_DOWN / ALIGN_PAGE_UP before mapping memory.
    UnicornPageSize = 4 * 1024

    # Item colors handed to IDAAPI_SetColor while stepping.
    IDAViewColor_Reset = 0xFFFFFFFF
    IDAViewColor_PC = 0x8066FF
# === Helpers
class UEMU_HELPERS:
    """Static helpers taken from uEmu: IDA glue, page alignment and
    per-architecture register tables keyed by the names get_arch() returns."""

    # IDA

    @staticmethod
    def exec_on_main(callable, dbflag):
        """Run *callable* through ``execute_sync`` with *dbflag*.

        Returns True only when ``execute_sync`` reports 1. A result of -1 is
        logged as a failure and yields False, as does any other value.
        """
        res = execute_sync(callable, dbflag)
        if res == -1:
            uemu_log("! <C> execute_sync(%s) failed" % (str(callable)))
            return False
        return res == 1

    # Others

    @staticmethod
    def ALIGN_PAGE_DOWN(x):
        """Round *x* down to a multiple of UEMU_CONFIG.UnicornPageSize."""
        return x & ~(UEMU_CONFIG.UnicornPageSize - 1)

    @staticmethod
    def ALIGN_PAGE_UP(x):
        """Round *x* up to a multiple of UEMU_CONFIG.UnicornPageSize."""
        return (x + UEMU_CONFIG.UnicornPageSize - 1) & ~(UEMU_CONFIG.UnicornPageSize - 1)

    @staticmethod
    def inf_is_be():
        """Return the big-endian indicator of the current database."""
        if IDA_SDK_VERSION >= 700:
            return cvar.inf.is_be()
        else:
            return cvar.inf.mf

    @staticmethod
    def get_arch():
        """Return the architecture key for the loaded processor module.

        One of "x64", "x86", "arm64be"/"arm64le", "armbe"/"armle",
        "mips64be"/"mips64le", "mipsbe"/"mipsle", or "" when the processor
        is none of those.
        """
        if ph.id == PLFM_386 and ph.flag & PR_USE64:
            return "x64"
        elif ph.id == PLFM_386 and ph.flag & PR_USE32:
            return "x86"
        elif ph.id == PLFM_ARM and ph.flag & PR_USE64:
            return "arm64be" if UEMU_HELPERS.inf_is_be() else "arm64le"
        elif ph.id == PLFM_ARM and ph.flag & PR_USE32:
            return "armbe" if UEMU_HELPERS.inf_is_be() else "armle"
        elif ph.id == PLFM_MIPS and ph.flag & PR_USE64:
            return "mips64be" if UEMU_HELPERS.inf_is_be() else "mips64le"
        elif ph.id == PLFM_MIPS and ph.flag & PR_USE32:
            return "mipsbe" if UEMU_HELPERS.inf_is_be() else "mipsle"
        else:
            return ""

    @staticmethod
    def _arch_family(arch):
        """Collapse a get_arch() name to the key used by the register tables.

        "arm64*" -> "arm64", "arm*" -> "arm", "mips*" -> "mips"; anything
        else ("x64", "x86", ...) is returned unchanged.
        """
        # "arm64" has to be tested before "arm", which is a prefix of it.
        for family in ("arm64", "arm", "mips"):
            if arch.startswith(family):
                return family
        return arch

    @staticmethod
    def get_stack_register(arch):
        """Return ``(name, unicorn_register_id)`` of the stack pointer.

        Raises KeyError for an architecture without a table entry.
        """
        registers = {
            "x64" : ("rsp", UC_X86_REG_RSP),
            "x86" : ("esp", UC_X86_REG_ESP),
            "arm" : ("SP", UC_ARM_REG_SP),
            "arm64" : ("SP", UC_ARM64_REG_SP),
            "mips" : ("sp", UC_MIPS_REG_29),
        }
        return registers[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def get_register_map(arch):
        """Return the ``[name, unicorn_register_id]`` pairs for *arch*.

        Raises KeyError for an architecture without a table entry.
        """
        registers = {
            "x64" : [
                [ "rax", UC_X86_REG_RAX ],
                [ "rbx", UC_X86_REG_RBX ],
                [ "rcx", UC_X86_REG_RCX ],
                [ "rdx", UC_X86_REG_RDX ],
                [ "rsi", UC_X86_REG_RSI ],
                [ "rdi", UC_X86_REG_RDI ],
                [ "rbp", UC_X86_REG_RBP ],
                [ "rsp", UC_X86_REG_RSP ],
                [ "r8", UC_X86_REG_R8 ],
                [ "r9", UC_X86_REG_R9 ],
                [ "r10", UC_X86_REG_R10 ],
                [ "r11", UC_X86_REG_R11 ],
                [ "r12", UC_X86_REG_R12 ],
                [ "r13", UC_X86_REG_R13 ],
                [ "r14", UC_X86_REG_R14 ],
                [ "r15", UC_X86_REG_R15 ],
                [ "rip", UC_X86_REG_RIP ],
                [ "sp", UC_X86_REG_SP ],
            ],
            "x86" : [
                [ "eax", UC_X86_REG_EAX ],
                [ "ebx", UC_X86_REG_EBX ],
                [ "ecx", UC_X86_REG_ECX ],
                [ "edx", UC_X86_REG_EDX ],
                [ "esi", UC_X86_REG_ESI ],
                [ "edi", UC_X86_REG_EDI ],
                [ "ebp", UC_X86_REG_EBP ],
                [ "esp", UC_X86_REG_ESP ],
                [ "eip", UC_X86_REG_EIP ],
                [ "sp", UC_X86_REG_SP ],
            ],
            "arm" : [
                [ "R0", UC_ARM_REG_R0 ],
                [ "R1", UC_ARM_REG_R1 ],
                [ "R2", UC_ARM_REG_R2 ],
                [ "R3", UC_ARM_REG_R3 ],
                [ "R4", UC_ARM_REG_R4 ],
                [ "R5", UC_ARM_REG_R5 ],
                [ "R6", UC_ARM_REG_R6 ],
                [ "R7", UC_ARM_REG_R7 ],
                [ "R8", UC_ARM_REG_R8 ],
                [ "R9", UC_ARM_REG_R9 ],
                [ "R10", UC_ARM_REG_R10 ],
                [ "R11", UC_ARM_REG_R11 ],
                [ "R12", UC_ARM_REG_R12 ],
                [ "PC", UC_ARM_REG_PC ],
                [ "SP", UC_ARM_REG_SP ],
                [ "LR", UC_ARM_REG_LR ],
                [ "CPSR", UC_ARM_REG_CPSR ],
            ],
            "arm64" : [
                [ "X0", UC_ARM64_REG_X0 ],
                [ "X1", UC_ARM64_REG_X1 ],
                [ "X2", UC_ARM64_REG_X2 ],
                [ "X3", UC_ARM64_REG_X3 ],
                [ "X4", UC_ARM64_REG_X4 ],
                [ "X5", UC_ARM64_REG_X5 ],
                [ "X6", UC_ARM64_REG_X6 ],
                [ "X7", UC_ARM64_REG_X7 ],
                [ "X8", UC_ARM64_REG_X8 ],
                [ "X9", UC_ARM64_REG_X9 ],
                [ "X10", UC_ARM64_REG_X10 ],
                [ "X11", UC_ARM64_REG_X11 ],
                [ "X12", UC_ARM64_REG_X12 ],
                [ "X13", UC_ARM64_REG_X13 ],
                [ "X14", UC_ARM64_REG_X14 ],
                [ "X15", UC_ARM64_REG_X15 ],
                [ "X16", UC_ARM64_REG_X16 ],
                [ "X17", UC_ARM64_REG_X17 ],
                [ "X18", UC_ARM64_REG_X18 ],
                [ "X19", UC_ARM64_REG_X19 ],
                [ "X20", UC_ARM64_REG_X20 ],
                [ "X21", UC_ARM64_REG_X21 ],
                [ "X22", UC_ARM64_REG_X22 ],
                [ "X23", UC_ARM64_REG_X23 ],
                [ "X24", UC_ARM64_REG_X24 ],
                [ "X25", UC_ARM64_REG_X25 ],
                [ "X26", UC_ARM64_REG_X26 ],
                [ "X27", UC_ARM64_REG_X27 ],
                [ "X28", UC_ARM64_REG_X28 ],
                [ "PC", UC_ARM64_REG_PC ],
                [ "SP", UC_ARM64_REG_SP ],
                [ "FP", UC_ARM64_REG_FP ],
                [ "LR", UC_ARM64_REG_LR ],
                [ "NZCV", UC_ARM64_REG_NZCV ],
            ],
            "mips" : [
                [ "zero", UC_MIPS_REG_0 ],
                [ "at", UC_MIPS_REG_1 ],
                [ "v0", UC_MIPS_REG_2 ],
                [ "v1", UC_MIPS_REG_3 ],
                [ "a0", UC_MIPS_REG_4 ],
                [ "a1", UC_MIPS_REG_5 ],
                [ "a2", UC_MIPS_REG_6 ],
                [ "a3", UC_MIPS_REG_7 ],
                [ "t0", UC_MIPS_REG_8 ],
                [ "t1", UC_MIPS_REG_9 ],
                [ "t2", UC_MIPS_REG_10 ],
                [ "t3", UC_MIPS_REG_11 ],
                [ "t4", UC_MIPS_REG_12 ],
                [ "t5", UC_MIPS_REG_13 ],
                [ "t6", UC_MIPS_REG_14 ],
                [ "t7", UC_MIPS_REG_15 ],
                [ "s0", UC_MIPS_REG_16 ],
                [ "s1", UC_MIPS_REG_17 ],
                [ "s2", UC_MIPS_REG_18 ],
                [ "s3", UC_MIPS_REG_19 ],
                [ "s4", UC_MIPS_REG_20 ],
                [ "s5", UC_MIPS_REG_21 ],
                [ "s6", UC_MIPS_REG_22 ],
                [ "s7", UC_MIPS_REG_23 ],
                [ "t8", UC_MIPS_REG_24 ],
                [ "t9", UC_MIPS_REG_25 ],
                [ "k0", UC_MIPS_REG_26 ],
                [ "k1", UC_MIPS_REG_27 ],
                [ "gp", UC_MIPS_REG_28 ],
                [ "sp", UC_MIPS_REG_29 ],
                [ "fp", UC_MIPS_REG_30 ],
                [ "ra", UC_MIPS_REG_31 ],
                [ "pc", UC_MIPS_REG_PC ],
            ],
        }
        return registers[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def get_register_bits(arch):
        """Return the width in bits recorded for the registers of *arch*."""
        registers_bits = {
            "x64" : 64,
            "x86" : 32,
            "arm" : 32,
            "arm64" : 64,
            "mips" : 32,
        }
        return registers_bits[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def get_register_ext_map(arch):
        """Return the extended ``[name, unicorn_register_id]`` pairs for *arch*.

        Only arm (D0-D31) and arm64 (Q0-Q31) have entries; the list is empty
        for x64, x86 and mips.
        """
        registers_ext = {
            "x64" : [
            ],
            "x86" : [
            ],
            "arm" : [
                [ "D0", UC_ARM_REG_D0 ],
                [ "D1", UC_ARM_REG_D1 ],
                [ "D2", UC_ARM_REG_D2 ],
                [ "D3", UC_ARM_REG_D3 ],
                [ "D4", UC_ARM_REG_D4 ],
                [ "D5", UC_ARM_REG_D5 ],
                [ "D6", UC_ARM_REG_D6 ],
                [ "D7", UC_ARM_REG_D7 ],
                [ "D8", UC_ARM_REG_D8 ],
                [ "D9", UC_ARM_REG_D9 ],
                [ "D10", UC_ARM_REG_D10 ],
                [ "D11", UC_ARM_REG_D11 ],
                [ "D12", UC_ARM_REG_D12 ],
                [ "D13", UC_ARM_REG_D13 ],
                [ "D14", UC_ARM_REG_D14 ],
                [ "D15", UC_ARM_REG_D15 ],
                [ "D16", UC_ARM_REG_D16 ],
                [ "D17", UC_ARM_REG_D17 ],
                [ "D18", UC_ARM_REG_D18 ],
                [ "D19", UC_ARM_REG_D19 ],
                [ "D20", UC_ARM_REG_D20 ],
                [ "D21", UC_ARM_REG_D21 ],
                [ "D22", UC_ARM_REG_D22 ],
                [ "D23", UC_ARM_REG_D23 ],
                [ "D24", UC_ARM_REG_D24 ],
                [ "D25", UC_ARM_REG_D25 ],
                [ "D26", UC_ARM_REG_D26 ],
                [ "D27", UC_ARM_REG_D27 ],
                [ "D28", UC_ARM_REG_D28 ],
                [ "D29", UC_ARM_REG_D29 ],
                [ "D30", UC_ARM_REG_D30 ],
                [ "D31", UC_ARM_REG_D31 ],
            ],
            "arm64" : [
                [ "Q0", UC_ARM64_REG_Q0 ],
                [ "Q1", UC_ARM64_REG_Q1 ],
                [ "Q2", UC_ARM64_REG_Q2 ],
                [ "Q3", UC_ARM64_REG_Q3 ],
                [ "Q4", UC_ARM64_REG_Q4 ],
                [ "Q5", UC_ARM64_REG_Q5 ],
                [ "Q6", UC_ARM64_REG_Q6 ],
                [ "Q7", UC_ARM64_REG_Q7 ],
                [ "Q8", UC_ARM64_REG_Q8 ],
                [ "Q9", UC_ARM64_REG_Q9 ],
                [ "Q10", UC_ARM64_REG_Q10 ],
                [ "Q11", UC_ARM64_REG_Q11 ],
                [ "Q12", UC_ARM64_REG_Q12 ],
                [ "Q13", UC_ARM64_REG_Q13 ],
                [ "Q14", UC_ARM64_REG_Q14 ],
                [ "Q15", UC_ARM64_REG_Q15 ],
                [ "Q16", UC_ARM64_REG_Q16 ],
                [ "Q17", UC_ARM64_REG_Q17 ],
                [ "Q18", UC_ARM64_REG_Q18 ],
                [ "Q19", UC_ARM64_REG_Q19 ],
                [ "Q20", UC_ARM64_REG_Q20 ],
                [ "Q21", UC_ARM64_REG_Q21 ],
                [ "Q22", UC_ARM64_REG_Q22 ],
                [ "Q23", UC_ARM64_REG_Q23 ],
                [ "Q24", UC_ARM64_REG_Q24 ],
                [ "Q25", UC_ARM64_REG_Q25 ],
                [ "Q26", UC_ARM64_REG_Q26 ],
                [ "Q27", UC_ARM64_REG_Q27 ],
                [ "Q28", UC_ARM64_REG_Q28 ],
                [ "Q29", UC_ARM64_REG_Q29 ],
                [ "Q30", UC_ARM64_REG_Q30 ],
                [ "Q31", UC_ARM64_REG_Q31 ],
            ],
            "mips" : [
            ],
        }
        return registers_ext[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def get_register_ext_bits(arch):
        """Return the width in bits of the extended registers (0 when none)."""
        registers_ext_bits = {
            "x64" : 0,
            "x86" : 0,
            "arm" : 64,
            "arm64" : 128,
            "mips" : 0,
        }
        return registers_ext_bits[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def get_register_ext_format(arch):
        """Return the %-format string for an extended register value.

        The result is always a string; it is empty for architectures that
        have no extended registers (mips previously yielded the int 0).
        """
        registers_ext_format = {
            "x64" : "",
            "x86" : "",
            "arm" : "0x%.16X",
            "arm64" : "0x%.32X",
            "mips" : "",
        }
        return registers_ext_format[UEMU_HELPERS._arch_family(arch)]

    @staticmethod
    def is_thumb_ea(ea):
        """Return True when the T segment register at *ea* is set.

        Only evaluated for ARM without PR_USE64; every other processor
        yields False. The lookup is dispatched through exec_on_main.
        """
        def handler():
            if ph.id == PLFM_ARM and not ph.flag & PR_USE64:
                if IDA_SDK_VERSION >= 700:
                    t = get_sreg(ea, "T") # get T flag
                else:
                    t = get_segreg(ea, 20) # get T flag
                # Compare by value: identity (`is not`) on integers is unreliable.
                return t != BADSEL and t != 0
            else:
                return 0
        return UEMU_HELPERS.exec_on_main(handler, MFF_READ)

    @staticmethod
    def trim_spaces(string):
        """Collapse every whitespace run in ``str(string)`` to one space and strip the ends."""
        return ' '.join(str(string).split())

    @staticmethod
    def bytes_to_str(bytes):
        """Format a byte sequence as space-separated two-digit uppercase hex.

        Elements may be ints (bytes/bytearray) or one-character strings.
        """
        return " ".join("{:02X}".format(ord(c) if isinstance(c, str) else c) for c in bytes)

    class InitedCallable(object):
        """Predicate over item flags: true when IDAAPI_HasValue(flags) is."""
        def __call__(self, flags):
            return IDAAPI_HasValue(flags)

    class UninitedCallable(object):
        """Predicate over item flags: true when IDAAPI_HasValue(flags) is not."""
        def __call__(self, flags):
            return not IDAAPI_HasValue(flags)
# === Log
def uemu_log(entry, name="uEmu"):
    """Emit *entry* through ``msg``, prefixed with ``[name]: `` and ending in a newline."""
    line = "".join(("[", name, "]: ", entry, "\n"))
    msg(line)
class DefaultDecode(ABC):
    """Abstract base for Unicorn-backed decoders.

    The constructor selects the Unicorn architecture, mode and PC register
    from UEMU_HELPERS.get_arch() and records the stack/heap layout.
    Subclasses create the emulator as ``self.mu`` and implement the hooks.
    """

    def __init__(self):
        """Pick the Unicorn setup for the current processor.

        Raises KeyError when get_arch() returns a name that has no entry
        in the table below (including the empty string).
        """
        uc_setup = {
            "x64" : [ UC_X86_REG_RIP, UC_ARCH_X86, UC_MODE_64 ],
            "x86" : [ UC_X86_REG_EIP, UC_ARCH_X86, UC_MODE_32 ],
            "arm64be" : [ UC_ARM64_REG_PC, UC_ARCH_ARM64, UC_MODE_ARM | UC_MODE_BIG_ENDIAN ],
            "arm64le" : [ UC_ARM64_REG_PC, UC_ARCH_ARM64, UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ],
            "armbe" : [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_BIG_ENDIAN ],
            "armle" : [ UC_ARM_REG_PC, UC_ARCH_ARM, UC_MODE_ARM | UC_MODE_LITTLE_ENDIAN ],
            "mips64be" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS64 | UC_MODE_BIG_ENDIAN ],
            "mips64le" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS64 | UC_MODE_LITTLE_ENDIAN ],
            "mipsbe" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS32 | UC_MODE_BIG_ENDIAN ],
            "mipsle" : [ UC_MIPS_REG_PC, UC_ARCH_MIPS, UC_MODE_MIPS32 | UC_MODE_LITTLE_ENDIAN ],
        }
        arch = UEMU_HELPERS.get_arch()
        self.uc_reg_pc, self.uc_arch, self.uc_mode = uc_setup[arch]
        self.lazy_mapping = False
        uemu_log("Unicorn version [ %s ]" % (unicorn.__version__))
        uemu_log("CPU arch set to [ %s ]" % (arch))
        # Base address and size of the emulated stack and heap regions.
        self.stack = 0x800000
        self.stack_size = 0x5000
        self.heap = 0x900000
        self.heap_size = 0x5000
        super().__init__()

    def init_cpu_context(self, pc):
        """Zero every register in the architecture's map, then set the PC to *pc*.

        Requires ``self.mu`` to exist already. Returns None.
        """
        reg_map = UEMU_HELPERS.get_register_map(UEMU_HELPERS.get_arch())
        for _name, reg_id in reg_map:
            self.mu.reg_write(reg_id, 0x0)
        self.mu.reg_write(self.uc_reg_pc, pc)

    @abstractmethod
    def hook_code(self, mu, address, size, user_data):
        """Per-instruction hook; receives the address and size of the instruction."""
        pass

    @abstractmethod
    def run_from(self, address, end=None):
        """Emulate starting at *address*; *end* is the address at which to stop."""
        pass

    @abstractmethod
    def hook_mem_invalid(self, uc, access, address, size, value, user_data):
        """Hook invoked for accesses to unmapped memory."""
        pass
class HydroMacDecode(DefaultDecode):
    """Emulates the HydroMac string-decryption routine to recover one string.

    The hooked addresses below are hard-coded for the sample named in the
    file header. Selected imported functions are not emulated: they are
    replaced by small Python stand-ins inside hook_code and then skipped.
    """

    def __init__(self, decryption_func_start, string_offset, string_size):
        """
        decryption_func_start -- address of the decryption routine (stored only)
        string_offset         -- value written to RSI before decryption
        string_size           -- value written to EDX before decryption
        """
        self.final_decrypted_string = ""
        # Size and address of the most recent emulated __Znwm allocation.
        # Kept per instance so one emulation cannot see another one's state.
        self.new_object_size = 0
        self.new_object_location = None
        self.init_custom_hooks()
        self.string_offset = string_offset
        self.string_size = string_size
        self.decryption_func_start = decryption_func_start
        super().__init__()

    def _map_region(self, address, size):
        """Map the page-aligned span covering [address, address + size).

        Returns ``(aligned_start, aligned_end)``. UcError from mem_map is
        left to the caller.
        """
        # - size is unsigned and must be != 0
        # - starting address must be aligned to 4KB
        # - map size must be multiple of the page size (4KB)
        # - permissions can only contain READ or WRITE perms
        memStart = address
        memEnd = address + size
        memStartAligned = UEMU_HELPERS.ALIGN_PAGE_DOWN(memStart)
        memEndAligned = UEMU_HELPERS.ALIGN_PAGE_UP(memEnd)
        uemu_log(" map [%X:%X] -> [%X:%X]" % (memStart, memEnd - 1, memStartAligned, memEndAligned - 1))
        self.mu.mem_map(memStartAligned, memEndAligned - memStartAligned)
        return memStartAligned, memEndAligned

    def map_heap(self, address, size):
        """Map the heap region and point RAX at its start; UcError is logged."""
        try:
            print("setting up the heap")
            heap_start, _heap_end = self._map_region(address, size)
            self.mu.reg_write(UC_X86_REG_RAX, heap_start)
            print(f" rax set to heap start at {hex(heap_start)}")
        except UcError as e:
            uemu_log("! <U> %s" % e)

    def map_stack(self, address, size):
        """Map the stack region, set RSP 12 bytes below its top and RAX to its base.

        UcError is logged, in which case the registers are left untouched.
        """
        try:
            print("setting up the stack")
            stack_start, stack_end = self._map_region(address, size)
            # Holus Decode specifics
            print("we skip ret value, and two arguments")
            self.esp = stack_end - 4 - 4 - 4
            self.mu.reg_write(UC_X86_REG_RSP, self.esp)
            print("we point eax to the begining of the stack")
            self.mu.reg_write(UC_X86_REG_RAX, stack_start)
            print(f" esp set to {hex(self.esp)}")
        except UcError as e:
            uemu_log("! <U> %s" % e)

    def map_memory(self, address, size):
        """Map the pages covering [address, address + size); UcError is logged."""
        try:
            self._map_region(address, size)
        except UcError as e:
            uemu_log("! <U> %s" % e)

    def _copy_range(self, start, stop):
        """Copy database bytes [start, stop) into the emulator, if IDA has any."""
        uemu_log(" cpy [%X:%X]" % (start, stop - 1))
        data = IDAAPI_GetBytes(start, stop - start)
        # Nothing (None or empty) came back from IDA: leave the mapping as is.
        if data:
            self.mu.mem_write(start, data)

    def copy_inited_data(self, start, end):
        """Copy the initialized stretches of [start, end) into emulator memory.

        Walks the range with IDAAPI_NextThat, alternating between loaded
        and unloaded runs: loaded runs are copied, unloaded ones are logged
        as skipped.
        """
        find_inited = not IDAAPI_IsLoaded(start)
        ptr = IDAAPI_NextThat(start, end, UEMU_HELPERS.InitedCallable() if find_inited else UEMU_HELPERS.UninitedCallable())
        while ptr < end:
            if not find_inited:
                # found uninitialized
                self._copy_range(start, min(ptr, end))
            else:
                # found initialized
                uemu_log(" skp [%X:%X]" % (start, ptr - 1))
            start = ptr
            find_inited = not find_inited
            ptr = IDAAPI_NextThat(ptr, end, UEMU_HELPERS.InitedCallable() if find_inited else UEMU_HELPERS.UninitedCallable())
        # process tail if any
        if not find_inited:
            self._copy_range(start, min(ptr, end))

    def hook_mem_invalid(self, uc, access, address, size, value, user_data):
        """Map the missing page(s) on demand and return True."""
        uemu_log("! <M> Missing memory at 0x%x, data size = %u, data value = 0x%x" % (address, size, value))
        page_start = UEMU_HELPERS.ALIGN_PAGE_DOWN(address)
        page_end = UEMU_HELPERS.ALIGN_PAGE_UP(address + size)
        self.map_memory(page_start, page_end - page_start)
        return True

    def trace_log(self):
        """Log the raw bytes and disassembly of the instruction at self.pc."""
        insn_bytes = IDAAPI_GetBytes(self.pc, IDAAPI_NextHead(self.pc) - self.pc)
        bytes_hex = UEMU_HELPERS.bytes_to_str(insn_bytes)
        uemu_log("* TRACE<I> 0x%X | %-16s | %s" % (self.pc, bytes_hex, IDAAPI_GetDisasm(self.pc, 0)))

    def run_from(self, address, end):
        """Emulate from *address* until *end* and return the decrypted string.

        Returns None when a UcError is logged, or when emulation raised
        while EIP reads 1. Any other exception raised during emulation
        (for instance the ValueError from hook_code) propagates.
        """
        self.mu = Uc(self.uc_arch, self.uc_mode)
        self.mu.hook_add(UC_HOOK_CODE, self.hook_code)
        self.mu.hook_add(UC_HOOK_MEM_READ_UNMAPPED | UC_HOOK_MEM_WRITE_UNMAPPED | UC_HOOK_MEM_FETCH_UNMAPPED, self.hook_mem_invalid)
        self.pc = address
        self.end = end
        try:
            if self.init_cpu_context(address) == False:
                return
            # setup the stack and the heap
            self.map_stack(self.stack, self.stack_size)
            self.map_heap(self.heap, self.heap_size)
            if not self.lazy_mapping:
                uemu_log("Mapping segments...")
                # get all segments and merge neighbours
                lastAddress = BADADDR
                for segEA in Segments():
                    segStart = IDAAPI_SegStart(segEA)
                    segEnd = IDAAPI_SegEnd(segEA)
                    endAligned = UEMU_HELPERS.ALIGN_PAGE_UP(segEnd)
                    # merge with previous if
                    # - we have mapped some segments already
                    # - aligned old segment is overlapping new segment
                    # otherwise map new
                    uemu_log("* seg [%X:%X]" % (segStart, segEnd))
                    if lastAddress != BADADDR and lastAddress > segStart:
                        if lastAddress < segEnd:
                            self.map_memory(lastAddress, endAligned - lastAddress)
                    else:
                        self.map_memory(segStart, endAligned - segStart)
                    # copy initialized bytes
                    self.copy_inited_data(segStart, segEnd)
                    lastAddress = endAligned
            self.trace_log()
            IDAAPI_SetColor(self.pc, CIC_ITEM, UEMU_CONFIG.IDAViewColor_PC)
            self.emuActive = True
            try:
                self.mu.emu_start(self.pc, self.end)
            except Exception:
                if self.mu.reg_read(UC_X86_REG_EIP) == 1:
                    return
                raise
            return self.final_decrypted_string
        except UcError as e:
            uemu_log("! <U> %s" % e)

    def u32(self, data):
        """Decode 4 bytes as a little-endian unsigned int (inverse of p32)."""
        return struct.unpack("<I", data)[0]

    def p32(self, num):
        """Encode *num* as 4 little-endian bytes."""
        return struct.pack("<I", num)

    # these are hardcoded offset found on HydroMac sample
    # of the functions we want to hook
    def init_custom_hooks(self):
        """Populate self.hooked_funcs: address -> name of the stand-in to run there."""
        self.hooked_funcs = {0x100003583: '__Znwm',
                             0x100003301: '__Znwm',
                             0x100003462: '__Znwm',
                             0x10000359e: '__throw_length_error',
                             0x10004f097: '__throw_length_error',
                             0x1000034D2: '__throw_length_error',
                             0x1000032a2: '_memcpy',
                             0x100003321: '_memcpy',
                             0x100003491: '_memcpy',
                             0x10004e73a: '_objc_autoreleasePoolPush',
                             0x10005097C: 'delete',
                             0x10000334A: 'delete'}

    def hook_code(self, mu, address, size, user_data):
        """Per-instruction hook: inject arguments, stub hooked calls, collect the result.

        Raises ValueError when a hooked __throw_length_error address is reached.
        """
        if address == 0x10004E752: #ugly: this an address where we can safely overwrite RSI and edx
            print(f"function called with {hex(self.string_offset)} and {hex(self.string_size)}")
            # setup RSI and RDX
            self.mu.reg_write(UC_X86_REG_RSI, self.string_offset)
            self.mu.reg_write(UC_X86_REG_EDX, self.string_size)
        IDAAPI_SetColor(address, CIC_ITEM, UEMU_CONFIG.IDAViewColor_Reset)
        IDAAPI_SetColor(address, CIC_ITEM, UEMU_CONFIG.IDAViewColor_PC)
        func = self.hooked_funcs.get(address)
        #00000001003041B0 00 00 00 A7 25 00 00 00 00 00 00 00 01 00 00 00 ....%...........
        #00000001003041C0 30 30 30 30 30 30 30 30 2D 30 30 30 30 2D 30 30 00000000-0000-00
        #00000001003041D0 30 30 2D 30 30 30 30 2D 30 30 30 30 30 30 30 30 00-0000-00000000
        #00000001003041E0 30 30 30 30 00 00 00 00 00 00 00 00 00 00 03 00 0000............
        if func:
            if func == '_objc_autoreleasePoolPush':
                print("detected _objc_autoreleasePoolPush")
            if func == '__throw_length_error':
                raise ValueError(f'emulation trigered a __throw_length_error at {hex(address)}')
            if func == '__Znwm':
                # Bump allocator: hand out the current heap cursor and advance it.
                ulong = self.mu.reg_read(UC_X86_REG_RDI)
                print(f"detected __Znwm with value of {hex(ulong)}")
                self.new_object_size = ulong
                memStart = self.heap
                self.mu.reg_write(UC_X86_REG_RAX, memStart)
                self.heap = memStart + self.new_object_size
                ulong = self.mu.reg_read(UC_X86_REG_RAX)
                self.new_object_location = ulong
                print(f"detected __Znwm return value of {hex(ulong)}")
            if func == '_memcpy':
                print("detected _memcpy")
                dst = self.mu.reg_read(UC_X86_REG_RDI)
                src = self.mu.reg_read(UC_X86_REG_RSI)
                # The length is taken from the last allocation, not from RDX.
                count = self.new_object_size
                print(f"copying {hex(count)} bytes")
                # copying [RSI] -> [RDI], one dword at a time; the bound is
                # inclusive, so the dword starting at offset `count` is copied too
                for c in range(0, count + 1, 4):
                    data = self.u32(mu.mem_read(src + c, 4))
                    self.mu.mem_write(dst + c, self.p32(data))
                print(f" _memcpy write to RDI {hex(dst)}")
            if func == 'delete':
                print("detected delete")
            mu.reg_write(UC_X86_REG_RIP, address + size) # skiping the function, no need to emulate them
        if address == self.pc:
            print(f"[hook_code] emulation started! at {hex(address)}")
        # extract and return string
        if address == self.end - size:
            print(f"[hook_code] emulation finished! at {hex(address)}")
            rsi_reg = mu.reg_read(UC_X86_REG_RSI)
            print(f"[hook_code] rsi register points to : {hex(rsi_reg)}")
            if self.new_object_location is None:
                print("[hook_code] no __Znwm allocation was observed, nothing to extract")
                return
            # Collect raw bytes up to the first NUL and decode once, so that
            # multi-byte UTF-8 sequences are not split across decode calls.
            raw = bytearray()
            decoded_address = self.new_object_location
            while True:
                c = mu.mem_read(decoded_address, 1)
                if c[0] == 0:
                    break
                raw += c
                decoded_address = decoded_address + 1
            decoded_string = raw.decode("utf-8", errors="replace")
            self.final_decrypted_string = decoded_string
            print(f"final decrypted string: '{decoded_string}'")
def extract_encrypted_string(start, end):
    """Return ``get_wide_byte`` of every address from *start* through *end* inclusive.

    The list is empty when *start* is greater than *end*.
    """
    return [get_wide_byte(address) for address in range(start, end + 1)]
def hunt_eggs(local_offset):
    """Look for a ``mov`` whose first operand mentions ``edx`` near *local_offset*.

    Addresses are probed one byte at a time: first upwards over
    [local_offset, local_offset + 0x30), then downwards over
    (local_offset - 0x30, local_offset]. Returns the value of operand 1 of
    the first match, or None when neither sweep finds one.
    """
    print(f" egg hunt started at {hex(local_offset)}")
    reach = 0x30
    sweeps = (
        range(local_offset, local_offset + reach),
        # we move backwards
        range(local_offset, local_offset - reach, -1),
    )
    for sweep in sweeps:
        for candidate in sweep:
            if print_insn_mnem(candidate) == "mov" and "edx" in print_operand(candidate, 0):
                return get_operand_value(candidate, 1)
    return None
def find_encrypted_strings(tag, start, end):
    """Scan [start, end) for operands containing *tag* and pair each with its edx value.

    Returns a dict mapping the value of operand 1 (the string's address)
    to the value hunt_eggs() found next to the reference. References for
    which hunt_eggs() returns None are left out.
    """
    found = {}
    cursor = start
    while cursor < end:
        if print_operand(cursor, 1).find(tag) < 0:
            # no tagged operand here: try the next byte
            cursor += 1
            continue
        # we store the offset where this string is at:
        target = get_operand_value(cursor, 1)
        # from here we go hunt for "mov edx, int", above and beyond the offset
        size = hunt_eggs(cursor)
        if size is not None:
            print(f"string {hex(target)} used in _text segment at {hex(cursor)}, with edx set to {hex(size)}")
            found[target] = size
        cursor += get_item_size(cursor)
    return found
# user defined offset
decryption_func_start = 0x10004E729
# alternative: take the address under the cursor
#decryption_func_start=idc.get_screen_ea()
print(f"decrypting at {hex(decryption_func_start)}")
decryption_func_end = 0x100003362

ea = get_first_seg()
if ea == BADADDR:
    print("unable to get first segment, quiting")
    exit(1)

# encrypted string address -> decrypted text
final_result = {}
while ea != BADADDR:
    start = get_segm_start(ea)
    end = get_segm_end(ea)
    if get_segm_name(ea) == "__text":
        print("analyzing: {}: {}-{}".format(get_segm_name(ea), hex(start), hex(end)))
        string_map = find_encrypted_strings("unk_", start, end) # corresponds to the begining of an encrypted string
        print("total found strings {}\n".format(len(string_map)))
        for key in string_map:
            engine = HydroMacDecode(decryption_func_start, key, string_map[key])
            try:
                final_decoded_string = engine.run_from(decryption_func_start, decryption_func_end)
            except ValueError as err:
                # hook_code raises ValueError on __throw_length_error; keep
                # going so the strings decoded so far are not lost.
                print(f"emulation aborted for string at {hex(key)}: {err}")
                continue
            if final_decoded_string is None:
                # run_from returns None when emulation could not complete
                print(f"no result for string at {hex(key)}, skipping")
                continue
            set_cmt(decryption_func_start, final_decoded_string, 0)
            final_result[key] = final_decoded_string
    ea = get_next_seg(ea)

for key, decoded in final_result.items():
    print(f" encrypted string at {hex(key)} decoded to : {decoded}")

for key, decoded in final_result.items():
    for x in XrefsTo(key, flags=0):
        print("found xrefs at {} for {}".format(hex(x.frm), hex(key)))
        print("adding comment to xrefs")
        set_cmt(x.frm, decoded, 0) # settings xrefs comments
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment