Last active
November 5, 2023 22:12
-
-
Save williballenthin/1cb2512b726d3bbc955746f69eaed0da to your computer and use it in GitHub Desktop.
Automatically resolve shellcode API hashes into symbolic names using emulation. Example: https://asciinema.org/a/EaHLv3yy7nGnh7mfHQ5DVy1LJ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import logging | |
import pefile | |
import ucutils | |
import unicorn | |
import capstone | |
import argparse | |
import ucutils.emu | |
import ucutils.cli | |
import ucutils.plat.win32 | |
logger = logging.getLogger('auto-shellcode-hashes')


# unicorn and capstone are separate projects.
# i'm not sure that the register mappings are guaranteed to be consistent.
# so we build a mapping that translates capstone <-> unicorn register constants
U2C = {}
C2U = {}

# mapping from register constant to its lowercase name,
# i.e. the constant's name with the `X86_REG_` prefix stripped.
U2S = {}
C2S = {}

# names of every register present in the tables above.
REGS = set()

for const_name in dir(capstone.x86_const):
    if not const_name.startswith('X86_REG_'):
        continue

    # the unicorn constant is looked up under the same name with a `UC_` prefix.
    uconst_name = 'UC_' + const_name
    uconst = getattr(unicorn.x86_const, uconst_name, None)
    if uconst is None:
        # capstone knows a register that the installed unicorn does not.
        # skip it rather than failing while the module is being imported.
        continue

    cconst = getattr(capstone.x86_const, const_name)
    reg_name = const_name[len('X86_REG_'):].lower()

    U2C[uconst] = cconst
    C2U[cconst] = uconst
    U2S[uconst] = reg_name
    C2S[cconst] = reg_name
    REGS.add(reg_name)
def load(emu, sc_addr, sc, dlls):
    '''
    populate the emulator's address space so the shellcode can be run.

    performs the following, in order:
      - maps the shellcode bytes at `sc_addr`
      - maps the stack region and points $sp and $bp into it
      - asks the platform helper to map the TEB
        (the original notes say this covers TEB, PEB, and LDR_DATA)
      - loads each DLL read-only

    Args:
      emu: emulator instance to populate.
      sc_addr (int): address at which the shellcode is mapped.
      sc (bytes): the shellcode contents.
      dlls: DLL descriptors, each passed as-is to `emu.plat.load_dll`.

    Returns:
      int: `sc_addr`, unchanged.
    '''
    logger.debug('mapping instructions at 0x%x', sc_addr)
    emu.mem.map_data(sc_addr, sc, reason='code')

    # stack layout:
    #
    #   min-addr -> STACK_ADDR
    #   $sp ------> STACK_ADDR + 0x1000
    #   $bp ------> STACK_ADDR + 0x2000
    #   max-addr -> STACK_ADDR + STACK_SIZE
    #               (the original notes give this as STACK_ADDR + 0x3000)
    stack_base = ucutils.STACK_ADDR
    logger.debug('mapping stack at 0x%x', stack_base)
    emu.mem.map_region(stack_base, ucutils.STACK_SIZE, reason='stack')
    emu.stack_pointer = stack_base + 0x1000
    emu.base_pointer = stack_base + 0x2000

    emu.plat.map_teb()

    # DLLs are mapped without execute permission on purpose:
    # we never want to emulate any of their code.
    for module in dlls:
        emu.plat.load_dll(module, perms=unicorn.UC_PROT_READ)

    return sc_addr
class SimpleCmpRegisterTaintTracker(ucutils.emu.Hook):
    '''
    hook emulation and search for instructions like:

        cmp REG, REG

    where one of REG has the given target value.
    tracks the other "tainted" value in `.tainted_values`.

    the target itself is only recorded when *both* registers hold it.
    '''
    HOOK_TYPE = unicorn.UC_HOOK_CODE

    def __init__(self, target):
        '''
        Args:
          target (int): the value to watch for in register comparisons.
        '''
        super(SimpleCmpRegisterTaintTracker, self).__init__()
        # look for comparisons against this target value.
        self.target = target
        # these are the values compared against our target.
        self.tainted_values = set()

    def hook(self, emu, address, size, user_data):
        '''
        inspect the instruction about to execute and record tainted operands.

        Args:
          emu: the emulator instance that invoked the hook.
          address (int): address of the current instruction.
          size (int): length in bytes of the current instruction.
          user_data: unused.
        '''
        buf = emu.mem_read(address, size)
        # the disassembler yields nothing when it cannot decode the bytes;
        # don't let StopIteration escape from the hook in that case.
        insn = next(emu.dis.disasm(bytes(buf), address), None)
        if insn is None or insn.mnemonic != 'cmp':
            return

        op0, op1 = insn.operands
        if op0.type != capstone.x86_const.X86_OP_REG:
            return
        if op1.type != capstone.x86_const.X86_OP_REG:
            return

        val0 = emu.reg_read(C2U[op0.reg])
        val1 = emu.reg_read(C2U[op1.reg])
        if val0 != self.target and val1 != self.target:
            return

        logger.debug('0x%x: tainted comparison: cmp %s=0x%x, %s=0x%x',
                     address,
                     C2S[op0.reg],
                     val0,
                     C2S[op1.reg],
                     val1)

        # record only the operand compared *against* the target.
        # recording the target as well would make callers treat the
        # placeholder as a candidate value of its own.
        if val0 == self.target:
            self.tainted_values.add(val1)
        if val1 == self.target:
            self.tainted_values.add(val0)
class SimpleCmpMemTaintTracker(ucutils.emu.Hook):
    '''
    hook memory reads and search for instructions like:

        cmp [mem], REG

    or:

        cmp REG, [mem]

    where [mem] has the given target value.
    tracks the value of REG in `.tainted_values`.

    comparisons without a register operand (such as `cmp [mem], imm`) are ignored.
    '''
    HOOK_TYPE = unicorn.UC_HOOK_MEM_READ

    def __init__(self, target):
        '''
        Args:
          target (int): the value to watch for in memory operands.
        '''
        super(SimpleCmpMemTaintTracker, self).__init__()
        # look for comparisons against this target value.
        self.target = target
        # these are the values compared against our target.
        self.tainted_values = set()

    def hook(self, emu, _, address, size, __, ___):
        '''
        inspect a memory read and record the register it is compared with.

        Args:
          emu: the emulator instance that invoked the hook.
          address (int): address being read.
          size (int): number of bytes being read.

        the remaining positional arguments are unused.
        '''
        # only pointer-sized reads can hold the target value.
        if size != emu.ptr_size:
            return

        val = emu.arch.parse_ptr(emu, address)
        if val != self.target:
            return

        # the hook is not told the instruction length,
        # so read a generous window at $pc and decode just the first instruction.
        buf = emu.mem_read(emu.program_counter, 0x10)
        insn = next(emu.dis.disasm(bytes(buf), emu.program_counter), None)
        if insn is None or insn.mnemonic != 'cmp':
            return

        op0, op1 = insn.operands
        if op0.type == capstone.x86_const.X86_OP_REG:
            reg = op0.reg
        elif op1.type == capstone.x86_const.X86_OP_REG:
            reg = op1.reg
        else:
            # neither operand is a register, so there is no value to record.
            # (previously this fell through and referenced an unbound local.)
            return

        v = emu.reg_read(C2U[reg])
        self.tainted_values.add(v)
        logger.info('%x: tainted value: %08x', emu.program_counter, v)
def compute_available_exports(dlls):
    '''
    collect all the exports available from the given DLLs loaded at their preferred addresses.

    DLLs without an export directory contribute nothing.
    exports without a name (ordinal-only) are ignored.

    Args:
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.

    Returns:
      Dict[int, str]: mapping from load address to symbol name.
    '''
    ret = {}
    for dll in dlls:
        pe = dll['pe']

        # the parsed PE only carries this attribute when the file
        # actually has an export directory.
        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if export_dir is None:
            continue

        image_base = pe.OPTIONAL_HEADER.ImageBase
        for symbol in export_dir.symbols:
            if not symbol.name:
                # if exported by ordinal only, then ignore
                continue
            ret[image_base + symbol.address] = symbol.name.decode('ascii')
    return ret
def resolve_address(dlls, addr):
    '''
    resolve the name of the export that prefers to be loaded at the given address.

    DLLs without an export directory are skipped.
    exports without a name (ordinal-only) never match.

    Args:
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      addr (int): preferred virtual address of export.

    Returns:
      str: the name of the export.

    Raises:
      KeyError: if the export is not found.
    '''
    for dll in dlls:
        pe = dll['pe']

        # the parsed PE only carries this attribute when the file
        # actually has an export directory.
        export_dir = getattr(pe, 'DIRECTORY_ENTRY_EXPORT', None)
        if export_dir is None:
            continue

        image_base = pe.OPTIONAL_HEADER.ImageBase
        for symbol in export_dir.symbols:
            if addr != image_base + symbol.address:
                continue
            if not symbol.name:
                # ordinal-only export: nothing to decode.
                # keep looking in case a named export shares the address.
                continue
            return symbol.name.decode('ascii')

    raise KeyError(addr)
def extract_imports(emu, sc_addr, sc_len, dlls, resolver_offset):
    '''
    Emulate the shellcode at the given address with the given DLLs loaded
     to resolve API hashes.

    Assume the given function accepts a single argument: the pointer-sized hash to resolve.
    Assume the given function returns the resolved pointer in EAX.

    Args:
      emu (ucutils.emu.Emulator): Unicorn emulator instance.
      sc_addr (int): address of the start of the shellcode region.
      sc_len (int): the size of the shellcode buffer.
        currently unused; kept so existing callers keep working.
      dlls (List[Dict[str, any]]): list of dicts with keys:
        filename (str): filename of DLL.
        pe (pefile.PE): parsed DLL.
      resolver_offset (int): relative offset into shellcode region of resolver function.

    Returns:
      Dict[int, str]: mapping from hash to export name.
    '''
    # here's the strategy:
    #   1. emulate the resolver with a fake hash, monitoring for comparisons against the hash
    #   2. for each tainted hash value,
    #      re-run the resolver function, and see what function pointer is resolved.
    TAINTED_VALUE = 0x69696969

    resolver_addr = sc_addr + resolver_offset

    # we just pick some place we know is mapped as code.
    # don't intend to actually execute here.
    ret_addr = sc_addr

    imports = {}

    emu.program_counter = resolver_addr

    # find tainted hashes.
    with ucutils.emu.context(emu):
        # arg0: tainted value
        emu.push(TAINTED_VALUE)
        emu.push(ret_addr)

        tt = SimpleCmpRegisterTaintTracker(TAINTED_VALUE)
        tt2 = SimpleCmpMemTaintTracker(TAINTED_VALUE)
        with ucutils.emu.hook(emu, tt), ucutils.emu.hook(emu, tt2):
            try:
                emu.go(ret_addr)  # ret from hash function
            except unicorn.UcError as e:
                # an unmapped read is expected and ignored:
                # metasploit will walk right off the end of the loaded modules list
                # e.g. c24296214e969566e1cc36995eb184e5 at offset 0x15
                #
                #                     resolve_function
                # 00000006 60              PUSHAD
                # 00000007 89 e5           MOV        EBP, ESP
                # 00000009 31 c0           XOR        EAX, EAX
                # 0000000b 64 8b 50 30     MOV        EDX, dword ptr FS :[EAX + 0x30]
                # 0000000f 8b 52 0c        MOV        EDX, dword ptr [EDX + 0xc]
                # 00000012 8b 52 14        MOV        EDX, dword ptr [EDX + 0x14]
                # 00000015 8b 72 28        MOV        ESI, dword ptr [EDX + 0x28]  <<<<<<<<<<<
                if e.errno != unicorn.UC_ERR_READ_UNMAPPED:
                    logger.warning('emulation error: %s, $pc: 0x%x', str(e), emu.program_counter)

    tainted_values = tt.tainted_values | tt2.tainted_values
    logger.info('identified %d tainted values', len(tainted_values))

    exports = compute_available_exports(dlls)
    logger.info('identified %d potential symbols', len(exports))

    for api_hash in tainted_values:
        logger.debug('attempting to resolve hash: %08x', api_hash)

        try:
            with ucutils.emu.context(emu):
                # pretend we just CALL'd to the resolver routine with one argument: the potential API hash
                emu.push(api_hash)
                emu.push(ret_addr)
                emu.program_counter = resolver_addr

                # there are two potential outcomes that we want to catch:
                #   1. the routine returns the function pointer, so we'll pluck that from eax.
                #   2. the routine jumps directly to the resolved function, (this is what metasploit does).
                #
                # to handle (1), we'll definitely want to stop once the resolver function is complete.
                #  so, we only emulate to the return value.
                # to handle (2), then we ensure all DLLs are mapped read-only (not executable).
                #  this way, the emulator will except when fetching a non-executable instruction.
                #  note: in practice, this is much faster than registering a hook (see `BreakOnConditionHook`).
                try:
                    emu.go(ret_addr)
                except unicorn.UcError as e:
                    if e.errno != unicorn.UC_ERR_FETCH_PROT:
                        # something is wrong here
                        logger.info('failed to resolve hash %08x', api_hash)
                        continue
                    # probably in case (2) ...or something is broken,
                    # so see if we're at the start of an exported symbol.
                    pfunc = emu.program_counter
                else:
                    # probably in case (1),
                    # so the function pointer is in the return value location (eax)
                    pfunc = emu.eax

                if pfunc not in exports:
                    logger.info('failed to resolve hash %08x', api_hash)
                    continue

                fname = exports[pfunc]
                logger.info('resolved %08x to function 0x%08x (%s)', api_hash, pfunc, fname)
                imports[api_hash] = fname
        except unicorn.UcError as e:
            # raised outside the inner `emu.go` handler, e.g. while setting up the call.
            logger.warning('emulation error: %s, failed to resolve hash: %08x', str(e), api_hash)

    return imports
def main(argv=None):
    '''
    command-line entry point: print `hash: name` for every API hash resolved from the shellcode.

    Args:
      argv (List[str]): command-line arguments, without the program name.
        defaults to `sys.argv[1:]`.

    Returns:
      int: process exit code, always 0 when this function returns normally.
    '''
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Automatically extract shellcode hash resolutions.")
    parser.add_argument("input", type=str,
                        help="Path to input file")
    # the offset is parsed as hexadecimal.
    parser.add_argument("resolver_offset", type=lambda s: int(s, 0x10),
                        help="Relative offset to resolver function")
    parser.add_argument("dlls", type=str, nargs='+',
                        help="Paths to DLL files to map")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging")
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="Disable all output but errors")
    args = parser.parse_args(args=argv)

    # --verbose takes precedence over --quiet when both are given.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level)
    logging.getLogger().setLevel(level)

    with open(args.input, 'rb') as f:
        sc = f.read()

    dlls = []
    for dllpath in args.dlls:
        parsed = pefile.PE(dllpath)
        dlls.append({
            'filename': os.path.basename(dllpath),
            'path': dllpath,
            'pe': parsed,
        })

    emu = ucutils.emu.Emulator(unicorn.UC_ARCH_X86, unicorn.UC_MODE_32, plat=ucutils.plat.win32)
    load(emu, ucutils.CODE_ADDR, sc, dlls)

    resolved = extract_imports(emu, ucutils.CODE_ADDR, len(sc), dlls, args.resolver_offset)
    for api_hash, func in resolved.items():
        print('%08x: %s' % (api_hash, func))

    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment