Skip to content

Instantly share code, notes, and snippets.

@stevenklar
Created March 22, 2023 06:56
Show Gist options
  • Select an option

  • Save stevenklar/89c290f61ce66bd06c0cf124be9f125f to your computer and use it in GitHub Desktop.

Select an option

Save stevenklar/89c290f61ce66bd06c0cf124be9f125f to your computer and use it in GitHub Desktop.
# Copyright (C) 2017 Mandiant, Inc. All Rights Reserved.
#!/usr/bin/env python3
"""
Run FLOSS to automatically extract obfuscated strings and apply them to the
currently loaded module in IDA Pro.
author: Willi Ballenthin
email: [email protected]
"""
import os
import time
import logging
from typing import List, Union
import threading
import idaapi
import idc
from idautils import *
import viv_utils
import functools
import floss
import floss.main
import floss.utils
import floss.render
import floss.identify
import floss.stackstrings
import floss.tightstrings
import floss.string_decoder
from floss.results import AddressType, StackString, TightString, DecodedString
# Imports for qt widget
import ida_kernwin
import ida_idaapi
# from PyQt5.QtWidgets import QWidget, QVBoxLayout, QListWidget, QListWidgetItem
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QTableWidget, QTableWidgetItem, QAbstractItemView
# module logger; main() configures the root logger at INFO level.
logger = logging.getLogger("floss.idaplugin")
# minimum string length handed to the FLOSS stack/tight/decoded string extractors in main().
MIN_LENGTH = 4
# edit to get logs :)
def floss_log(entry, name="FLOSS"):
    """
    debug-log hook that is a no-op by default.

    uncomment the `idaapi.msg` line below to print entries to the IDA output window.

    Args:
      entry: the message text.
      name: the tag that prefixes the message.
    """
    # idaapi.msg("[" + name + "]: " + entry + "\n")
    return None
# held around every idaapi.execute_sync call issued by the ida_wrapper* decorators.
mutex = threading.Condition()


def ida_wrapper(f):
    """
    decorate `f` so that every call is executed via `idaapi.execute_sync`
    with the `idaapi.MFF_WRITE` flag, while holding the module-level `mutex`.

    Args:
      f: the callable to run through `idaapi.execute_sync`.

    Returns:
      a wrapper with the same signature as `f` that returns `f`'s result.

    Raises:
      Exception: whatever `f` raised is re-raised to the caller.
      RuntimeError: if the request produced neither a result nor an error,
        i.e. `f` was never executed.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        rv = []
        error = []

        def work():
            try:
                rv.append(f(*args, **kwargs))
            except Exception as e:
                error.append(e)

        with mutex:
            idaapi.execute_sync(work, idaapi.MFF_WRITE)

        if error:
            msg = 'Failed on calling {}.{} with args: {}, kwargs: {}\nException: {}' \
                .format(f.__module__, f.__name__, args, kwargs, str(error[0]))
            print('[!!!] ERROR:', msg)
            raise error[0]

        if not rv:
            # `work` never ran, so there is nothing to return; fail with a
            # clear message instead of an IndexError on rv[0].
            raise RuntimeError(
                "IDA did not execute {}.{}".format(f.__module__, f.__name__)
            )

        # wrap in a 1-tuple so that tuple results do not break %-formatting.
        floss_log("[DEBUG][ida_wrapper] %s" % (rv[0],))
        return rv[0]

    return wrapper
def ida_wrapper_read(f):
    """
    decorate `f` so that every call is executed via `idaapi.execute_sync`
    with the `idaapi.MFF_READ` flag, while holding the module-level `mutex`.

    Args:
      f: the callable to run through `idaapi.execute_sync`.

    Returns:
      a wrapper with the same signature as `f` that returns `f`'s result.

    Raises:
      Exception: whatever `f` raised is re-raised to the caller.
      RuntimeError: if the request produced neither a result nor an error,
        i.e. `f` was never executed.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        rv = []
        error = []

        def work():
            try:
                rv.append(f(*args, **kwargs))
            except Exception as e:
                error.append(e)

        with mutex:
            idaapi.execute_sync(work, idaapi.MFF_READ)

        if error:
            msg = 'Failed on calling {}.{} with args: {}, kwargs: {}\nException: {}' \
                .format(f.__module__, f.__name__, args, kwargs, str(error[0]))
            print('[!!!] ERROR:', msg)
            raise error[0]

        if not rv:
            # `work` never ran, so there is nothing to return; fail with a
            # clear message instead of an IndexError on rv[0].
            raise RuntimeError(
                "IDA did not execute {}.{}".format(f.__module__, f.__name__)
            )

        # wrap in a 1-tuple so that tuple results do not break %-formatting.
        floss_log("[DEBUG][ida_wrapper_read] %s" % (rv[0],))
        return rv[0]

    return wrapper
@ida_wrapper
def append_comment(ea: int, s: str, repeatable: bool = False) -> None:
    """
    append `s` on a new line to the (regular or repeatable) comment at `ea`.
    leaves the comment untouched when `s` is already part of it.

    Args:
      ea: the address at which to add the comment.
      s: the comment text.
      repeatable: if True, read and write the repeatable comment.
    """
    # see: http://blogs.norman.com/2011/security-research/improving-ida-analysis-of-x64-exception-handling
    flag = bool(repeatable)

    existing = idc.get_cmt(ea, flag)
    if existing and s in existing:
        # ignore duplicates
        return

    # no existing comment: use `s` as-is; otherwise put it on its own line.
    updated = existing + "\r\n" + s if existing else s
    idc.set_cmt(ea, updated, flag)
def append_lvar_comment(fva: int, frame_offset: int, s: str, repeatable: bool = False) -> None:
    """
    add the given string as a (possibly repeatable) stack variable comment to the given function.
    does not add the comment if it already exists.
    adds the comment on its own line.

    Args:
      fva: the address of the function with the stack variable.
      frame_offset: the offset into the stack frame at which the variable is found.
      s: the comment text.
      repeatable: if True, set a repeatable comment.

    Raises:
      RuntimeError: if the function has no stack frame, the computed variable
        offset is zero or negative, or setting the comment fails.
    """

    # every idc call goes through an ida_wrapper* helper. each helper must
    # *return* the idc result: without the return the helper yields None and
    # the checks below fail unconditionally.
    @ida_wrapper_read
    def get_func_attr(fva, funcattr):
        return idc.get_func_attr(fva, funcattr)

    @ida_wrapper_read
    def get_member_cmt(stack, lvar_offset, repeatable):
        return idc.get_member_cmt(stack, lvar_offset, repeatable)

    @ida_wrapper
    def set_member_cmt(stack, lvar_offset, string, repeatable):
        return idc.set_member_cmt(stack, lvar_offset, string, repeatable)

    stack = get_func_attr(fva, idc.FUNCATTR_FRAME)
    if not stack:
        raise RuntimeError("failed to find stack frame for function: 0x%x" % fva)

    lvar_offset = (
        get_func_attr(fva, idc.FUNCATTR_FRSIZE) - frame_offset
    )  # alternative: idc.get_frame_lvar_size(fva) - frame_offset
    if not lvar_offset:
        raise RuntimeError("failed to compute local variable offset: 0x%x 0x%x %s" % (fva, stack, s))

    if lvar_offset <= 0:
        raise RuntimeError("failed to compute positive local variable offset: 0x%x 0x%x %s" % (fva, stack, s))

    string = get_member_cmt(stack, lvar_offset, repeatable)
    if not string:
        string = s
    else:
        if s in string:  # ignore duplicates
            return
        string = string + "\r\n" + s

    if not set_member_cmt(stack, lvar_offset, string, repeatable):
        raise RuntimeError("failed to set comment: 0x%08x 0x%08x 0x%08x: %s" % (fva, stack, lvar_offset, s))
def apply_decoded_strings(decoded_strings: List[DecodedString]) -> None:
    """
    annotate the database with each non-empty decoded string and list it in the results widget.

    strings with a GLOBAL address type are attached to their own address;
    all others are attached to the address at which they were decoded.

    Args:
      decoded_strings: the decoded strings to apply.
    """

    def add_to_list_widget(string, address):
        list_widget.add_item(string, address)

    for ds in decoded_strings:
        if not ds.string:
            continue

        if ds.address_type == AddressType.GLOBAL:
            target = ds.address
            logger.info("decoded string at global address 0x%x: %s", target, ds.string)
        else:
            target = ds.decoded_at
            logger.info("decoded string for function call at 0x%x: %s", target, ds.string)

        append_comment(target, ds.string)
        # the widget update is handed to execute_sync rather than done inline.
        ida_kernwin.execute_sync(
            lambda text=ds.string, ea=target: add_to_list_widget(text, ea),
            ida_kernwin.MFF_WRITE,
        )
def apply_stack_strings(
    strings: List[Union[StackString, TightString]], lvar_cmt: bool = True, cmt: bool = True
) -> None:
    """
    annotate the database with each non-empty stack/tight string and list it in the results widget.

    a string is listed once if at least one of the requested comments was applied.

    Args:
      strings: the stack strings or tight strings to apply.
      lvar_cmt: apply stack variable comment
      cmt: apply regular comment
    """
    for s in strings:
        if not s.string:
            continue

        logger.info("decoded stack/tight string in function 0x%x (pc: 0x%x): %s", s.function, s.program_counter, s.string)

        applied = False

        if lvar_cmt:
            try:
                # TODO this often fails due to wrong frame offset
                append_lvar_comment(s.function, s.frame_offset, s.string)
            except RuntimeError as e:
                # best effort: keep going, but leave a trace for debugging
                # (invisible at the INFO level that main() configures).
                logger.debug("failed to apply stack/tight string: %s", str(e))
            else:
                applied = True

        if cmt:
            append_comment(s.program_counter, s.string)
            applied = True

        if applied:
            # hand the widget update to execute_sync, the same way
            # apply_decoded_strings does, instead of touching the Qt table
            # directly from the calling thread.
            ida_kernwin.execute_sync(
                lambda text=s.string, ea=s.program_counter: list_widget.add_item(text, ea),
                ida_kernwin.MFF_WRITE,
            )
def ignore_floss_logs():
    """
    raise the log level of the noisy FLOSS and vivisect loggers.
    """
    thresholds = (
        ("floss.api_hooks", logging.WARNING),
        ("floss.function_argument_getter", logging.WARNING),
        ("viv_utils", logging.CRITICAL),
        ("viv_utils.emulator_drivers", logging.ERROR),
    )
    for logger_name, level in thresholds:
        logging.getLogger(logger_name).setLevel(level)

    floss.utils.set_vivisect_log_level(logging.CRITICAL)
def main(argv=None):
    """
    run the FLOSS pipeline against the current IDB and apply the results.

    shows the results widget, loads a vivisect workspace (from the `.viv` file
    next to the IDB if it exists, otherwise from the IDB itself), then extracts
    and applies stack strings, tight strings and decoded strings, in that order.

    Args:
      argv: unused.

    Returns:
      0 once all three passes have completed.
    """
    logging.basicConfig(level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)
    ignore_floss_logs()

    # open the results table before any string is produced.
    def show_list_widget():
        list_widget.Show("FLOSS")

    ida_kernwin.execute_sync(show_list_widget, ida_kernwin.MFF_WRITE)

    @ida_wrapper_read
    def get_idb_path():
        return idc.get_idb_path()

    # the workspace file is the IDB path with its extension swapped for ".viv".
    viv_path = os.path.splitext(get_idb_path())[0] + ".viv"

    if os.path.exists(viv_path):
        # reuse the saved vivisect workspace.
        vw = viv_utils.getWorkspace(viv_path)
    else:
        # no saved workspace: build one from the IDB.
        @ida_wrapper_read
        def loadWorkspaceFromIdb():
            return viv_utils.loadWorkspaceFromIdb()

        vw = loadWorkspaceFromIdb()

    all_functions = set(vw.getFunctions())
    started = time.time()

    # pass 0: identify candidate decoding functions.
    decoding_function_features, _library_functions = floss.identify.find_decoding_function_features(
        vw, all_functions, disable_progress=True
    )

    # pass 1: stack strings, from the functions without tight loops.
    plain_functions = floss.identify.get_functions_without_tightloops(decoding_function_features)
    verbose = floss.render.Verbosity.VERBOSE
    stack_strings = floss.stackstrings.extract_stackstrings(
        vw, plain_functions, MIN_LENGTH, verbosity=verbose, disable_progress=True
    )
    logger.info("decoded %d stack strings", len(stack_strings))
    apply_stack_strings(stack_strings)

    # pass 2: tight strings, from the functions with tight loops.
    tightloop_functions = floss.identify.get_functions_with_tightloops(decoding_function_features)
    tight_strings = floss.tightstrings.extract_tightstrings(
        vw,
        tightloop_functions,
        min_length=MIN_LENGTH,
        verbosity=verbose,
        disable_progress=True,
    )
    logger.info("decoded %d tight strings", len(tight_strings))
    apply_stack_strings(tight_strings)

    # pass 3: emulate the top 20 candidates plus the tight functions.
    top_functions = floss.identify.get_top_functions(decoding_function_features, 20)
    candidate_fvas = floss.identify.get_function_fvas(top_functions)
    tight_fvas = floss.identify.get_tight_function_fvas(decoding_function_features)
    fvas_to_emulate = floss.identify.append_unique(candidate_fvas, tight_fvas)
    decoded_strings = floss.string_decoder.decode_strings(
        vw,
        fvas_to_emulate,
        MIN_LENGTH,
        verbosity=verbose,
        disable_progress=True,
    )
    logger.info("decoded %d strings", len(decoded_strings))
    apply_decoded_strings(decoded_strings)

    finished = time.time()
    logger.info("finished execution after %f seconds", (finished - started))
    return 0
# -----------------------------------------------------------------------
class FlossWidget(ida_kernwin.PluginForm):
    """
    dockable form holding a two-column (Name, Address) table of recovered strings.
    double-clicking a row jumps to the address stored with that row.
    """

    # item-data role under which the numeric address is stored on the
    # address cell (32 is the value of Qt.UserRole).
    ADDRESS_ROLE = 32

    def OnCreate(self, form):
        """
        build the table widget inside the form.

        Args:
          form: the form handle passed in by IDA.
        """
        self.parent = self.FormToPyQtWidget(form)
        self.layout = QVBoxLayout(self.parent)
        # index of the next row to fill.
        self.list_row = 0
        self.table_widget = QTableWidget()
        self.layout.addWidget(self.table_widget)
        # create the columns first so that the column-related calls below have something to act on.
        self.table_widget.setColumnCount(2)
        self.table_widget.setHorizontalHeaderLabels(["Name", "Address"])
        self.table_widget.setSelectionBehavior(QAbstractItemView.SelectRows)
        self.table_widget.setEditTriggers(QAbstractItemView.NoEditTriggers)
        self.table_widget.setSortingEnabled(True)
        self.table_widget.verticalHeader().setVisible(True)
        self.table_widget.resizeColumnsToContents()
        self.table_widget.horizontalHeader().setStretchLastSection(True)
        self.table_widget.itemDoubleClicked.connect(self.on_item_double_clicked)

    def add_item(self, string, address):
        """
        append one row to the table.

        Args:
          string: the text shown in the Name column.
          address: the integer address shown (as hex) in the Address column
            and stored on that cell for navigation.
        """
        logger.debug("Adding string: %s, address: %s", string, hex(address))
        table = self.table_widget
        row = self.list_row

        # with sorting enabled, setItem() may move the row as soon as the
        # first cell is set, so the second cell would land in another row.
        # switch sorting off while the row is filled, then restore it.
        was_sorting = table.isSortingEnabled()
        table.setSortingEnabled(False)
        try:
            table.setRowCount(row + 1)
            name_item = QTableWidgetItem(string)
            address_item = QTableWidgetItem(hex(address))
            address_item.setData(self.ADDRESS_ROLE, address)
            table.setItem(row, 0, name_item)
            table.setItem(row, 1, address_item)
        finally:
            table.setSortingEnabled(was_sorting)

        self.list_row = row + 1
        logger.debug("Added at row: %d", row)

    def on_item_double_clicked(self, item):
        """
        jump to the address of the double-clicked row.

        Args:
          item: the cell that was double-clicked (either column).
        """
        # only the Address cell carries the address, so look it up by row
        # instead of reading it from whichever cell was clicked.
        address_item = self.table_widget.item(item.row(), 1)
        if address_item is None:
            return
        address = address_item.data(self.ADDRESS_ROLE)
        if address is None:
            return

        def jump_to_address(address):
            ida_kernwin.jumpto(address)

        ida_kernwin.execute_sync(lambda: jump_to_address(address), ida_kernwin.MFF_WRITE)

    def OnClose(self, form):
        """
        nothing to clean up when the form is closed.
        """
        pass

    def Show(self, caption):
        """
        show the form with the persist / save-on-close / restore options.

        Args:
          caption: the window title.

        Returns:
          the result of `ida_kernwin.PluginForm.Show`.
        """
        options = (
            ida_kernwin.PluginForm.WOPN_PERSIST
            | ida_kernwin.PluginForm.WCLS_SAVE
            | ida_kernwin.PluginForm.WOPN_RESTORE
        )
        return ida_kernwin.PluginForm.Show(self, caption, options=options)


# the single results widget shared by main() and the apply_* helpers.
list_widget = FlossWidget()
# -----------------------------------------------------------------------
class floss_plugin_t(idaapi.plugin_t):
    """
    IDA plugin object that runs `main()` on a background thread.
    """

    comment = ""
    help = "FLOSS"
    flags = idaapi.PLUGIN_FIX
    wanted_name = "FLOSS"
    wanted_hotkey = "Ctrl-Alt-S"

    def init(self):
        """
        plugin load hook.

        Returns:
          idaapi.PLUGIN_KEEP.
        """
        floss_log("init plugin")
        return idaapi.PLUGIN_KEEP

    def runThread(self):
        """
        thread target: run the FLOSS pipeline.
        """
        # idaapi.execute_sync(main, 1)
        main()

    def run(self, arg=0):
        """
        plugin invocation hook: start `runThread` on a new thread and return immediately.

        Args:
          arg: unused.
        """
        floss_log("run plugin")
        self.thread = threading.Thread(target=self.runThread)
        self.thread.start()

    def term(self):
        """
        plugin unload hook.
        """
        floss_log("terminate plugin")

    # Check if auto-analysis is complete.
    def run_plugin_on_timer(self):
        """
        timer callback that starts the plugin once `idaapi.auto_is_ok()` reports true.

        Returns:
          -1 to stop the timer after the plugin was started,
          1000 to be called again in one second.
        """
        logger.info("RUN FUNCTION %s", "idaapi.refresh_idaview_anyway")
        # NOTE(review): this gate relies on refresh_idaview_anyway() returning
        # a truthy value - confirm against the IDAPython API.
        if idaapi.refresh_idaview_anyway():
            logger.info("RUN FUNCTION %s", "idaapi.auto_is_ok")
            if idaapi.auto_is_ok():
                logger.info("RUN FUNCTION %s", "plugin.run")
                # `run` is a method; the bare name `run()` is undefined here.
                self.run()
                return -1  # Stop the timer.
        return 1000  # Check again in 1 second.


# the plugin instance returned by PLUGIN_ENTRY().
plugin = floss_plugin_t()
def PLUGIN_ENTRY():
    """
    return the module-level plugin instance.
    """
    entry = plugin
    return entry
def register_timer_on_init():
    """
    register `plugin.run_plugin_on_timer` as an IDA timer callback with an interval of 1000.
    """
    logger.info("RUN FUNCTION %s", "register_timer_on_idle")
    interval = 1000
    callback = plugin.run_plugin_on_timer
    idaapi.register_timer(interval, callback)
## None of the following work, because these APIs were only introduced in IDA 7.8
# idaapi.add_startup_notification(register_timer_on_startup)
# idaapi.set_idc_func_ex("idle", register_timer_on_idle, None)
# idaapi.add_hotkey("F3", register_timer_on_init)
# register_timer_on_init()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment