Created
May 17, 2024 08:25
-
-
Save plowsec/c538083b3a01d5eeea8beffff6cf1058 to your computer and use it in GitHub Desktop.
IDA Pro plugin to propagate types (top-down) AND fix WDM unions AND rename tainted variables
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| This IDAPython script automates the following operations to find x64 vulnerable kernel drivers with firmware access. | |
| * Triage | |
| 1. Identify IOCTL handlers in WDM/WDF drivers | |
| 2. Find execution paths from the handlers to the target API (MmMapIoSpace*) and instructions (IN/OUT) | |
| * Analysis | |
| 1. Fix union fields for IOCTL in the handlers and subroutines | |
| 2. Propagate function argument names/types in subroutines recursively to decide if input/output can be controlled | |
| (sometimes you need to refresh the code by pressing F5) | |
| Takahiro Haruyama (@cci_forensics) | |
| ''' | |
# NOTE: the relative order of the wildcard imports is significant (later ones
# shadow earlier ones, e.g. idc vs. ida_struct helpers), and the stdlib imports
# stay last so a wildcard import can never shadow them.
from idc import *
from idautils import *
import idaapi
import ida_kernwin
import ida_hexrays
from ida_hexrays import *
from ida_struct import *
import ida_auto
import ida_ida
import ida_idp
import ida_kernwin
import ida_loader
import ida_name
import ida_pro
import ida_typeinf
import ntpath
import os
import time
import sys
# Popup-menu action registered at the bottom of this script:
# identifier, visible label and hotkey (empty string = no hotkey).
ACTION_NAME = "my:right_click_handler2222"
ACTION_LABEL = "Get EA of Decompiled Function"
ACTION_SHORTCUT = ""
# When True, debug() prints its messages.
g_debug = True
# When True, callees flagged as Lumina-provided are not followed.
g_skip_lumina = False
# Base name of the analysed input file; only used in log messages.
g_target_file_name = os.path.basename(get_input_file_path())
# Exit statuses for batch mode.
ERR_DECOMPILE_FAILED = -1
ERR_UNKNOWN_WDF_VERSION = -2
ERR_NO_XREFTO_WDF_BIND_INFO = -3
# Calls to these APIs are recorded in g_tapi_paths when reached from an IOCTL handler.
g_target_api_names = ['MmMapIoSpace', 'MmMapIoSpaceEx']
g_ioctl_handler_addrs = set()
g_ioctl_handler_name = 'fn_ioctl_handler'
# Named (non-"sub_") functions that are still followed during propagation.
g_follow_fn_names = ['DriverEntry', '_DriverEntry@8']
# Flag passed to ida_typeinf.apply_tinfo() when applying a function type.
g_tinfo_apply_flag = ida_typeinf.TINFO_DEFINITE
g_in_helper_names = ['__inbyte', '__inword', '__indword']
g_out_helper_names = ['__outbyte', '__outword', '__outdword']
# Collected call paths (tuples of addresses).
g_tapi_paths = set()
g_in_paths = set()
g_out_paths = set()
# Number of suffixed names ("name_1", "name_2", ...) tried when a plain lvar rename fails.
RENAME_RETRY_CNT = 10
# WDM
IRP_MJ_DEVICE_CONTROL = 14 # _DRIVER_OBJECT.MajorFunction[14] = DispatchDeviceControl
SOFF_PARAMS = 0x8 # Parameters structure offset in _IO_STACK_LOCATION
UNUM_IOCTL = 0x10 # DeviceIoControl union number in _IO_STACK_LOCATION.Parameters
SOFF_AIRP = 0x18 # AssociatedIrp structure offset in _IRP
UNUM_SBUF = 0x2 # SystemBuffer union number in _IRP.AssociatedIrp
# Import the two WDM types into the database so their members can be looked up by name.
import_type(-1, 'IO_STACK_LOCATION')
import_type(-1, 'IRP')
# For each WDM structure: the offset of the union-typed member and the union
# member to select inside it (consumed by wdm_fix_union_type_number).
g_wdm_struc_union = [
    {'name':'IO_STACK_LOCATION', 'offset':SOFF_PARAMS, 'union_num':UNUM_IOCTL},
    {'name':'IRP', 'offset':SOFF_AIRP, 'union_num':UNUM_SBUF},
]
# Script start time; used for the batch-mode timeout during propagation.
g_start_time = time.time()
class ToFileStdOut(object):
    """File-backed object exposing the subset of the stdout interface used by print().

    The file at `path` is opened for writing (truncating existing content) and
    is closed by `close()` or, at the latest, when the object is destroyed.
    """

    def __init__(self, path):
        self.outfile = open(path, "w")

    def write(self, text):
        """Write `text` to the underlying file."""
        self.outfile.write(text)

    def flush(self):
        """Flush buffered output to disk."""
        self.outfile.flush()

    def isatty(self):
        """Always False: the output is a regular file, never a terminal."""
        return False

    def close(self):
        """Close the underlying file; safe to call more than once."""
        # `outfile` is missing when open() raised inside __init__ (or when the
        # instance was created without __init__), so look it up defensively.
        outfile = getattr(self, "outfile", None)
        if outfile is not None and not outfile.closed:
            outfile.close()

    def __del__(self):
        # Must never raise: __del__ also runs for partially constructed objects.
        self.close()
# On Windows, switch the console attached to stdout into a mode that
# interprets the ANSI colour escapes emitted by the logging helpers.
if os.name == 'nt':
    import ctypes

    kernel32 = ctypes.windll.kernel32
    ENABLE_PROCESSED_OUTPUT = 0x0001
    ENABLE_WRAP_AT_EOL_OUTPUT = 0x0002
    ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
    # The three flags are distinct bits, so OR-ing them equals their sum.
    MODE = (ENABLE_PROCESSED_OUTPUT
            | ENABLE_WRAP_AT_EOL_OUTPUT
            | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
    handle = kernel32.GetStdHandle(-11)  # -11: standard output handle
    kernel32.SetConsoleMode(handle, MODE)
def info(msg):
    """Print `msg` with a bold blue "[*]" prefix."""
    print(f"\033[34m\033[1m[*]\033[0m {msg}")
def success(msg):
    """Print `msg` with a bold green "[+]" prefix."""
    print(f"\033[32m\033[1m[+]\033[0m {msg}")
def error(msg):
    """Print `msg` with a bold red "[!]" prefix."""
    print(f"\033[31m\033[1m[!]\033[0m {msg}")
def debug(msg):
    """Print `msg` with a bold yellow "[D]" prefix, but only when g_debug is truthy."""
    if not g_debug:
        return
    print(f"\033[33m\033[1m[D]\033[0m {msg}")
class MyRightClickHandler(ida_kernwin.action_handler_t):
    """Action handler that runs the propagation on the function shown in a pseudocode view."""

    def __init__(self):
        ida_kernwin.action_handler_t.__init__(self)

    def activate(self, ctx):
        """Propagate from the function decompiled in the widget the action was invoked on.

        Does nothing when the widget has no decompiler view attached.
        Always returns 1.
        """
        vdui = ida_hexrays.get_widget_vdui(ctx.widget)
        if not vdui:
            return 1
        entry_ea = vdui.cfunc.entry_ea
        print("EA of the currently decompiled function: %x" % entry_ea)
        do_propagate(entry_ea, vdui.cfunc.type)
        return 1

    def update(self, ctx):
        """Keep the action enabled in every context."""
        return ida_kernwin.AST_ENABLE_ALWAYS
class my_hooks_t(ida_kernwin.UI_Hooks):
    """UI hook that adds the propagation action to the pseudocode context menu."""

    def populating_widget_popup(self, widget, popup):
        """Attach ACTION_NAME to `popup` when `widget` is a pseudocode view.

        The action is attached exactly once per popup (the previous code
        issued the attach call twice in a row).
        """
        if ida_kernwin.get_widget_type(widget) != ida_kernwin.BWN_PSEUDOCODE:
            return
        res = ida_kernwin.attach_action_to_popup(widget, popup, ACTION_NAME)
        print("Attach popup", res)
class my_visitor_t(ctree_visitor_t):
    """Base ctree visitor providing the local-variable helpers shared by the visitors in this file."""

    def __init__(self):
        ctree_visitor_t.__init__(self, CV_FAST)

    def check_var_flags(self, var, ea):
        """Log (at debug level) which naming-related properties are set on `var`.

        `ea` is only used as the address prefix of each message.
        """
        # Property value paired with the message emitted when it is set.
        reports = (
            (var.has_nice_name, '{:#x}: {} has "nice name"'),
            (var.has_user_name, '{:#x}: {} has user-defined name'),
            (var.has_user_info, '{:#x}: {} has user-defined info'),
            (var.is_arg_var, '{:#x}: {} is a function argument'),
            (var.is_notarg, '{:#x}: {} is a local variable'),
        )
        for is_set, template in reports:
            if is_set:
                debug(template.format(ea, var.name))

    # IDA decompiler has no API forcing lvar name
    def force_rename_lvar(self, ea, var, new_name):
        """Rename `var` to `new_name`, falling back to numbered variants.

        The function containing `ea` is the rename scope. If `new_name` is
        rejected, "new_name_1" .. "new_name_<RENAME_RETRY_CNT>" are tried in
        order; an error is logged when all of them fail. On success `var.name`
        is updated as well so the new name is visible immediately.
        """
        func_ea = get_func_attr(ea, FUNCATTR_START)
        debug('force_rename_lvar: function ea = {:#x}'.format(func_ea))
        old_name = var.name
        changed_msg = '{:#x}: lvar name changed "{}" -> "{}" (rename_lvar)'

        if rename_lvar(func_ea, var.name, new_name):
            info(changed_msg.format(ea, old_name, new_name))
            var.name = new_name # to refresh immediately
            return

        for attempt in range(1, RENAME_RETRY_CNT + 1):
            candidate = '{}_{}'.format(new_name, attempt)
            if not rename_lvar(func_ea, var.name, candidate):
                continue
            info(changed_msg.format(ea, old_name, candidate))
            var.name = candidate
            break
        else:
            # Every numbered candidate was rejected as well.
            error('{:#x}: renaming {} failed (rename_lvar, {} times)'.format(ea, var.name, RENAME_RETRY_CNT))
        # (A disabled fallback through modify_user_lvars used to be kept here
        # as an inert string literal.)

    # Find lvar with type recursively
    def search_lvar(self, expr, _type=None):
        """Look through casts, `&x` and `*x` for a local variable.

        Returns `(lvar, tinfo)`. `tinfo` is the type of the innermost
        enclosing cast with its pointer removed in place, or None when no
        cast was seen. Returns `(None, None)` when `expr` does not lead to
        a local variable.
        """
        debug('{:#x}: cot number = {}'.format(expr.ea, expr.op))
        if expr.op == cot_cast:
            # The cast's own type replaces whatever was collected so far.
            return self.search_lvar(expr.x, expr.type)
        if expr.op in (cot_ref, cot_ptr):
            return self.search_lvar(expr.x, _type)
        if expr.op != cot_var:
            return None, None

        lvar = expr.v.getv()
        debug('{:#x}: var={} type={}'.format(expr.ea, lvar.name, str(_type)))
        if _type:
            ida_typeinf.remove_tinfo_pointer(_type, None, _type.get_til())
        return lvar, _type
# Summarize a name in a specified tree
class sumname_visitor_t(my_visitor_t):
    """Visitor that condenses an expression tree into a single identifier-like name.

    Fragments are appended to `summed_name` in visiting order:
    variable names, structure member names, "_ref"/"_deref" markers,
    "_field_<hex>" offsets and global names. Use `get_summed_name()`
    to obtain the filtered result.
    """

    MAX_NAME_LEN = 64

    def __init__(self):
        ctree_visitor_t.__init__(self, CV_PARENTS)
        self.summed_name = ''
        # Member names carrying no useful information.
        self.ignore_names = ['LowPart', 'anonymous_0']
        self.contains_struc_mem = False

    def visit_expr(self, expr):
        """Append the fragment contributed by `expr`; return 1 to stop the traversal, 0 to continue."""
        debug('{:#x} (sumname_visitor_t): op = {}'.format(expr.ea, expr.op))
        op = expr.op

        if op == cot_call:
            # Abort (avoid assignment for return value)
            self.summed_name = ''
            return 1

        if op == cot_var:
            lvar = expr.v.getv()
            self.check_var_flags(lvar, expr.ea)
            # Only user-named variables, arguments, or names not looking like
            # the decompiler's default "vN" contribute, and only once.
            meaningful = lvar.has_user_name or lvar.is_arg_var or not lvar.name.startswith('v')
            if meaningful and lvar.name not in self.summed_name:
                self.summed_name += '_' + lvar.name
        elif op in (cot_memref, cot_memptr): # x.m or x->m
            owner_name = expr.x.type.get_type_name()
            member_id = get_member_id(get_struc(get_struc_id(owner_name)), expr.m)
            debug('{:#x} (sumname_visitor_t): mid = {:#x}'.format(expr.ea, member_id))
            if member_id != BADADDR:
                member_name = get_member_name(member_id)
                debug('{:#x} (sumname_visitor_t): mname = {}'.format(expr.ea, member_name))
                if member_name and member_name not in self.ignore_names:
                    self.summed_name += '_' + member_name
                    # Abort if one member name collected
                    return 1
        elif op == cot_ref: # &x
            self.summed_name += '_ref'
        elif op == cot_ptr: # *x
            self.summed_name += '_deref'
        elif op == cot_num:
            parent = self.parent_expr()
            debug('{:#x} (sumname_visitor_t): cot_num under parent {}'.format(expr.ea, parent.op))
            if parent.op == cot_add:
                field_num = expr.n._value
                if parent.x.op == cot_cast:
                    # Scale the constant by the size of the pointed-to type.
                    type_no_ptr = ida_typeinf.remove_pointer(parent.type)
                    debug('Casted type without pointer = {}, size = {}'.format(type_no_ptr, type_no_ptr.get_size()))
                    field_num = expr.n._value * type_no_ptr.get_size()
                self.summed_name += '_field_{:x}'.format(field_num)
        elif op == cot_obj: # e.g., global variable
            global_name = get_name(expr.obj_ea)
            debug('{:#x} (sumname_visitor_t): global variable {} ({:#x})'.format(expr.ea, global_name, expr.obj_ea))
            self.summed_name += '_' + global_name

        return 0

    def get_summed_name(self):
        """Return the collected name, '' when it is not worth using.

        Names starting with "_field_" or equal to "_ref" are rejected.
        Names longer than MAX_NAME_LEN are truncated and suffixed with
        "_cut" (the truncated form is also stored back in `summed_name`).
        """
        debug('sumname_visitor_t: summarized name before filtering = {}'.format(self.summed_name))
        collected = self.summed_name
        # Avoid ones without memref/memptr or with just ref (e.g., &v1)
        if collected.startswith('_field_') or collected == '_ref':
            return ''
        if len(collected) > self.MAX_NAME_LEN:
            shortened = collected[:self.MAX_NAME_LEN] + '_cut'
            debug('sumname_visitor_t: summarized name {} is too long and will be cut back as {}'.format(collected, shortened))
            self.summed_name = shortened
        return self.summed_name
# Change type of the specified lvar name
class my_lvar_modifier_t(user_lvar_modifier_t):
    """Modifier for `modify_user_lvars` that renames and/or retypes one local variable.

    target_name: name of the variable to modify.
    new_name:    optional new name.
    new_decl:    optional C declaration string giving the new type.
    new_tif:     optional tinfo_t giving the new type (used when `new_decl` is not set).
    """

    def __init__(self, target_name, new_name=None, new_decl=None, new_tif=None):
        user_lvar_modifier_t.__init__(self)
        self.target_name = target_name
        self.new_name = new_name
        self.new_decl = new_decl
        self.new_tif = new_tif

    def modify_lvars(self, lvars):
        """Apply the requested changes to the first variable named `target_name`.

        Returns True when the target variable was found, False otherwise.
        """
        if len(lvars.lvvec) == 0:
            error('modify_lvars: len(lvars.lvvec) == 0')

        for one in lvars.lvvec:
            debug('modify_lvars: target_name = "{}" current = "{}"'.format(self.target_name, one.name))
            if one.name != self.target_name:
                continue

            if self.new_name:
                one.name = self.new_name
                info('modify_lvars: Name "{}" set to {}'.format(one.name, self.target_name))

            tif = None
            if self.new_decl:
                parsed = ida_typeinf.tinfo_t()
                # NOTE(review): parse_decl is expected to return None on failure
                # (and a possibly empty name string on success) - confirm against
                # the IDAPython version in use. Without this check a failed parse
                # would assign an empty type to the variable.
                if ida_typeinf.parse_decl(parsed, None, self.new_decl, 0) is None:
                    error('{}: parse_decl from {} FAILED'.format(one.name, self.new_decl))
                else:
                    tif = parsed
            elif self.new_tif:
                tif = self.new_tif

            if tif:
                one.type = tif
                info('modify_lvars: Type "{}" set to {}'.format(str(tif), one.name))
            return True

        return False
class ioctl_propagator_t(my_visitor_t):
    """Visitor that fixes WDM union selections and pushes argument names/types into callees.

    While walking the expressions of one decompiled function it
    * selects the IOCTL-related union members listed in g_wdm_struc_union,
    * renames assigned local variables after the expression they receive,
    * builds a prototype for every followed callee from the call-site
      arguments, applies it and recurses into the callee.

    call_path: addresses of the functions leading to `ea`. The list is
               copied, so every visitor owns its path and the caller's
               list is never modified.
    ea:        start address of the function being visited; appended to the
               path unless `manual` is set.
    cfunc:     decompiled function being visited; required to store union
               selections.
    manual:    True when started interactively on an arbitrary function.
    """

    # Longest chain of nested calls that is still followed.
    MAX_CALL_DEPTH = 15
    # Wall-clock budget (seconds since script start) in batch mode.
    BATCH_TIMEOUT_SECS = 120

    def __init__(self, call_path, ea, cfunc=None, manual=False):
        ctree_visitor_t.__init__(self, CV_PARENTS | CV_POST | CV_RESTART)
        # Copy instead of extending in place: sharing one list between caller
        # and callee visitors turned the path into a global "visited" list,
        # which made the depth limit count visited functions instead of depth.
        self.call_path = list(call_path)
        if not manual:
            self.call_path.append(ea)
        self.union_propagate_ll = {} # forced to rename local variable locations
        self.current_cfunc = cfunc

    def apply_func_type(self, caller_ea, callee_ea, func_tif, func_cdecl, path_start):
        """Apply a prototype to `callee_ea`, given as tinfo (preferred) or as a C declaration."""
        if func_tif:
            if ida_typeinf.apply_tinfo(callee_ea, func_tif, g_tinfo_apply_flag):
                info('{:#x}: [FUNC TYPE APPLIED (tinfo)] {}'.format(callee_ea, str(func_tif)))
            else:
                error('{:#x}: Applying a function type by tinfo FAILED {}'.format(callee_ea, str(func_tif)))
        elif func_cdecl:
            if ida_typeinf.apply_cdecl(None, callee_ea, func_cdecl):
                info('{:#x}: [FUNC TYPE APPLIED (cdecl)] {}'.format(callee_ea, func_cdecl))
            else:
                error('{:#x}: Applying a function type by cdecl FAILED {}'.format(callee_ea, func_cdecl))

    def propagate_arg_types(self, caller_ea, callee_ea, func_tif=None, func_cdecl=None, path_start=False, depth=0):
        """Apply the prototype to `callee_ea`, then run a new visitor over its body.

        Nothing is done when the callee is already on the current call path
        (recursion) or when the batch-mode time budget is exhausted. The
        prototype is applied, but the callee is not visited, once the call
        path has reached MAX_CALL_DEPTH. With `path_start` the callee starts
        a fresh call path.
        """
        debug("propagating")
        if callee_ea in self.call_path and not path_start:
            debug('{:#x}: Recursion detected'.format(caller_ea))
            return
        if ida_kernwin.cvar.batch and time.time() - g_start_time > self.BATCH_TIMEOUT_SECS:
            debug('Abort, timeout')
            return
        debug(f'depth = {depth}')

        # Apply the function prototype
        self.apply_func_type(caller_ea, callee_ea, func_tif, func_cdecl, path_start)

        debug(f'callpath len = {len(self.call_path)}')
        if not path_start and len(self.call_path) >= self.MAX_CALL_DEPTH:
            debug('Aborting, too deep!')
            return

        # Apply the same op recursively
        cfunc = get_ctree_root(callee_ea)
        if not cfunc:
            return
        parent_path = [] if path_start else self.call_path
        iv = ioctl_propagator_t(parent_path, callee_ea, cfunc)
        iv.apply_to_exprs(cfunc.body, None)
        # delayed saving to prevent lvar renaming errors
        cfunc.save_user_unions()

    def is_libthunk(self, ea):
        """Return True when the function at `ea` is flagged as a library or thunk function."""
        fname = get_func_name(ea)
        flags = get_func_attr(ea, FUNCATTR_FLAGS)
        if flags & FUNC_LIB:
            debug('{}: ignored because of library function'.format(fname))
            return True
        if flags & FUNC_THUNK:
            debug('{}: ignored because of thunk function'.format(fname))
            return True
        return False

    def is_lumina(self, ea):
        """Log when the function at `ea` carries the Lumina flag; always returns True.

        HACK: the unconditional True is deliberate - it makes every named
        function count as "Lumina-provided" so that functions named from
        debug symbols are followed as well.
        """
        fname = get_func_name(ea)
        flags = get_func_attr(ea, FUNCATTR_FLAGS)
        if flags & FUNC_LUMINA:
            debug('{}: function information provided by Lumina'.format(fname))
        return True # ugly hack to make this script work with debug symbols

    def get_struc_member_tinfo(self, struc_name, mem_offset):
        """Return the tinfo_t of the member at `mem_offset` of structure `struc_name`, or None."""
        sptr = get_struc(get_struc_id(struc_name)) if struc_name else None
        mem = get_member(sptr, mem_offset) if sptr else None
        if mem is None:
            # Unknown structure or no member at that offset.
            error('Cannot get the structure {} member {:#x} tinfo'.format(struc_name, mem_offset))
            return None

        tif = ida_typeinf.tinfo_t()
        if get_or_guess_member_tinfo(tif, mem):
            return tif
        error('Cannot get the structure {} member {:#x} tinfo'.format(struc_name, mem_offset))
        return None

    def set_union_type_number(self, expr, tif, unum):
        """Retype `expr` to `tif`, select union member `unum` and store the selection.

        Returns the result of storing the user union selection, or False
        when no decompiled function is attached to this visitor.
        """
        # Change the union type
        expr.type = tif
        info('{:#x}: [UNION TYPE APPLIED] {}'.format(expr.ea, tif.get_type_name()))
        # Change the union number
        expr.m = unum
        info('{:#x}: union member number changed to {:#x}'.format(expr.ea, expr.m))

        res = False
        cfunc = self.current_cfunc
        if cfunc:
            # Save the user selection into idb
            path = ida_pro.intvec_t()
            path.add_unique(unum)
            res = cfunc.set_user_union_selection(expr.ea, path)
            debug('{:#x}: set_user_union_selection returned {}'.format(expr.ea, res))
            cfunc.save_user_unions()
        else:
            error('{:#x}: no decompiled function attached, union selection not saved'.format(expr.ea))

        # Save the location to rename the left-side lvar forcibly
        self.union_propagate_ll[expr.ea] = tif
        return res

    def wdm_fix_union_type_number(self, expr):
        """Fix the union selection of `expr` when it accesses one of the g_wdm_struc_union members.

        `expr` must have the shape `<x>->member<.|->>union_member`; only the
        first matching entry of g_wdm_struc_union is processed.
        """
        for target in g_wdm_struc_union:
            if target['name'] not in str(expr.x.x.type) or expr.x.m != target['offset']:
                continue

            debug('{:#x}: changing the union type and number in {}'.format(expr.ea, target['name']))
            tif_struc_mem = self.get_struc_member_tinfo(target['name'], target['offset'])
            if not tif_struc_mem:
                error('{:#x}: no type for {} member at {:#x}'.format(expr.ea, target['name'], target['offset']))
                break
            tif_union_num = self.get_struc_member_tinfo(tif_struc_mem.get_type_name(), target['union_num'])
            if not tif_union_num:
                error('{:#x}: no type for union member {:#x} of {}'.format(
                    expr.ea, target['union_num'], tif_struc_mem.get_type_name()))
                break
            res = self.set_union_type_number(expr, tif_union_num, target['union_num'])
            debug('{:#x}: union selection stored = {}'.format(expr.ea, res))
            break

    def get_name_by_traverse(self, item, parent):
        """Summarize the expression tree below `item` into a name ('' when nothing useful was found)."""
        sv = sumname_visitor_t()
        sv.apply_to_exprs(item, parent)
        return sv.get_summed_name()

    # WDM: Identify IOCTL handler then fix the IOCTL-related union member numbers
    # WDF: Set argument names and types of APIs handling input/output
    def visit_expr(self, expr):
        """Pre-order hook: fix WDM union selections on `a->b.c` / `a->b->c` accesses."""
        if expr.op in (cot_memptr, cot_memref):
            kind = 'memptr' if expr.op == cot_memptr else 'memref'
            debug('{:#x}: {} {} (offset {:#x})'.format(expr.ea, kind, str(expr.x.type), expr.m))
            # WDM: Fix the correct union type & member number
            if expr.x.op == cot_memptr:
                self.wdm_fix_union_type_number(expr)
        return 0

    # Propagate type/name recursively and save paths to the target APIs/instructions
    # WDF: Load WDF type information then identify IOCTL handler
    def leave_expr(self, expr):
        """Post-order hook: follow calls and rename the targets of assignments."""
        if expr.op == cot_call:
            self._propagate_call(expr)
        elif expr.op == cot_asg:
            self._rename_assigned_lvar(expr)
        return 0

    def _propagate_call(self, expr):
        """Record target-API paths, then build and push a prototype into the callee of `expr`."""
        callee_ea = expr.x.obj_ea
        func_name = get_name(callee_ea) # to get API name
        debug('{:#x}: call {} ({:#x})'.format(expr.ea, func_name, callee_ea))

        # Identify the last path to the API call. The path is empty when the
        # visitor was started manually, hence the emptiness check.
        if func_name in g_target_api_names and self.call_path and \
                self.call_path[0] in g_ioctl_handler_addrs:
            g_tapi_paths.add(tuple(self.call_path + [expr.ea])) # API call address

        # Avoid invalid ea, API calls and inline library functions
        if callee_ea == BADADDR or self.is_libthunk(callee_ea):
            return
        from_lumina = self.is_lumina(callee_ea)
        if not func_name.startswith('sub_') and func_name not in g_follow_fn_names and not from_lumina:
            return
        # Avoid Lumina functions explicitly
        if from_lumina and g_skip_lumina:
            return

        # Analyze the function arguments
        debug(('{:#x}: analyzing function arguments'.format(expr.ea)))
        pt_args = []
        for i in range(expr.a.size()):
            arg = expr.a.at(i)
            arg_type = str(arg.type)
            arg_name = self.get_name_by_traverse(arg, expr)
            if arg_name:
                debug('{:#x}: argument summarized name {}'.format(arg.ea, arg_name))
                arg_name += '_' + str(i) # Avoid duplicative names
            debug('arg {}: type = {}, name = {}'.format(i, arg_type, arg_name))
            pt_args.append('{} {}'.format(arg_type, arg_name))

        # Rename the function
        if func_name.startswith('sub_'):
            new_func_name = 'fn_{:X}'.format(callee_ea)
        else:
            demangled = demangle_name(func_name, get_inf_attr(INF_SHORT_DN))
            # DriverEntry, Lumina functions or WdfVersionBind do not demangle.
            new_func_name = demangled.split('(')[0] if demangled else func_name
        ida_name.force_name(callee_ea, new_func_name)

        func_pt = '{} __fastcall {}({});'.format(expr.x.type.get_rettype(), new_func_name, ', '.join(pt_args))
        debug('{:#x}: assembled function prototype for {:#x} = "{}"'.format(expr.ea, callee_ea, func_pt))
        self.propagate_arg_types(expr.ea, callee_ea, func_cdecl=func_pt)

    def _rename_assigned_lvar(self, expr):
        """Rename (and, for fixed unions, retype) the local variable assigned by `expr`."""
        # x: local variable without user-defined name and no function argument
        var, _tif = self.search_lvar(expr.x)
        if not var:
            return
        self.check_var_flags(var, expr.ea)
        # Avoid vars with user-defined names or from arguments
        if var.has_user_name or var.is_arg_var:
            return
        debug('union_propagate_ll = {}'.format(['{:#x}'.format(x) for x in self.union_propagate_ll]))
        debug('{:#x}: assign to lvar {}'.format(expr.ea, var.name))

        # y: something derived from function argument (e.g., ptr->add->cast, cast, etc.)
        # Traverse from y to summarize the right side name
        summed_name = self.get_name_by_traverse(expr.y, expr)
        if not summed_name:
            return
        debug('{:#x}: right side summarized name {}'.format(expr.y.ea, summed_name))
        self.force_rename_lvar(expr.ea, var, summed_name)

        # Set types of fixed union member values
        if expr.y.ea in self.union_propagate_ll:
            my_lvar_mod = my_lvar_modifier_t(var.name, new_tif=expr.y.type)
            modify_user_lvars(get_func_attr(expr.ea, FUNCATTR_START), my_lvar_mod)
# Ported from examples/hexrays/decompile_entry_points.py
def init_hexrays():
    """Load and initialise the decompiler plugin matching the current processor.

    Returns True on success; logs an error and returns False when the
    processor has no known decompiler or the plugin cannot be
    loaded/initialised.
    """
    plugin_by_cpu = {
        ida_idp.PLFM_386: "hexrays",
        ida_idp.PLFM_ARM: "hexarm",
        ida_idp.PLFM_PPC: "hexppc",
        ida_idp.PLFM_MIPS: "hexmips",
    }
    cpu = ida_idp.ph.id
    plugin = plugin_by_cpu.get(cpu)
    if not plugin:
        error("No known decompilers for architecture with ID: %d" % ida_idp.ph.id)
        return False

    if ida_ida.inf_is_64bit():
        # x86 has a dedicated 64-bit plugin name; the others take a "64" suffix.
        plugin = "hexx64" if cpu == ida_idp.PLFM_386 else plugin + "64"

    if not (ida_loader.load_plugin(plugin) and init_hexrays_plugin()):
        error('Couldn\'t load or initialize decompiler: "%s"' % plugin)
        return False
    return True
def exit_without_change(status):
    """Terminate IDA with exit code `status` without creating or modifying the database."""
    separator = '-' * 50
    print(separator) # Differentiate the log
    # Tell IDA to abandon the database instead of saving it.
    process_config_line("ABANDON_DATABASE=YES")
    qexit(status)
def get_ctree_root(ea):
    """Decompile the function at `ea`, bypassing the decompilation cache.

    Returns the decompiled function object, or None when decompilation
    fails (the failure reason is logged).
    """
    try:
        return decompile(ea, flags=DECOMP_NO_CACHE)
    except ida_hexrays.DecompilationFailure as exc:
        # Catch only the decompiler's own failure so that KeyboardInterrupt,
        # SystemExit and programming errors are no longer swallowed.
        error('Decompilation of a function {:#x} failed'.format(ea))
        debug('{:#x}: decompiler reported: {}'.format(ea, exc))
        return None
def main():
    '''
    Analyse the input file starting from its entry point.

    Computed status (logged at the end):
    -1: Decompiler initialization failure
    0: The execution works but no IOCTL handler found
    100: IOCTL handler found but no path found
    200 or 300: IOCTL handler found and 1 or 2 paths found (e.g., paths to the target API and OUT insn)
    400: IOCTL and all paths found

    In batch mode IDA is terminated when the analysis is finished.
    '''
    status = 0
    info('start')

    if ida_kernwin.cvar.batch: # batch mode execution
        # Wait until the initial auto analysis is finished
        ida_auto.auto_wait()

    # We need to load the decompiler manually
    if not init_hexrays():
        error('{}: Decompiler initialization failed. Aborted.'.format(g_target_file_name))
        if ida_kernwin.cvar.batch:
            exit_without_change(ERR_DECOMPILE_FAILED)
        # Nothing below can work without the decompiler.
        return

    # Demangle names
    idaapi.cvar.inf.demnames = 1
    ida_kernwin.refresh_idaview_anyway()

    ea = get_inf_attr(INF_START_IP)
    info('{:#x}: analysis start at "{}"'.format(ea, get_name(ea)))
    cfunc = get_ctree_root(ea)
    if cfunc:
        # Hand over cfunc so that fixed union selections can be stored.
        iv = ioctl_propagator_t([], ea, cfunc=cfunc)
        iv.apply_to_exprs(cfunc.body, None)

    if g_ioctl_handler_addrs:
        success('IOCTL handler addresses = {}'.format(['{:#x}'.format(x) for x in g_ioctl_handler_addrs]))
        status += 100

    info('{}: Done with status {}'.format(g_target_file_name, status))
    if ida_kernwin.cvar.batch:
        ida_pro.qexit(0)
def do_propagate(ea, typeinfo):
    """Run the propagation interactively on the function starting at `ea`.

    `typeinfo` is accepted for the caller's convenience but not used.
    Nothing is propagated when the function cannot be decompiled.
    """
    # Demangle names
    idaapi.cvar.inf.demnames = 1
    ida_kernwin.refresh_idaview_anyway()

    info('{:#x}: analysis start at "{}"'.format(ea, get_name(ea)))
    cfunc = get_ctree_root(ea)
    if not cfunc:
        return

    propagator = ioctl_propagator_t([], ea, cfunc=cfunc, manual=True)
    propagator.apply_to_exprs(cfunc.body, None)
    # Persist the union selections made during the walk.
    cfunc.save_user_unions()
if __name__ == '__main__':
    # Make the script re-runnable within one IDA session: drop the action and
    # the UI hook left behind by a previous run, otherwise registration fails
    # (name already taken) and the popup callback fires once per past run.
    # NOTE(review): relies on the script namespace surviving between runs.
    ida_kernwin.unregister_action(ACTION_NAME)
    previous_hooks = globals().get('my_hooks')
    if previous_hooks is not None:
        previous_hooks.unhook()

    action_desc = ida_kernwin.action_desc_t(
        ACTION_NAME,
        ACTION_LABEL,
        MyRightClickHandler(),
        ACTION_SHORTCUT,
        "Propagate types from current function",
        -1
    )
    print(action_desc)
    # Register the action
    res = ida_kernwin.register_action(action_desc)
    print("Register action", res)
    # Keep a module-level reference so the hook object stays alive.
    my_hooks = my_hooks_t()
    my_hooks.hook()
    init_hexrays()
    print("Done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment