Created
March 22, 2023 06:56
-
-
Save stevenklar/89c290f61ce66bd06c0cf124be9f125f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Copyright (C) 2017 Mandiant, Inc. All Rights Reserved. | |
| #!/usr/bin/env python3 | |
| """ | |
| Run FLOSS to automatically extract obfuscated strings and apply them to the | |
| currently loaded module in IDA Pro. | |
| author: Willi Ballenthin | |
| email: [email protected] | |
| """ | |
import functools
import logging
import os
import threading
import time
from typing import List, Union

import idaapi
import idc
from idautils import *
import viv_utils
import floss
import floss.main
import floss.utils
import floss.render
import floss.identify
import floss.stackstrings
import floss.tightstrings
import floss.string_decoder
from floss.results import AddressType, StackString, TightString, DecodedString

# Imports for qt widget
import ida_kernwin
import ida_idaapi
# from PyQt5.QtWidgets import QWidget, QVBoxLayout, QListWidget, QListWidgetItem
from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QTableWidget, QTableWidgetItem, QAbstractItemView
# logger used by the plugin for its progress and result messages.
logger = logging.getLogger("floss.idaplugin")

# minimum string length handed to the FLOSS extraction routines.
MIN_LENGTH = 4
# edit to get logs :)
def floss_log(entry, name="FLOSS"):
    """
    debug logging hook; does nothing unless the line below is re-enabled.

    Args:
      entry: the message text.
      name: the tag placed in front of the message.
    """
    # idaapi.msg("[" + name + "]: " + entry + "\n")
    return None
# held while a request is handed to idaapi.execute_sync, so that requests
# coming from different threads do not overlap.
mutex = threading.Condition()


def _make_sync_wrapper(f, sync_flags, label):
    """
    build a wrapper that runs `f` through idaapi.execute_sync and returns its result.

    Args:
      f: the callable to execute.
      sync_flags: the idaapi.MFF_* value passed to execute_sync.
      label: the tag used in the debug log entry.

    Returns:
      a callable with the same signature as `f`.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        rv = []
        error = []

        def work():
            # collect the outcome in the enclosing lists; an exception is
            # carried back and re-raised in the calling thread below.
            try:
                rv.append(f(*args, **kwargs))
            except Exception as e:
                error.append(e)

        with mutex:
            idaapi.execute_sync(work, sync_flags)

        if error:
            msg = 'Failed on calling {}.{} with args: {}, kwargs: {}\nException: {}' \
                .format(f.__module__, f.__name__, args, kwargs, str(error[0]))
            print('[!!!] ERROR:', msg)
            raise error[0]

        if not rv:
            # neither a result nor an exception was recorded, so `work` never ran.
            # NOTE(review): presumably execute_sync rejected the request - confirm.
            raise RuntimeError("IDA did not execute {}.{}".format(f.__module__, f.__name__))

        # format through a tuple so that a tuple result is not unpacked by `%`.
        floss_log("[DEBUG][%s] %s" % (label, rv[0]))
        return rv[0]

    return wrapper


def ida_wrapper(f):
    """
    decorator: run `f` via idaapi.execute_sync with idaapi.MFF_WRITE.

    the wrapped call returns whatever `f` returns and re-raises whatever `f` raises.
    """
    return _make_sync_wrapper(f, idaapi.MFF_WRITE, "ida_wrapper")


def ida_wrapper_read(f):
    """
    decorator: run `f` via idaapi.execute_sync with idaapi.MFF_READ.

    the wrapped call returns whatever `f` returns and re-raises whatever `f` raises.
    """
    return _make_sync_wrapper(f, idaapi.MFF_READ, "ida_wrapper_read")
@ida_wrapper
def append_comment(ea: int, s: str, repeatable: bool = False) -> None:
    """
    attach the given text as a regular or repeatable comment at the given address.

    when a comment is already present the text is appended on a new line,
    unless the text already occurs in the existing comment.

    Args:
      ea: the address at which to add the comment.
      s: the comment text.
      repeatable: if True, work on the repeatable comment.
    """
    # see: http://blogs.norman.com/2011/security-research/improving-ida-analysis-of-x64-exception-handling
    is_repeatable = True if repeatable else False

    existing = idc.get_cmt(ea, is_repeatable)
    if existing:
        if s in existing:  # ignore duplicates
            return
        updated = existing + "\r\n" + s
    else:
        updated = s  # no existing comment

    idc.set_cmt(ea, updated, is_repeatable)
def append_lvar_comment(fva: int, frame_offset: int, s: str, repeatable: bool = False) -> None:
    """
    add the given string as a (possibly repeatable) stack variable comment to the given function.
    does not add the comment if the text already occurs in the existing comment.
    adds the comment on its own line.

    Args:
      fva: the address of the function with the stack variable.
      frame_offset: the offset into the stack frame at which the variable is found.
      s: the comment text.
      repeatable: if True, set a repeatable comment.

    Raises:
      RuntimeError: if the frame cannot be found, the variable offset is not positive,
        or the comment cannot be set.
    """
    # wrap the idc functions themselves so that their return values reach us.
    # the previous local helpers called idc but dropped the result, so every
    # lookup yielded None and this function always raised.
    get_func_attr = ida_wrapper_read(idc.get_func_attr)
    get_member_cmt = ida_wrapper_read(idc.get_member_cmt)
    set_member_cmt = ida_wrapper(idc.set_member_cmt)

    stack = get_func_attr(fva, idc.FUNCATTR_FRAME)
    # idc.get_func_attr reports failure as BADADDR.
    if not stack or stack == idc.BADADDR:
        raise RuntimeError("failed to find stack frame for function: 0x%x" % fva)

    frame_size = get_func_attr(fva, idc.FUNCATTR_FRSIZE)
    if frame_size is None or frame_size == idc.BADADDR:
        raise RuntimeError("failed to find stack frame size for function: 0x%x" % fva)

    lvar_offset = frame_size - frame_offset  # alternative: idc.get_frame_lvar_size(fva) - frame_offset
    if not lvar_offset:
        raise RuntimeError("failed to compute local variable offset: 0x%x 0x%x %s" % (fva, stack, s))
    if lvar_offset <= 0:
        raise RuntimeError("failed to compute positive local variable offset: 0x%x 0x%x %s" % (fva, stack, s))

    string = get_member_cmt(stack, lvar_offset, repeatable)
    if not string:
        string = s
    else:
        if s in string:  # ignore duplicates
            return
        string = string + "\r\n" + s

    if not set_member_cmt(stack, lvar_offset, string, repeatable):
        raise RuntimeError("failed to set comment: 0x%08x 0x%08x 0x%08x: %s" % (fva, stack, lvar_offset, s))
def apply_decoded_strings(decoded_strings: List[DecodedString]) -> None:
    """
    comment each decoded string into the database and list it in the results widget.

    strings with a global address are recorded at that address; all others are
    recorded at the address where they were decoded. empty strings are skipped.

    Args:
      decoded_strings: the decoded strings to apply.
    """

    def add_to_list_widget(string, address):
        list_widget.add_item(string, address)

    for ds in decoded_strings:
        if not ds.string:
            continue

        if ds.address_type == AddressType.GLOBAL:
            target = ds.address
            logger.info("decoded string at global address 0x%x: %s", target, ds.string)
        else:
            target = ds.decoded_at
            logger.info("decoded string for function call at 0x%x: %s", target, ds.string)

        append_comment(target, ds.string)
        # the widget is updated through execute_sync rather than called directly.
        ida_kernwin.execute_sync(
            lambda text=ds.string, ea=target: add_to_list_widget(text, ea),
            ida_kernwin.MFF_WRITE,
        )
def apply_stack_strings(
    strings: List[Union[StackString, TightString]], lvar_cmt: bool = True, cmt: bool = True
) -> None:
    """
    comment each stack/tight string into the database and list it in the results widget.

    a string is listed once if at least one of its comments was applied.
    empty strings are skipped.

    Args:
      strings: the stack strings or tight strings to apply.
      lvar_cmt: apply stack variable comment
      cmt: apply regular comment
    """

    def add_to_list_widget(string, address):
        list_widget.add_item(string, address)

    for s in strings:
        if not s.string:
            continue

        logger.info("decoded stack/tight string in function 0x%x (pc: 0x%x): %s", s.function, s.program_counter, s.string)

        annotated = False
        if lvar_cmt:
            try:
                # TODO this often fails due to wrong frame offset
                append_lvar_comment(s.function, s.frame_offset, s.string)
            except RuntimeError as e:
                # best effort: keep going, but leave a trace instead of hiding the failure.
                logger.debug("failed to apply stack/tight string: %s", str(e))
            else:
                annotated = True

        if cmt:
            append_comment(s.program_counter, s.string)
            annotated = True

        if annotated:
            # update the widget through execute_sync, the same way
            # apply_decoded_strings does, instead of calling it directly.
            ida_kernwin.execute_sync(
                lambda text=s.string, ea=s.program_counter: add_to_list_widget(text, ea),
                ida_kernwin.MFF_WRITE,
            )
def ignore_floss_logs():
    """
    raise the log thresholds of several FLOSS and vivisect related loggers.
    """
    thresholds = (
        ("floss.api_hooks", logging.WARNING),
        ("floss.function_argument_getter", logging.WARNING),
        ("viv_utils", logging.CRITICAL),
        ("viv_utils.emulator_drivers", logging.ERROR),
    )
    for logger_name, level in thresholds:
        logging.getLogger(logger_name).setLevel(level)

    floss.utils.set_vivisect_log_level(logging.CRITICAL)
def main(argv=None):
    """
    run the FLOSS passes (stack strings, tight strings, decoded strings) and apply the results.

    the vivisect workspace is read from the `.viv` file next to the IDB when that
    file exists, and is otherwise loaded from the IDB.

    Args:
      argv: unused.

    Returns:
      0 once all passes have completed.
    """
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)
    ignore_floss_logs()

    def show_list_widget():
        list_widget.Show("FLOSS")

    ida_kernwin.execute_sync(show_list_widget, ida_kernwin.MFF_WRITE)

    @ida_wrapper_read
    def get_idb_path():
        return idc.get_idb_path()

    workspace_path = os.path.splitext(get_idb_path())[0] + ".viv"
    if not os.path.exists(workspace_path):
        # logger.info("loading vivisect workspace from IDB...")
        @ida_wrapper_read
        def loadWorkspaceFromIdb():
            return viv_utils.loadWorkspaceFromIdb()

        vw = loadWorkspaceFromIdb()
    else:
        # logger.info("loading vivisect workspace from %r", viv_path)
        vw = viv_utils.getWorkspace(workspace_path)
    # logger.info("loaded vivisect workspace")

    all_functions = set(vw.getFunctions())
    started = time.time()

    # logger.info("identifying decoding functions...")
    features, _library_functions = floss.identify.find_decoding_function_features(
        vw, all_functions, disable_progress=True
    )

    # logger.info("extracting stackstrings...")
    non_tight_functions = floss.identify.get_functions_without_tightloops(features)
    stack_strings = floss.stackstrings.extract_stackstrings(
        vw, non_tight_functions, MIN_LENGTH, verbosity=floss.render.Verbosity.VERBOSE, disable_progress=True
    )
    logger.info("decoded %d stack strings", len(stack_strings))
    apply_stack_strings(stack_strings)

    # logger.info("extracting tightstrings...")
    tightloop_functions = floss.identify.get_functions_with_tightloops(features)
    tight_strings = floss.tightstrings.extract_tightstrings(
        vw,
        tightloop_functions,
        min_length=MIN_LENGTH,
        verbosity=floss.render.Verbosity.VERBOSE,
        disable_progress=True,
    )
    logger.info("decoded %d tight strings", len(tight_strings))
    apply_stack_strings(tight_strings)

    # logger.info("decoding strings...")
    # emulate the top 20 candidates plus the tight functions.
    candidate_fvas = floss.identify.get_function_fvas(floss.identify.get_top_functions(features, 20))
    tight_fvas = floss.identify.get_tight_function_fvas(features)
    candidate_fvas = floss.identify.append_unique(candidate_fvas, tight_fvas)
    decoded_strings = floss.string_decoder.decode_strings(
        vw,
        candidate_fvas,
        MIN_LENGTH,
        verbosity=floss.render.Verbosity.VERBOSE,
        disable_progress=True,
    )
    logger.info("decoded %d strings", len(decoded_strings))
    apply_decoded_strings(decoded_strings)

    finished = time.time()
    logger.info("finished execution after %f seconds", (finished - started))
    return 0
| # ----------------------------------------------------------------------- | |
class FlossWidget(ida_kernwin.PluginForm):
    """
    dockable form with a two-column table (string, address) of the recovered strings.
    double-clicking a row jumps to the address stored for that row.
    """

    NAME_COLUMN = 0
    ADDRESS_COLUMN = 1
    # role under which the numeric address is stored on the address cell.
    ADDRESS_ROLE = Qt.UserRole

    def OnCreate(self, form):
        """
        build the table when the form is created.

        Args:
          form: the form handle passed in by IDA.
        """
        self.parent = self.FormToPyQtWidget(form)
        self.layout = QVBoxLayout(self.parent)
        self.list_row = 0

        self.table_widget = QTableWidget()
        self.layout.addWidget(self.table_widget)

        # the columns must exist before labels and sizing can apply to them.
        self.table_widget.setColumnCount(2)
        self.table_widget.setHorizontalHeaderLabels(["Name", "Address"])
        self.table_widget.setSelectionBehavior(QAbstractItemView.SelectRows)
        self.table_widget.setEditTriggers(QAbstractItemView.NoEditTriggers)
        self.table_widget.setSortingEnabled(True)
        self.table_widget.verticalHeader().setVisible(True)
        self.table_widget.resizeColumnsToContents()
        self.table_widget.horizontalHeader().setStretchLastSection(True)
        self.table_widget.itemDoubleClicked.connect(self.on_item_double_clicked)

    def add_item(self, string, address):
        """
        append a row to the table.

        Args:
          string: the text shown in the first column.
          address: the integer address shown (as hex) in the second column.
        """
        logger.debug("adding string: %s, address: 0x%x", string, address)

        table = self.table_widget
        # with sorting active, a row can move as soon as its first cell is set,
        # and the second cell would then be written to a different row.
        # so sorting is suspended for the duration of the insert.
        was_sorting = table.isSortingEnabled()
        table.setSortingEnabled(False)
        try:
            row = table.rowCount()
            table.setRowCount(row + 1)

            name_item = QTableWidgetItem(string)
            address_item = QTableWidgetItem(hex(address))
            address_item.setData(self.ADDRESS_ROLE, address)

            table.setItem(row, self.NAME_COLUMN, name_item)
            table.setItem(row, self.ADDRESS_COLUMN, address_item)
        finally:
            table.setSortingEnabled(was_sorting)

        self.list_row = table.rowCount()

    def on_item_double_clicked(self, item):
        """
        jump to the address of the row that was double-clicked.

        Args:
          item: the table cell that received the double click.
        """
        # the address is stored on the address cell only, so resolve it via the
        # row; the clicked cell may be the one in the name column.
        address_item = self.table_widget.item(item.row(), self.ADDRESS_COLUMN)
        if address_item is None:
            return

        address = address_item.data(self.ADDRESS_ROLE)
        if address is None:
            return

        def jump_to_address(address):
            ida_kernwin.jumpto(address)

        ida_kernwin.execute_sync(lambda: jump_to_address(address), ida_kernwin.MFF_WRITE)

    def OnClose(self, form):
        """
        nothing to clean up when the form closes.
        """
        pass

    def Show(self, caption):
        """
        show the form with the WOPN_PERSIST, WCLS_SAVE and WOPN_RESTORE options.

        Args:
          caption: the title of the form.
        """
        options = (
            ida_kernwin.PluginForm.WOPN_PERSIST
            | ida_kernwin.PluginForm.WCLS_SAVE
            | ida_kernwin.PluginForm.WOPN_RESTORE
        )
        return ida_kernwin.PluginForm.Show(self, caption, options=options)


# the single widget instance shared by the apply_* functions and main().
list_widget = FlossWidget()
| # ----------------------------------------------------------------------- | |
class floss_plugin_t(idaapi.plugin_t):
    """
    IDA plugin object: runs main() on a background thread when invoked.
    """

    comment = ""
    help = "FLOSS"
    flags = idaapi.PLUGIN_FIX
    wanted_name = "FLOSS"
    wanted_hotkey = "Ctrl-Alt-S"

    def init(self):
        """
        called when the plugin is loaded; returns idaapi.PLUGIN_KEEP.
        """
        floss_log("init plugin")
        return idaapi.PLUGIN_KEEP

    def runThread(self):
        """
        thread body: run the FLOSS analysis.
        """
        # idaapi.execute_sync(main, 1)
        main()

    def run(self, arg=0):
        """
        start the analysis on a new thread and return immediately.

        Args:
          arg: unused.
        """
        floss_log("run plugin")
        self.thread = threading.Thread(target=self.runThread)
        self.thread.start()

    def term(self):
        """
        called when the plugin is unloaded.
        """
        floss_log("terminate plugin")

    # Check if auto-analysis is complete.
    def run_plugin_on_timer(self):
        """
        timer callback: start the plugin once idaapi.auto_is_ok() reports true.

        Returns:
          -1 after the plugin was started (stops the timer), otherwise 1000
          (check again in 1 second).
        """
        logger.info("RUN FUNCTION %s", "idaapi.refresh_idaview_anyway")
        # called for its effect only.
        # NOTE(review): this call appears to return nothing, so the previous
        # `if` on its result could never pass - confirm against the IDAPython docs.
        idaapi.refresh_idaview_anyway()

        logger.info("RUN FUNCTION %s", "idaapi.auto_is_ok")
        if idaapi.auto_is_ok():
            logger.info("RUN FUNCTION %s", "plugin.run")
            # `run` is a method; the bare name `run()` was undefined here.
            self.run()
            return -1  # Stop the timer.
        return 1000  # Check again in 1 second.
# the plugin instance handed out by PLUGIN_ENTRY and used by the timer registration.
plugin = floss_plugin_t()


def PLUGIN_ENTRY():
    """
    return the module's plugin instance.
    """
    return plugin
def register_timer_on_init():
    """
    register plugin.run_plugin_on_timer as a timer callback with an interval of 1000.
    """
    interval = 1000
    logger.info("RUN FUNCTION %s", "register_timer_on_idle")
    idaapi.register_timer(interval, plugin.run_plugin_on_timer)
| ## None of the following work, because they were only implemented starting from IDA 7.8 | |
| # idaapi.add_startup_notification(register_timer_on_startup) | |
| # idaapi.set_idc_func_ex("idle", register_timer_on_idle, None) | |
| # idaapi.add_hotkey("F3", register_timer_on_init) | |
| # register_timer_on_init() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment