Created
April 3, 2025 17:54
-
-
Save 19h/cb13985dea858b9cce99a5b3117b4f26 to your computer and use it in GitHub Desktop.
IDA Pro plugin using Google Gemini AI to suggest names, comments, var renames for functions & callees (depth 2). Shows results in PyQt5 tabbed UI for review/apply across multiple functions.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "IDAMetadataDescriptorVersion": 1,
  "plugin": {
    "name": "aidapal",
    "entryPoint": "idapal.py"
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
summary: IDA Pro plugin using Google AI (Gemini) for code analysis assistance.

description:
    This plugin integrates with Google's Generative AI API (Gemini)
    to provide suggestions for function names, comments, and local
    variable renames within IDA Pro's decompiled pseudocode view.

    It adds context menu items to the Pseudocode view to trigger the
    analysis FOR THE CURRENT FUNCTION AND ITS CALLEES (up to depth 2).
    The results are presented in a custom PyQt5 dockable widget with tabs
    for each analyzed function, where the user can review and selectively
    apply the suggestions across multiple functions.

    Includes logic to decompile and include called functions (up to depth 2)
    in the context provided to the AI. Decompilation and other IDA API calls
    are performed synchronously in the main IDA thread.

Requires:
    - IDA Pro 7.6+ (with Python 3 and PyQt5 support)
    - Hex-Rays Decompiler
    - google-generativeai library (`pip install google-generativeai`)
    - pydantic library (`pip install pydantic`)
    - A Google AI API Key set in the environment variable `GOOGLE_AI_API_KEY`.

DEBUG VERSION: Includes extensive logging and multi-function support.
"""
# --- Imports --- | |
# Standard library
import json
import os
import sys
import textwrap
import threading
import traceback  # For detailed error logging
from functools import partial
from typing import Optional

# IDA Pro SDK
import ida_bytes
import ida_funcs
import ida_hexrays
import ida_idp
import ida_kernwin
import ida_nalt  # Although get_cmt moved, keep for potential future use
import ida_name
import ida_ua  # Added for instruction analysis in get_callee_decompilation
import ida_xref
import idaapi
import idautils
import idc
# Third-party libraries | |
try: | |
from PyQt5 import QtCore, QtGui, QtWidgets | |
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QScrollArea, | |
QLabel, QVBoxLayout, QHBoxLayout, QGridLayout, | |
QGroupBox, QCheckBox, QPushButton, QFrame, | |
QTabWidget) # Added QTabWidget | |
from PyQt5.QtGui import QPalette, QColor | |
print("aiDAPal DEBUG: PyQt5 imported successfully.") | |
except ImportError: | |
print("aiDAPal Error: PyQt5 not found. Please ensure it's installed in IDA's Python environment.") | |
# Optionally, prevent the plugin from loading if PyQt5 is missing | |
# raise ImportError("PyQt5 is required for the aiDAPal UI.") | |
try: | |
from pydantic import BaseModel, Field | |
print("aiDAPal DEBUG: pydantic imported successfully.") | |
except ImportError: | |
print("aiDAPal Error: pydantic not found. Please install it: pip install pydantic") | |
# raise ImportError("pydantic is required for aiDAPal.") | |
try: | |
import google.generativeai as genai | |
print("aiDAPal DEBUG: google-generativeai imported successfully.") | |
except ImportError: | |
print("aiDAPal Error: google-generativeai not found. Please install it: pip install google-generativeai") | |
# raise ImportError("google-generativeai is required for aiDAPal.") | |
# --- Configuration ---
# Google AI Configuration
# The API key is taken from the GOOGLE_AI_API_KEY environment variable so the
# secret does not have to live in this file. The placeholder is kept only as a
# fallback for users who prefer to paste their key here directly.
GOOGLE_AI_API_KEY = os.environ.get("GOOGLE_AI_API_KEY") or "Axxxxxxxkxxx_xxxxxxxxx"

# Default model to use if multiple are available or for the primary action.
# See https://ai.google.dev/models/gemini for available models.
DEFAULT_GEMINI_MODEL = "gemini-2.5-pro-exp-03-25"

# List of Google AI models to offer as actions in the context menu.
# You can add more models here if desired, e.g., "gemini-1.5-pro-latest".
# Each model name will create a separate context menu entry.
# Ensure the models listed are compatible with the API usage below.
MODELS_TO_REGISTER = [DEFAULT_GEMINI_MODEL]  # Keep it simple for debugging

# Safety settings for Google AI (adjust thresholds as needed).
# Options: BLOCK_NONE, BLOCK_ONLY_HIGH, BLOCK_MEDIUM_AND_ABOVE, BLOCK_LOW_AND_ABOVE
DEFAULT_SAFETY_SETTINGS = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
]

# Timeout for API requests in seconds
# (Note: Timeout for generate_content might not be directly supported yet)
API_TIMEOUT = 180

# --- Concurrency Control ---
g_analysis_in_progress = set()  # Set of primary func_ea currently being analyzed
g_analysis_lock = threading.Lock()  # To protect the set
print("aiDAPal DEBUG: Concurrency control variables initialized.")
# --- Pydantic Models for Conceptual Structure --- | |
# Although we use an explicit dict schema for the API call, | |
# these models help document the expected structure. | |
class VariableRename(BaseModel):
    """Represents a suggested rename for a single variable.

    The two name fields are mandatory; the two reasoning fields default to
    ``None`` when absent.
    """

    original_name: str = Field(description="The original name of the local variable or argument as seen in the pseudocode.")
    new_name: str = Field(description="The suggested new, more descriptive name for the variable.")
    # Optional[...] instead of ``str | None``: pydantic evaluates annotations
    # when the class is created, and the ``|`` union operator on types raises
    # TypeError on Python versions older than 3.10.
    rename_reason: Optional[str] = Field(None, description="The reason for the suggested rename.")
    rename_reason_findings: Optional[str] = Field(None, description="Any specific findings that led to the suggested rename.")
class SingleFunctionAnalysis(BaseModel):
    """Defines the expected structure for the analysis of ONE function.

    The function is identified by ``original_function_name`` (the name shown in
    the context header) rather than by address. ``observations`` and the
    ``*_reason`` / ``*_reason_findings`` fields default to ``None`` when absent.
    """

    original_function_name: str = Field(description="The original name of the function (including sub_...) as provided in the context header.")
    function_name: str = Field(description="A concise and descriptive suggested name for the function, suitable for IDA Pro naming conventions.")
    comment: str = Field(description="A multi-line comment explaining the function's purpose, parameters (if discernible), return value (if discernible), and overall logic. Formatted for C-style block comments without the enclosing /* */.")
    variables: list[VariableRename] = Field(description="A list of suggested renames for local variables and function arguments within this specific function.")
    # Optional[...] instead of ``X | None``: pydantic evaluates annotations
    # when the class is created, and the ``|`` union operator on types raises
    # TypeError on Python versions older than 3.10.
    observations: Optional[list[dict]] = Field(None, description="Observations about this function.")
    function_name_reason: Optional[str] = Field(None, description="Reason for the suggested function name.")
    function_name_reason_findings: Optional[str] = Field(None, description="Findings for the suggested function name.")
    comment_reason: Optional[str] = Field(None, description="Reason for the suggested comment.")
    comment_reason_findings: Optional[str] = Field(None, description="Findings for the suggested comment.")
class MultiFunctionAnalysisResult(BaseModel):
    """Top-level shape of the AI's JSON reply when several functions are analysed.

    Wraps a single list holding one ``SingleFunctionAnalysis`` per function.
    """

    function_analyses: list[SingleFunctionAnalysis] = Field(
        description="A list containing the analysis results for each function provided in the context.",
    )
# --- Explicit Schema Definition for Google AI API (MULTI-FUNCTION) --- | |
# The schema is assembled from small named pieces. Key insertion order is kept
# exactly as in the flat literal because several descriptions state that
# reasons must precede the values they justify.

# One entry of the per-function "observations" array.
_observation_item_schema = {
    "type": "object",
    "properties": {
        "observation": {
            "type": "string",
            "description": "A single observation about this function.",
        },
        "observation_impact": {
            "type": "string",
            "description": "The impact of the observation on this function's purpose, parameters, return value, and overall logic.",
        },
    },
    "required": ["observation", "observation_impact"],
}

# One entry of the per-function "variables" array.
_variable_rename_item_schema = {
    "type": "object",
    "properties": {
        "rename_reason": {
            "type": "string",
            "description": "The reason for the suggested rename.",
        },
        "rename_reason_findings": {
            "type": "string",
            "description": "Any specific findings that led to the suggested rename.",
        },
        "original_name": {
            "type": "string",
            "description": "The original name of the local variable or argument as seen in this function's pseudocode.",
        },
        "new_name": {
            "type": "string",
            "description": "The suggested new, more descriptive name for the variable.",
        },
    },
    "required": ["rename_reason", "rename_reason_findings", "original_name", "new_name"],
}

# Analysis of a single function; functions are identified by their original
# name (as shown in the context header) rather than by address.
_single_function_analysis_schema = {
    "type": "object",
    "properties": {
        "original_function_name": {
            "type": "string",
            "description": "The original name of the function (e.g., 'sub_140001000' or 'MyFunction') exactly as it appears in the '// --- Function: NAME (EA) ---' header provided in the context.",
        },
        "observations": {
            "type": "array",
            "description": "Any observations about this function that are important to consider when trying to interpret its purpose, parameters, return value, and overall logic.",
            "items": _observation_item_schema,
        },
        "comment_reason": {
            "type": "string",
            "description": "The reason for the suggested comment for this function. Reason must precede the comment.",
        },
        "comment_reason_findings": {
            "type": "string",
            "description": "Any specific findings that led to the suggested comment for this function. Reason must precede the comment.",
        },
        "function_name_reason": {
            "type": "string",
            "description": "The reason for the suggested function name for this function. Reason must precede the function name.",
        },
        "function_name_reason_findings": {
            "type": "string",
            "description": "Any specific findings that led to the suggested function name for this function. Reason must precede the function name.",
        },
        "variables": {
            "type": "array",
            "description": "A list of suggested renames for local variables and function arguments *within this specific function only*. If unsure, leave the variable out or suggest the original name. Reason must precede the new name. Variables must precede new function name and comment.",
            "items": _variable_rename_item_schema,
        },
        "comment": {
            "type": "string",
            "description": "A multi-line comment explaining this function's purpose, parameters, return value, and logic. Formatted for C-style block comments without the enclosing /* */. Use plain text only.",
        },
        "function_name": {
            "type": "string",
            "description": "A concise and descriptive suggested name for this function, suitable for IDA Pro naming conventions.",
        },
    },
    "required": [
        "original_function_name",
        "observations",
        "function_name_reason",
        "function_name_reason_findings",
        "function_name",
        "comment_reason",
        "comment_reason_findings",
        "comment",
        "variables",
    ],
}

# Top-level response object: a required list of per-function analyses.
explicit_multi_function_analysis_schema = {
    "type": "object",
    "properties": {
        "function_analyses": {
            "type": "array",
            "description": "A list containing the analysis results for each function provided in the context.",
            "items": _single_function_analysis_schema,
        },
    },
    "required": ["function_analyses"],
}
print("aiDAPal DEBUG: Explicit MULTI-FUNCTION JSON schema defined (using original_function_name).")
# --- UI Widget Classes (Mostly Unchanged, used within tabs) --- | |
class FunctionNameWidget(QWidget):
    """Group box showing the suggested function name next to an accept checkbox.

    ``accepted`` mirrors the checkbox and is read by the form when the user
    applies the selection.
    """

    # Class-level default; an instance attribute shadows it on first toggle.
    accepted = True

    def __init__(self, function_name):
        """Build the checkbox/label row for ``function_name`` (pre-checked)."""
        super().__init__()
        left_centered = QtCore.Qt.AlignLeft | QtCore.Qt.AlignVCenter

        # The check state is set before the signal is connected, so the
        # initial state does not invoke the handler.
        self.checkbox = QCheckBox()
        self.checkbox.setCheckState(QtCore.Qt.Checked)
        self.checkbox.stateChanged.connect(self.accepted_state_change)

        self.name_label = QLabel(function_name)
        self.name_label.setWordWrap(True)  # long names wrap instead of widening the form

        row = QHBoxLayout()
        row.setAlignment(left_centered)
        row.setSpacing(10)
        row.addWidget(self.checkbox)
        row.addWidget(self.name_label)

        box = QGroupBox("Suggested Function Name")
        box.setLayout(row)

        outer = QVBoxLayout()
        outer.setAlignment(left_centered)
        outer.addWidget(box)
        self.setLayout(outer)

    def accepted_state_change(self, state):
        """Slot for ``stateChanged``: record whether the suggestion is ticked."""
        self.accepted = state == QtCore.Qt.Checked
class CommentWidget(QWidget):
    """Group box showing the suggested comment next to an accept checkbox.

    ``accepted`` mirrors the checkbox and is read by the form when the user
    applies the selection.
    """

    # Class-level default; an instance attribute shadows it on first toggle.
    accepted = True

    def __init__(self, comment):
        """Build the checkbox/comment row for ``comment`` (pre-checked)."""
        super().__init__()

        # The check state is set before the signal is connected, so the
        # initial state does not invoke the handler.
        self.checkbox = QCheckBox()
        self.checkbox.setCheckState(QtCore.Qt.Checked)
        self.checkbox.stateChanged.connect(self.accepted_state_change)

        self.comment_area = QLabel(comment)
        self.comment_area.setWordWrap(True)
        # Fix only a minimum width; the height follows the wrapped text.
        self.comment_area.setMinimumWidth(400)
        # Let the user select/copy the comment text.
        self.comment_area.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)

        row = QHBoxLayout()
        # Keep the checkbox at the top-left corner of the comment area.
        row.setAlignment(QtCore.Qt.AlignLeft | QtCore.Qt.AlignTop)
        row.setSpacing(10)
        row.addWidget(self.checkbox)
        row.addWidget(self.comment_area)

        box = QGroupBox("Suggested Comment")
        box.setLayout(row)

        outer = QVBoxLayout()
        outer.setAlignment(QtCore.Qt.AlignLeft | QtCore.Qt.AlignVCenter)
        outer.addWidget(box)
        self.setLayout(outer)

    def accepted_state_change(self, state):
        """Slot for ``stateChanged``: record whether the suggestion is ticked."""
        self.accepted = state == QtCore.Qt.Checked
class VariableWidget(QWidget):
    """Scrollable grid of ``old → new`` variable renames, each with a checkbox."""

    def __init__(self, variables):
        """Lay out one pre-checked grid entry per dict in ``variables``.

        Each dict is read through ``.get`` for ``'original_name'`` and
        ``'new_name'``; a missing key is displayed as ``'N/A'``.
        """
        super().__init__()
        top_left = QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft

        grid = QGridLayout()
        grid.setAlignment(top_left)
        grid.setSpacing(10)
        grid.setColumnStretch(1, 1)  # Allow labels to expand
        grid.setColumnStretch(3, 1)  # Allow labels to expand

        self.checkboxes = []
        self.variable_data = variables  # keep the caller's list; indexed by checkbox position

        entries_per_row = 2
        widgets_per_entry = 4  # checkbox, old name, arrow, new name
        for position, suggestion in enumerate(variables):
            grid_row, slot = divmod(position, entries_per_row)
            first_col = slot * widgets_per_entry

            tick = QCheckBox()
            tick.setCheckState(QtCore.Qt.Checked)
            # Bind the position as a default argument so every checkbox
            # reports its own index rather than the last loop value.
            tick.stateChanged.connect(lambda state, index=position: self.accepted_state_change(state, index))
            self.checkboxes.append(tick)

            old_label = QLabel(suggestion.get('original_name', 'N/A'))
            arrow_label = QLabel("→")
            new_label = QLabel(suggestion.get('new_name', 'N/A'))
            old_label.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)
            new_label.setTextInteractionFlags(QtCore.Qt.TextSelectableByMouse)

            grid.addWidget(tick, grid_row, first_col + 0)
            grid.addWidget(old_label, grid_row, first_col + 1)
            grid.addWidget(arrow_label, grid_row, first_col + 2, alignment=QtCore.Qt.AlignCenter)
            grid.addWidget(new_label, grid_row, first_col + 3)

        box = QGroupBox("Suggested Variable Renames")
        box.setLayout(grid)

        # Wrap the group box in a scroll area; scrollbars appear only when needed.
        scroller = QScrollArea()
        scroller.setWidgetResizable(True)
        scroller.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        scroller.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        scroller.setWidget(box)
        scroller.setMinimumHeight(100)  # prevent the area from collapsing

        outer = QVBoxLayout()
        outer.setAlignment(top_left)
        outer.addWidget(scroller)
        self.setLayout(outer)

    def accepted_state_change(self, state, index):
        """Slot for per-variable ``stateChanged``; intentionally a no-op.

        Selection is read from the checkboxes on demand by
        ``get_selected_variables``, so nothing is recorded here.
        """
        pass

    def get_selected_variables(self):
        """Returns a list of variable data dicts for the checked items."""
        return [
            self.variable_data[position]
            for position, tick in enumerate(self.checkboxes)
            if tick.isChecked()
        ]
# --- Main UI Form Class (Modified for Tabs) --- | |
class aiDAPalUIForm(ida_kernwin.PluginForm): | |
"""The main dockable widget form for displaying AI suggestions using tabs for multiple functions.""" | |
def __init__(self, analysis_results_list, primary_func_ea):
    """Store the analysis results and the EA that triggered the analysis.

    Args:
        analysis_results_list: Per-function result dicts; a falsy value
            (``None`` or empty) is stored as an empty list.
        primary_func_ea: Entry address of the function the analysis was
            started from, or ``None``.
    """
    super(aiDAPalUIForm, self).__init__()
    # Compare against None explicitly: address 0 is falsy and would otherwise
    # be logged as "None".
    primary_func_ea_str = f"0x{primary_func_ea:X}" if primary_func_ea is not None else "None"
    print(f"aiDAPal DEBUG: [aiDAPalUIForm.__init__] Initializing multi-function form for analysis triggered by {primary_func_ea_str}.")
    self.analysis_results = analysis_results_list if analysis_results_list else []
    self.primary_func_ea = primary_func_ea  # EA of the function that triggered the analysis
    # Widget references keyed by function EA:
    # { func_ea: {'name': FunctionNameWidget, 'comment': CommentWidget, 'vars': VariableWidget}, ... }
    self.widgets_by_ea = {}
    self.parent_widget = None  # PyQt widget reference, set in OnCreate
    print(f"aiDAPal DEBUG: [aiDAPalUIForm.__init__] Received {len(self.analysis_results)} function analysis results.")
    print(f"aiDAPal DEBUG: [aiDAPalUIForm.__init__] Initialization complete for {primary_func_ea_str}.")
def OnCreate(self, form):
    """Form-creation hook: obtain the PyQt widget for ``form`` and populate it.

    Any exception raised while converting or populating is printed together
    with its traceback and not re-raised.
    """
    print("aiDAPal DEBUG: [aiDAPalUIForm.OnCreate] Form creation started.")
    try:
        qt_widget = self.FormToPyQtWidget(form)
        self.parent_widget = qt_widget
        print("aiDAPal DEBUG: [aiDAPalUIForm.OnCreate] Parent widget obtained.")
        self.PopulateForm()
        print("aiDAPal DEBUG: [aiDAPalUIForm.OnCreate] Form population finished.")
    except Exception as exc:
        print(f"aiDAPal Error: [aiDAPalUIForm.OnCreate] Exception: {exc}")
        traceback.print_exc()
def PopulateForm(self):
    """Creates and arranges the PyQt widgets within the form using tabs.

    One tab is created per entry of ``self.analysis_results`` that carries a
    ``'function_ea'`` key; entries without it are skipped with a warning.
    Each tab holds the name, comment and (if any) variable-rename widgets, and
    references to them are stored in ``self.widgets_by_ea`` for
    ``on_accept_clicked``. "Apply Selected" and "Close" buttons are placed
    below the tabs. Does nothing if ``self.parent_widget`` is not set.
    """
    print("aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Starting form population with tabs.")
    if not self.parent_widget:
        print("aiDAPal Error: [aiDAPalUIForm.PopulateForm] Parent widget not available.")
        return

    def get_name_main(ea, container):
        """Store the IDA name of ``ea`` (or ``sub_<EA>``) in ``container[0]``.

        Returns 1 on success and 0 if the lookup raised.
        """
        try:
            container[0] = ida_funcs.get_func_name(ea) or f"sub_{ea:X}"
            return 1
        except Exception:  # was a bare except, which also swallowed KeyboardInterrupt/SystemExit
            container[0] = f"sub_{ea:X}"  # Fallback
            return 0

    # Main layout for the entire form
    main_layout = QVBoxLayout()
    main_layout.setContentsMargins(0, 0, 0, 0)  # Use full space

    tab_widget = QTabWidget()
    self.widgets_by_ea = {}  # Reset widget storage
    print(f"aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Processing {len(self.analysis_results)} results to create tabs.")
    if not self.analysis_results:
        main_layout.addWidget(QLabel("No analysis results received from AI."))

    # Results are shown in the order received.
    for result_data in self.analysis_results:
        # 'function_ea' is expected to have been attached to each result before
        # the form is shown (NOTE(review): mapping code is not visible here).
        func_ea = result_data.get('function_ea')
        if func_ea is None:
            print(f"aiDAPal Warning: [aiDAPalUIForm.PopulateForm] Skipping result item missing 'function_ea' (mapping likely failed): {result_data.get('original_function_name', 'N/A')}")
            continue
        func_ea_str = f"0x{func_ea:X}"

        # Fetch the current IDA name through execute_sync.
        name_container = [None]
        name_sync_status = ida_kernwin.execute_sync(lambda: get_name_main(func_ea, name_container), ida_kernwin.MFF_READ)
        if name_sync_status == 1:
            func_name_ida = name_container[0]
        else:
            # Same "sub_<HEX>" shape as the helper's fallback; the previous
            # f"sub_{func_ea_str}" produced "sub_0x...".
            func_name_ida = f"sub_{func_ea:X}"
            print(f"aiDAPal Warning: [PopulateForm] Failed to get function name for {func_ea_str} via execute_sync.")

        tab_title = f"{func_name_ida} ({func_ea_str})"
        print(f"aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Creating tab for: {tab_title}")

        # Container widget for the tab content
        tab_content_widget = QWidget()
        tab_layout = QVBoxLayout()
        tab_layout.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft)

        # --- Function Name Section ---
        name_widget = FunctionNameWidget(result_data.get('function_name', 'N/A'))
        tab_layout.addWidget(name_widget)

        # --- Comment Section ---
        comment_widget = CommentWidget(result_data.get('comment', 'No comment suggested.'))
        tab_layout.addWidget(comment_widget)

        # --- Variables Section ---
        variables_sugg = result_data.get('variables', [])
        variable_widget = None  # stays None when there is nothing to rename
        if variables_sugg:
            variable_widget = VariableWidget(variables_sugg)
            tab_layout.addWidget(variable_widget)
        else:
            tab_layout.addWidget(QLabel("No variable rename suggestions for this function."))

        tab_layout.addStretch(1)  # Push content upwards
        tab_content_widget.setLayout(tab_layout)
        tab_widget.addTab(tab_content_widget, tab_title)

        # Store widget references for this function
        self.widgets_by_ea[func_ea] = {
            'name': name_widget,
            'comment': comment_widget,
            'vars': variable_widget,  # Will be None if no variables suggested
            'data': result_data,  # Original data for easy access
        }
        print(f" aiDAPal DEBUG: [PopulateForm] Widgets stored for {func_ea_str}")

    main_layout.addWidget(tab_widget)

    # --- Buttons ---
    print("aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Creating buttons.")
    accept_button = QPushButton("Apply Selected")
    cancel_button = QPushButton("Close")

    button_layout = QHBoxLayout()
    button_layout.addStretch()  # Push buttons to the right
    button_layout.addWidget(accept_button)
    button_layout.addWidget(cancel_button)
    main_layout.addLayout(button_layout)  # below the tabs

    print("aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Connecting button signals.")
    accept_button.clicked.connect(self.on_accept_clicked)
    cancel_button.clicked.connect(self.on_cancel_clicked)

    self.parent_widget.setLayout(main_layout)
    # Reasonable minimum size for the dockable window
    self.parent_widget.setMinimumSize(600, 500)
    print("aiDAPal DEBUG: [aiDAPalUIForm.PopulateForm] Layout set, minimum size set. Population complete.")
def on_accept_clicked(self):
    """Collect the ticked suggestions from every tab and apply them.

    Builds ``{func_ea: {'function_name', 'comment', 'variables'}}`` from the
    widgets stored in ``self.widgets_by_ea`` (unticked name/comment become
    ``None``), skips functions with nothing selected, then runs
    ``self._perform_ida_updates`` through ``ida_kernwin.execute_sync`` with
    ``MFF_WRITE``. The outcome is reported via an info/warning dialog and the
    form is closed in every case.
    """
    print("aiDAPal DEBUG: [aiDAPalUIForm.on_accept_clicked] 'Apply Selected' clicked.")
    changes_by_ea = {}
    print(f"aiDAPal DEBUG: [on_accept_clicked] Gathering selections from {len(self.widgets_by_ea)} functions...")

    for func_ea, widgets in self.widgets_by_ea.items():
        suggestion = widgets['data']
        name_ticked = widgets['name'] and widgets['name'].accepted
        comment_ticked = widgets['comment'] and widgets['comment'].accepted
        selected_changes = {
            'function_name': suggestion.get('function_name') if name_ticked else None,
            'comment': suggestion.get('comment') if comment_ticked else None,
            # 'vars' is None when the tab had no rename suggestions.
            'variables': widgets['vars'].get_selected_variables() if widgets['vars'] else [],
        }
        # Queue the function only when at least one part is actually selected.
        if selected_changes['function_name'] or selected_changes['comment'] or selected_changes['variables']:
            changes_by_ea[func_ea] = selected_changes

    if not changes_by_ea:
        print("aiDAPal DEBUG: [aiDAPalUIForm.on_accept_clicked] No changes selected across all functions. Closing.")
        ida_kernwin.info("aiDAPal: No changes were selected.")
        self.Close(0)
        return

    print(f"aiDAPal DEBUG: [aiDAPalUIForm.on_accept_clicked] Final changes gathered for {len(changes_by_ea)} functions. Preparing main thread update.")

    # One-element list used as an out-parameter: execute_sync only hands back
    # the callable's integer status, not the update result itself.
    update_result_container = [False]

    def run_update_in_main_thread(changes_dict, result_container):
        """Run the update and return 1 on completion, 0 if it raised."""
        print("aiDAPal DEBUG: [run_update_in_main_thread] Executing in main thread...")
        try:
            result_container[0] = self._perform_ida_updates(changes_dict)
            print(f"aiDAPal DEBUG: [run_update_in_main_thread] _perform_ida_updates returned: {result_container[0]}")
            return 1
        except Exception as e_update_main:
            print(f"aiDAPal Error: [run_update_in_main_thread] Exception during main thread update: {e_update_main}")
            traceback.print_exc()
            result_container[0] = False  # Ensure failure state
            return 0

    sync_status = ida_kernwin.execute_sync(
        lambda: run_update_in_main_thread(changes_by_ea, update_result_container),
        ida_kernwin.MFF_WRITE,  # Essential for database modifications
    )
    print(f"aiDAPal DEBUG: [on_accept_clicked] execute_sync for updates returned status: {sync_status}")

    if sync_status != 1:
        print(f"aiDAPal Error: [on_accept_clicked] execute_sync for updates failed or was cancelled (status: {sync_status}).")
        ida_kernwin.warning("aiDAPal: Failed to execute update operation in main thread.")
    else:
        update_success = update_result_container[0]
        print(f"aiDAPal DEBUG: [on_accept_clicked] Update success status from main thread: {update_success}")
        if update_success:
            ida_kernwin.info(f"aiDAPal: Applied selected changes to {len(changes_by_ea)} function(s).")
        else:
            ida_kernwin.warning("aiDAPal: Failed to apply some or all changes. Check Output window.")

    # The form closes whether or not the update succeeded.
    print("aiDAPal DEBUG: [aiDAPalUIForm.on_accept_clicked] Closing form.")
    self.Close(0)
    print("aiDAPal DEBUG: [aiDAPalUIForm.on_accept_clicked] Exiting.")
def _perform_ida_updates(self, changes_by_ea): | |
""" | |
Internal function containing the logic to apply changes to IDA. | |
MUST be called from the main IDA thread (e.g., via execute_sync). | |
""" | |
print(f"aiDAPal DEBUG: [_perform_ida_updates] Starting update process for {len(changes_by_ea)} functions (MAIN THREAD).") | |
overall_changes_made = False | |
overall_refresh_needed = False # Track if Hex-Rays view needs refresh for *any* function | |
all_updates_succeeded = True # Track if any individual update fails | |
for func_ea, changes in changes_by_ea.items(): | |
func_ea_str = f"0x{func_ea:X}" | |
print(f"--- Updating function {func_ea_str} ---") | |
func_changes_made = False | |
func_refresh_needed = False | |
if func_ea == idaapi.BADADDR: | |
print(f" aiDAPal Error: [_perform_ida_updates] Invalid function entry address {func_ea_str} in changes dict. Skipping.") | |
all_updates_succeeded = False # Mark as failed if we skip invalid data | |
continue | |
# --- Pre-Update Checks --- | |
func_t_obj = ida_funcs.get_func(func_ea) # Safe now in main thread | |
if not func_t_obj: | |
print(f" aiDAPal Error: [_perform_ida_updates] Pre-check failed: ida_funcs.get_func(0x{func_ea:X}) returned None. Cannot apply comment or reliably rename.") | |
# Don't mark all_updates_succeeded as False yet, name setting might still work | |
else: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Pre-check OK: Found func_t object for 0x{func_ea:X}.") | |
can_decompile_for_rename = False | |
if changes.get("variables"): # Only check decompilation if variable renames are requested | |
try: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Pre-check: Attempting ida_hexrays.decompile(0x{func_ea:X}) for rename validation...") | |
cfunc = ida_hexrays.decompile(func_ea) | |
if cfunc: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Pre-check OK: Decompilation successful for 0x{func_ea:X}.") | |
can_decompile_for_rename = True | |
else: | |
print(f" aiDAPal Warning: [_perform_ida_updates] Pre-check failed: ida_hexrays.decompile(0x{func_ea:X}) returned None. Variable rename will likely fail.") | |
all_updates_succeeded = False # Mark failure if rename is expected but pre-check fails | |
except ida_hexrays.DecompilationFailure as e: | |
print(f" aiDAPal Warning: [_perform_ida_updates] Pre-check failed: ida_hexrays.decompile(0x{func_ea:X}) raised DecompilationFailure: {e}. Variable rename will fail.") | |
all_updates_succeeded = False # Mark failure | |
except Exception as e_decomp: | |
print(f" aiDAPal Error: [_perform_ida_updates] Pre-check failed: Unexpected error during ida_hexrays.decompile(0x{func_ea:X}): {e_decomp}") | |
all_updates_succeeded = False # Mark failure | |
# --- Apply Comment --- | |
new_cmt_text = changes.get("comment") | |
if new_cmt_text: | |
if func_t_obj: # Check if we got the object earlier | |
wrapped_cmt = '\n'.join(textwrap.wrap(new_cmt_text, width=80)) | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Attempting to set comment (length: {len(wrapped_cmt)}) for {func_ea_str}...") | |
if ida_funcs.set_func_cmt(func_t_obj, wrapped_cmt, False): # Safe now | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Successfully set function comment for {func_ea_str}.") | |
func_changes_made = True | |
else: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Failed to set function comment for {func_ea_str} (set_func_cmt returned False).") | |
all_updates_succeeded = False # Mark failure | |
else: | |
# Already logged the error above | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Skipping comment set for {func_ea_str} due to missing func_t object.") | |
all_updates_succeeded = False # Cannot apply comment without func_t | |
# else: print(f" aiDAPal DEBUG: [_perform_ida_updates] No comment to apply for {func_ea_str}.") | |
# --- Apply Function Name --- | |
new_name = changes.get("function_name") | |
if new_name: | |
print(f' aiDAPal DEBUG: [_perform_ida_updates] Attempting function name update to: {new_name} for {func_ea_str}') | |
# Check current name only if func_t_obj exists, otherwise assume different | |
current_name = ida_funcs.get_func_name(func_ea) if func_t_obj else None # Safe now | |
if func_t_obj and current_name == new_name: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Function name '{new_name}' is already set for {func_ea_str}. Skipping.") | |
else: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Calling set_name for {func_ea_str} with name '{new_name}'...") | |
# set_name might work even if get_func failed, try it | |
if ida_name.set_name(func_ea, new_name, ida_name.SN_CHECK | ida_name.SN_FORCE | ida_name.SN_NOWARN): # Safe now | |
print(f' aiDAPal DEBUG: [_perform_ida_updates] Successfully updated function name for {func_ea_str}.') | |
func_changes_made = True | |
else: | |
print(f' aiDAPal DEBUG: [_perform_ida_updates] Failed to update function name for {func_ea_str} (set_name returned False).') | |
all_updates_succeeded = False # Mark failure | |
# else: print(f" aiDAPal DEBUG: [_perform_ida_updates] No function name to apply for {func_ea_str}.") | |
# --- Apply Variable Renames --- | |
variables_to_apply = changes.get("variables", []) | |
if variables_to_apply: | |
if can_decompile_for_rename: # Check pre-check result | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Attempting {len(variables_to_apply)} variable renames for {func_ea_str}...") | |
renamed_count = 0 | |
failed_count = 0 | |
skipped_count = 0 | |
for var_data in variables_to_apply: | |
original_name = var_data.get('original_name') | |
new_name_suggested = var_data.get('new_name') | |
if not original_name or not new_name_suggested: | |
print(" aiDAPal DEBUG: [_perform_ida_updates] Skipping rename: Missing original or new name in data.") | |
failed_count += 1 | |
continue | |
if original_name == new_name_suggested: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Skipping rename: Suggested name '{new_name_suggested}' is same as original.") | |
skipped_count += 1 | |
continue | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Renaming '{original_name}' -> '{new_name_suggested}' using ida_hexrays.rename_lvar for func {func_ea_str}") | |
if ida_hexrays.rename_lvar(func_ea, original_name, new_name_suggested): # Safe now | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Successfully renamed '{original_name}' to '{new_name_suggested}'.") | |
renamed_count += 1 | |
func_changes_made = True | |
func_refresh_needed = True # Variable renames require refresh | |
else: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] rename_lvar failed for '{original_name}' -> '{new_name_suggested}' in {func_ea_str}.") | |
failed_count += 1 | |
all_updates_succeeded = False # Mark failure | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Variable renaming for {func_ea_str} finished. Success: {renamed_count}, Failed: {failed_count}, Skipped (same name): {skipped_count}") | |
else: | |
# Already logged the decompilation failure reason | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Skipping variable renames for {func_ea_str} due to failed decompilation pre-check.") | |
# If variables were requested but couldn't be applied, mark as failure | |
all_updates_succeeded = False | |
# else: print(f" aiDAPal DEBUG: [_perform_ida_updates] No variable renames to apply for {func_ea_str}.") | |
# --- Mark for Refresh --- | |
if func_refresh_needed: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Refresh needed for {func_ea_str}. Marking cfunc dirty.") | |
ida_hexrays.mark_cfunc_dirty(func_ea) # Safe now | |
overall_refresh_needed = True | |
elif func_changes_made: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Changes made (name/comment) for {func_ea_str}, marking dirty.") | |
# Only mark dirty if the function is known to the decompiler | |
if can_decompile_for_rename or func_t_obj: # Check if it's a known/decompilable func | |
ida_hexrays.mark_cfunc_dirty(func_ea) # Safe now | |
overall_refresh_needed = True # Mark dirty might trigger refresh anyway | |
else: | |
print(f" aiDAPal DEBUG: [_perform_ida_updates] Skipping mark_cfunc_dirty for {func_ea_str} as it might not be a valid Hex-Rays function.") | |
if func_changes_made: | |
overall_changes_made = True | |
print(f"--- Finished update for {func_ea_str} ---") | |
# --- Final Status --- | |
if overall_changes_made: | |
print(f"aiDAPal DEBUG: [_perform_ida_updates] Overall changes were made.") | |
if overall_refresh_needed: | |
print(f"aiDAPal DEBUG: [_perform_ida_updates] Refresh needed for one or more functions.") | |
else: | |
print(f"aiDAPal DEBUG: [_perform_ida_updates] No changes were applied.") | |
print(f"aiDAPal DEBUG: [_perform_ida_updates] Multi-function update finished. Success status: {all_updates_succeeded}") | |
return all_updates_succeeded # Return True if all individual updates succeeded | |
def on_cancel_clicked(self):
    """Close the form without applying anything when 'Close' is clicked."""
    log_line = "aiDAPal DEBUG: [aiDAPalUIForm.on_cancel_clicked] 'Close' button clicked. Closing form."
    print(log_line)
    # Close() is handed 0; the original author used this value to signal cancellation.
    self.Close(0)
def OnClose(self, form):
    """Log that the form is closing; no cleanup is performed here."""
    if self.primary_func_ea:
        trigger = f"0x{self.primary_func_ea:X}"
    else:
        trigger = "None"
    print(f"aiDAPal DEBUG: [aiDAPalUIForm.OnClose] Form closing for analysis triggered by {trigger}.")
    # Removal from the open-forms registry is done by the UI wrapper, not here.
# --- UI Wrapper Class --- | |
class aiDAPalUI:
    """
    Helper that creates and shows an aiDAPalUIForm, preventing duplicates for
    the *same analysis trigger* (the primary function EA).

    Instantiating the class is the whole operation: __init__ either activates
    the form already open for that EA or builds, registers and shows a new one.
    """

    # Shared across all instances. Key: primary_func_ea, Value: aiDAPalUIForm instance.
    open_forms = {}

    def __init__(self, analysis_results_list=None, primary_func_ea=None):
        """
        Show the suggestions UI for one analysis run.

        Args:
            analysis_results_list (list | None): Analysis result dicts to
                display; None is treated as an empty list (with a warning).
            primary_func_ea (int | None): EA of the function that triggered
                the analysis. None or BADADDR aborts without showing anything.

        Raises:
            Exception: Whatever the caption lookup or Show() raised. The form
            is removed from open_forms first, so a failed attempt does not
            block later attempts for the same EA.
        """
        print("aiDAPal DEBUG: [aiDAPalUI.__init__] Initializing UI wrapper.")
        if primary_func_ea is None or primary_func_ea == idaapi.BADADDR:
            print("aiDAPal Error: [aiDAPalUI.__init__] Cannot show UI without a valid primary function EA.")
            return

        func_ea_str = f"0x{primary_func_ea:X}"
        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Target primary function EA: {func_ea_str}")

        # A form for an analysis triggered by this function is already open: focus it.
        if primary_func_ea in aiDAPalUI.open_forms:
            print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Form for analysis triggered by {func_ea_str} already open. Attempting to activate.")
            existing_form = aiDAPalUI.open_forms[primary_func_ea]
            try:
                widget = existing_form.GetWidget()
                if widget:
                    print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Activating existing widget for {func_ea_str}.")
                    ida_kernwin.activate_widget(widget, True)
                else:
                    print(f"aiDAPal Warning: [aiDAPalUI.__init__] Could not get widget for existing form {func_ea_str}.")
            except Exception as e:
                print(f"aiDAPal Warning: [aiDAPalUI.__init__] Error activating existing widget for {func_ea_str}: {e}")
            return  # Don't create a new one

        if analysis_results_list is None:
            print("aiDAPal Warning: [aiDAPalUI.__init__] Initialized with no results list.")
            self.analysis_results = []
        else:
            self.analysis_results = analysis_results_list
        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] UI wrapper initialized for analysis triggered by {func_ea_str} with {len(self.analysis_results)} results.")

        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Creating new aiDAPalUIForm instance for {func_ea_str}.")
        self.plg = aiDAPalUIForm(self.analysis_results, primary_func_ea)
        aiDAPalUI.open_forms[primary_func_ea] = self.plg  # Store instance before showing
        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Form instance stored in open_forms for {func_ea_str}.")

        # Install the close hook BEFORE showing, so the registry entry is
        # released however and whenever the form ends up being closed.
        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Wrapping OnClose for form triggered by {func_ea_str}.")
        original_on_close = self.plg.OnClose
        # The default argument pins the EA this particular form was registered under.
        self.plg.OnClose = lambda form, ea=primary_func_ea: self.on_close_handler(form, ea, original_on_close)

        try:
            func_name_str = ida_funcs.get_func_name(primary_func_ea) or f"sub_{primary_func_ea:X}"
            form_caption = f"aiDAPal Suggestions: {func_name_str} & Callees"
            print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Setting form caption: '{form_caption}'")

            # Show as a dockable, persistent window
            show_flags = ida_kernwin.WOPN_PERSIST | ida_kernwin.WOPN_RESTORE
            print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] Calling Show() for {func_ea_str} with flags {show_flags}.")
            self.plg.Show(form_caption, show_flags)
        except Exception:
            # The form never became visible, so OnClose will not fire for it.
            # Drop the registry entry here or this EA stays blocked forever.
            print(f"aiDAPal Error: [aiDAPalUI.__init__] Failed to show form for {func_ea_str}. Removing it from tracking.")
            aiDAPalUI.open_forms.pop(primary_func_ea, None)
            raise

        print(f"aiDAPal DEBUG: [aiDAPalUI.__init__] UI wrapper initialization complete for {func_ea_str}.")

    def on_close_handler(self, form, func_ea, original_on_close_func):
        """
        Untrack a form that is closing, then run its original OnClose.

        Args:
            form: The form object forwarded to the original OnClose callback.
            func_ea (int): EA the form was registered under in open_forms.
            original_on_close_func (callable): The form's unwrapped OnClose.
        """
        func_ea_str = f"0x{func_ea:X}"
        print(f"aiDAPal DEBUG: [aiDAPalUI.on_close_handler] Wrapper called for form triggered by {func_ea_str}.")
        print(f"aiDAPal DEBUG: [aiDAPalUI.on_close_handler] Removing form for {func_ea_str} from tracking.")
        aiDAPalUI.open_forms.pop(func_ea, None)  # pop with default: an untracked EA is not an error
        print(f"aiDAPal DEBUG: [aiDAPalUI.on_close_handler] Calling original OnClose for {func_ea_str}.")
        original_on_close_func(form)
        print(f"aiDAPal DEBUG: [aiDAPalUI.on_close_handler] Original OnClose returned for {func_ea_str}.")
# --- Helper Functions: referenced-data comments and callee decompilation ---
# NOTE: These helpers call IDA APIs and MUST run in the main IDA thread (e.g. via execute_sync).
def get_references_from_function(func_ea):
    """
    Collect the unique data-reference targets of a function's code items.

    MUST be called from the main IDA thread or via execute_sync.

    Args:
        func_ea (int): Any address inside the function to scan.

    Returns:
        set: Target addresses of data references made from the code items in
        [func.start_ea, func.end_ea). Empty if the function cannot be
        resolved or the item iterator cannot be initialised.
    """
    refs = set()
    func = ida_funcs.get_func(func_ea)
    if not func:
        print(f"aiDAPal Error: [get_references_from_function] Could not get function object for EA 0x{func_ea:X}")
        return refs

    fii = ida_funcs.func_item_iterator_t()
    has_item = fii.set_range(func.start_ea, func.end_ea)
    if not has_item:
        print(f"aiDAPal Error: [get_references_from_function] Failed to initialize function item iterator for 0x{func_ea:X}")
        return refs

    # set_range() already positions the iterator on the first item, so read
    # current() BEFORE advancing. Advancing first would skip the item at
    # func.start_ea and lose its data references.
    while has_item:
        insn_ea = fii.current()
        dref_ea = ida_xref.get_first_dref_from(insn_ea)
        while dref_ea != idaapi.BADADDR:
            refs.add(dref_ea)
            dref_ea = ida_xref.get_next_dref_from(insn_ea, dref_ea)
        has_item = fii.next_code()

    return refs
def get_function_data_ref_comments(current_func_ea):
    """
    Build a C-style comment block from the comments attached to the data
    locations that the given function references.

    MUST be called from the main IDA thread or via execute_sync.

    Args:
        current_func_ea (int | None): EA of the function to inspect.

    Returns:
        str | None: The comment block, or None when no EA was given, the
        function has no data references, or none of them carries a comment.
    """
    ea_str = "None" if current_func_ea is None else f"0x{current_func_ea:X}"
    print(f"aiDAPal DEBUG: [get_function_data_ref_comments] Starting for EA: {ea_str}")
    if current_func_ea is None:
        print("aiDAPal DEBUG: [get_function_data_ref_comments] No function EA provided.")
        return None

    # The helper below uses main-thread-only IDA APIs.
    references = get_references_from_function(current_func_ea)
    if not references:
        print(f"aiDAPal DEBUG: [get_function_data_ref_comments] No data references found for {ea_str}.")
        return None
    print(f"aiDAPal DEBUG: [get_function_data_ref_comments] Found {len(references)} unique data references for {ea_str}. Checking for comments...")

    comment_lines = []
    for ref in sorted(references):
        repeatable = ida_bytes.get_cmt(ref, True)
        regular = ida_bytes.get_cmt(ref, False)

        # Start from the repeatable comment; fold the regular one in only
        # when it adds something the repeatable one does not already say.
        cmt_text = repeatable.strip() if repeatable else ""
        if regular:
            regular = regular.strip()
            if not cmt_text:
                cmt_text = regular
            elif cmt_text != regular:
                cmt_text += f" // {regular}"

        if not cmt_text:
            continue

        name = ida_name.get_name(ref)
        display_ref = name or f"0x{ref:X}"
        comment_lines.append(f" * {display_ref}: {cmt_text}")

    if not comment_lines:
        print(f"aiDAPal DEBUG: [get_function_data_ref_comments] No comments found for any references in {ea_str}.")
        return None

    full_comment = "\n".join(["/* Referenced Data Comments:", *comment_lines, "*/"])
    print(f"aiDAPal DEBUG: [get_function_data_ref_comments] Found {len(comment_lines)} comments for {ea_str}. Returning comment block (length: {len(full_comment)}).")
    return full_comment
def _direct_call_target(insn_ea):
    """Return the first near/far operand of insn_ea that is a function start, else None."""
    for op_index in range(idaapi.UA_MAXOP):
        op_type = idc.get_operand_type(insn_ea, op_index)
        if op_type not in (idc.o_near, idc.o_far):
            continue
        target = idc.get_operand_value(insn_ea, op_index)
        if target == idaapi.BADADDR:
            continue
        owner = ida_funcs.get_func(target)
        # Only an operand that points exactly at a function entry counts as a callee.
        if owner and owner.start_ea == target:
            return target
    return None


def _render_decompilation(func_ea):
    """Decompile func_ea and return its banner-wrapped pseudocode, or a failure banner."""
    func_name = ida_name.get_name(func_ea) or f"sub_{func_ea:X}"
    label = f"{func_name} (0x{func_ea:X})"
    try:
        cfunc = ida_hexrays.decompile(func_ea)
        if not cfunc:
            print(f" aiDAPal Warning: Decompilation returned None for 0x{func_ea:X}")
            return f"// --- Decompilation FAILED for {label} ---\n"
        return f"// --- Function: {label} ---\n{str(cfunc)}\n// --- End Function: {label} ---\n"
    except ida_hexrays.DecompilationFailure as e:
        print(f" aiDAPal Error: Decompilation failed for 0x{func_ea:X}: {e}")
        return f"// --- Decompilation ERROR for {label}: {e} ---\n"
    except Exception as e:
        print(f" aiDAPal Error: Unexpected error decompiling 0x{func_ea:X}: {e}")
        traceback.print_exc()
        return f"// --- Decompilation UNEXPECTED ERROR for {label}: {e} ---\n"


def get_callee_decompilation(start_ea, max_depth):
    """
    Decompile a function and the functions it calls, level by level.

    Starting at start_ea, each level's functions are decompiled and their call
    instructions are scanned for direct call targets, which form the next
    level. Traversal stops after max_depth levels of callees.

    MUST be called from the main IDA thread or via execute_sync.

    Args:
        start_ea (int): Entry EA of the root function.
        max_depth (int): Number of callee levels to follow (0 = root only).

    Returns:
        dict: Maps function EA to banner-wrapped pseudocode text, or to a
        failure banner when decompilation did not succeed. Empty when
        start_ea is not inside a function.
    """
    print(f"aiDAPal DEBUG: [get_callee_decompilation] Starting traversal from 0x{start_ea:X} with max_depth={max_depth} (MAIN THREAD)")
    decompiled_funcs = {}

    if ida_funcs.get_func(start_ea) is None:
        print(f"aiDAPal Error: [get_callee_decompilation] Invalid start_ea: 0x{start_ea:X}")
        return decompiled_funcs

    funcs_to_visit = {start_ea}
    visited_ea = {start_ea}

    for current_depth in range(max_depth + 1):
        print(f"aiDAPal DEBUG: [get_callee_decompilation] Processing depth {current_depth}")
        next_level_funcs = set()
        if not funcs_to_visit:
            print(f"aiDAPal DEBUG: [get_callee_decompilation] No more functions to visit at depth {current_depth}.")
            break

        expand_callees = current_depth < max_depth
        for func_ea in funcs_to_visit:
            if func_ea not in decompiled_funcs:
                print(f" aiDAPal DEBUG: [get_callee_decompilation] Decompiling 0x{func_ea:X} at depth {current_depth}")
                decompiled_funcs[func_ea] = _render_decompilation(func_ea)

            # The deepest level is decompiled but not scanned for further callees.
            if not expand_callees:
                continue

            func = ida_funcs.get_func(func_ea)
            if not func:
                print(f" aiDAPal Warning: [get_callee_decompilation] Could not get func object for 0x{func_ea:X} to find callees.")
                continue

            item_ea = func.start_ea
            while item_ea < func.end_ea:
                if idaapi.is_code(idaapi.get_flags(item_ea)) and idaapi.is_call_insn(item_ea):
                    callee_ea = _direct_call_target(item_ea)
                    if callee_ea is not None and callee_ea not in visited_ea:
                        next_level_funcs.add(callee_ea)
                        visited_ea.add(callee_ea)

                next_ea = idc.next_head(item_ea, func.end_ea)
                if next_ea <= item_ea:
                    # Guard against an iterator that fails to move forward.
                    print(f" aiDAPal Warning: [get_callee_decompilation] next_head did not advance from 0x{item_ea:X}. Breaking loop for 0x{func_ea:X}.")
                    break
                item_ea = next_ea
                if item_ea == idaapi.BADADDR:
                    break

        funcs_to_visit = next_level_funcs

    print(f"aiDAPal DEBUG: [get_callee_decompilation] Traversal finished. Found {len(decompiled_funcs)} functions up to depth {max_depth}.")
    return decompiled_funcs
# --- Google AI Interaction (UPDATED for Schema Changes) --- | |
def _strip_markdown_fences(text):
    """Return text with a surrounding ```json ... ``` or ``` ... ``` fence removed."""
    cleaned = text.strip()
    if cleaned.startswith("```json"):
        cleaned = cleaned[7:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()
    if cleaned.startswith("```"):  # Handle the case with a bare ``` fence
        cleaned = cleaned[3:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    return cleaned.strip()


def _keep_complete_dicts(items, required_keys, kind, analysis_index):
    """Return the entries of items that are dicts containing every required key.

    kind ("observation" or "variable") is only used to word the warnings.
    """
    kept = []
    for idx, item in enumerate(items):
        if not isinstance(item, dict):
            print(f"aiDAPal Warning: [do_google_ai_analysis] {kind.capitalize()} item at index {idx} in function analysis {analysis_index} is not a dictionary. Skipping item: {item}")
            continue
        missing = next((key for key in required_keys if key not in item), None)
        if missing is not None:
            print(f"aiDAPal Warning: [do_google_ai_analysis] Missing required key '{missing}' in {kind} item {idx} (func analysis {analysis_index}). Skipping item: {item}")
            continue
        kept.append(item)
    return kept


def _validate_function_analyses(parsed_response):
    """Validate the parsed multi-function response and return its usable analyses.

    Items without 'original_function_name' are dropped, because they cannot be
    mapped back to a function. Other missing keys are filled with defaults.

    Raises:
        ValueError: If the top-level structure is not
            {"function_analyses": [...]}.
    """
    if not isinstance(parsed_response, dict):
        raise ValueError("Top level response is not a dictionary.")
    if "function_analyses" not in parsed_response:
        raise ValueError("Missing required top-level key: 'function_analyses'")
    if not isinstance(parsed_response["function_analyses"], list):
        raise ValueError("'function_analyses' field is not a list.")

    required_inner_keys = (
        "original_function_name",
        "observations", "function_name_reason",
        "function_name_reason_findings", "function_name", "comment_reason",
        "comment_reason_findings", "comment", "variables",
    )
    required_variable_keys = ("rename_reason", "rename_reason_findings", "original_name", "new_name")
    required_observation_keys = ("observation", "observation_impact")

    validated_analyses = []
    print(f"aiDAPal DEBUG: [do_google_ai_analysis] Validating {len(parsed_response['function_analyses'])} function analysis items...")
    for i, func_analysis in enumerate(parsed_response["function_analyses"]):
        if not isinstance(func_analysis, dict):
            print(f"aiDAPal Warning: [do_google_ai_analysis] Item at index {i} in 'function_analyses' is not a dictionary. Skipping item: {func_analysis}")
            continue

        valid_item = True
        for key in required_inner_keys:
            if key in func_analysis:
                continue
            print(f"aiDAPal Warning: [do_google_ai_analysis] Missing required key '{key}' in function analysis item at index {i}. Providing default/skipping.")
            if key == "original_function_name":
                valid_item = False
                break
            if key in ("variables", "observations"):
                func_analysis[key] = []
            elif key == "function_name":
                func_analysis[key] = f"suggested_name_{i}"
            elif key == "comment":
                func_analysis[key] = "(No comment suggested)"
            else:
                func_analysis[key] = ""  # Default for the reason/findings strings
        if not valid_item:
            continue

        if not isinstance(func_analysis.get("observations"), list):
            print(f"aiDAPal Warning: [do_google_ai_analysis] 'observations' field is not a list in item {i}. Setting to empty list.")
            func_analysis["observations"] = []
        else:
            func_analysis["observations"] = _keep_complete_dicts(
                func_analysis["observations"], required_observation_keys, "observation", i
            )

        if not isinstance(func_analysis.get("variables"), list):
            print(f"aiDAPal Warning: [do_google_ai_analysis] 'variables' field is not a list in item {i}. Setting to empty list.")
            func_analysis["variables"] = []
        else:
            func_analysis["variables"] = _keep_complete_dicts(
                func_analysis["variables"], required_variable_keys, "variable", i
            )

        validated_analyses.append(func_analysis)

    print(f"aiDAPal DEBUG: [do_google_ai_analysis] Validation complete. {len(validated_analyses)} valid function analyses found.")
    return validated_analyses


def _report_blocked_response(response):
    """Log and show why a response came back without any parts (best effort)."""
    try:
        finish_reason = "UNKNOWN"
        safety_ratings = "N/A"
        if response.candidates:
            finish_reason = response.candidates[0].finish_reason.name
            safety_ratings = str(response.candidates[0].safety_ratings)
        else:
            print("aiDAPal DEBUG: [do_google_ai_analysis] No candidates found.")
        print(f" aiDAPal DEBUG: Finish Reason: {finish_reason}")
        print(f" aiDAPal DEBUG: Safety Ratings: {safety_ratings}")
        ida_kernwin.warning(f"aiDAPal: Google AI response was empty or blocked.\nReason: {finish_reason}\nSafety: {safety_ratings}\nCheck safety settings or prompt.")
    except Exception as e_info:
        print(f"aiDAPal Error: [do_google_ai_analysis] Error getting finish reason/safety: {e_info}")
        ida_kernwin.warning("aiDAPal: Google AI response was empty or blocked.")


def do_google_ai_analysis(code_prompt, model_name):
    """
    Send the code prompt to Google AI (Gemini) and return the structured
    analysis for multiple functions. Results are keyed by
    original_function_name for mapping back to functions.

    Args:
        code_prompt (str): The C code for multiple functions and any extra context.
        model_name (str): The Gemini model name (e.g., "gemini-1.5-flash-latest").

    Returns:
        list | None: Validated analysis dicts, one per function, each carrying
        'original_function_name', 'observations', 'function_name', 'comment',
        'variables' and the reason/findings fields. None on any failure:
        missing API key, empty/blocked response, unparsable or malformed
        JSON, or an API error. A warning dialog is shown in each case.
    """
    print(f"aiDAPal DEBUG: [do_google_ai_analysis] Starting MULTI-FUNCTION analysis request to Google AI model: {model_name}")
    if not GOOGLE_AI_API_KEY or GOOGLE_AI_API_KEY == "YOUR_API_KEY_HERE":
        print("aiDAPal Error: [do_google_ai_analysis] GOOGLE_AI_API_KEY not set or is placeholder.")
        ida_kernwin.warning("aiDAPal Error: Google AI API Key not configured.\nSet the GOOGLE_AI_API_KEY variable in the script or environment variable.")
        return None
    print("aiDAPal DEBUG: [do_google_ai_analysis] API Key seems present.")

    try:
        print("aiDAPal DEBUG: [do_google_ai_analysis] Configuring genai client...")
        genai.configure(api_key=GOOGLE_AI_API_KEY)
        print("aiDAPal DEBUG: [do_google_ai_analysis] Genai client configured.")

        print("aiDAPal DEBUG: [do_google_ai_analysis] Setting up GenerationConfig with MULTI-FUNCTION schema (using original_function_name)...")
        generation_config = genai.GenerationConfig(
            response_mime_type="application/json",
            response_schema=explicit_multi_function_analysis_schema,
            temperature=0.0
        )
        print("aiDAPal DEBUG: [do_google_ai_analysis] GenerationConfig set.")

        print(f"aiDAPal DEBUG: [do_google_ai_analysis] Creating GenerativeModel instance for '{model_name}'...")
        model = genai.GenerativeModel(
            model_name=model_name,
            generation_config=generation_config,
            safety_settings=DEFAULT_SAFETY_SETTINGS
        )
        print("aiDAPal DEBUG: [do_google_ai_analysis] GenerativeModel instance created.")

        prompt = code_prompt
        print(f"aiDAPal DEBUG: [do_google_ai_analysis] Sending prompt (length: {len(prompt)} chars) to {model_name}...")
        print("aiDAPal DEBUG: [do_google_ai_analysis] Calling model.generate_content()...")
        response = model.generate_content(prompt)
        print(f"aiDAPal DEBUG: [do_google_ai_analysis] Received response object from {model_name}.")

        # --- Process Response ---
        if not response.parts:
            print("aiDAPal Error: [do_google_ai_analysis] Response was empty or blocked.")
            _report_blocked_response(response)
            return None

        print("aiDAPal DEBUG: [do_google_ai_analysis] Response has parts. Attempting to parse response.text...")
        # Bound before the try: the ValueError handler below logs raw_text, and
        # reading response.text is itself inside the try and may raise.
        raw_text = ""
        try:
            raw_text = response.text
            cleaned_text = _strip_markdown_fences(raw_text)
            print("aiDAPal DEBUG: [do_google_ai_analysis] Calling json.loads()...")
            parsed_response = json.loads(cleaned_text)
            print("aiDAPal DEBUG: [do_google_ai_analysis] JSON parsing successful.")

            print("aiDAPal DEBUG: [do_google_ai_analysis] Validating parsed multi-function structure...")
            validated_analyses = _validate_function_analyses(parsed_response)
            print("aiDAPal DEBUG: [do_google_ai_analysis] Successfully parsed and validated MULTI-FUNCTION JSON. Returning list of analyses.")
            return validated_analyses
        except (json.JSONDecodeError, ValueError) as e:
            print(f"aiDAPal Error: [do_google_ai_analysis] Failed to parse or validate MULTI-FUNCTION JSON: {e}")
            print(f"aiDAPal DEBUG: Response text that failed parsing was:\n---\n{raw_text}\n---")
            ida_kernwin.warning(f"aiDAPal failed to parse or validate JSON response from Google AI.\nError: {e}\n\nCheck Output window for response text.")
            return None
        except Exception as e_parse:
            print(f"aiDAPal Error: [do_google_ai_analysis] Unexpected error processing response.text: {e_parse}")
            traceback.print_exc()
            ida_kernwin.warning("aiDAPal: Unexpected error processing Google AI response text.")
            return None

    except ImportError:
        print("aiDAPal Error: [do_google_ai_analysis] Required Python libraries not found (ImportError).")
        ida_kernwin.warning("aiDAPal Error: Required Python libraries ('google-generativeai', 'pydantic') not found.")
        return None
    except Exception as e:
        print(f"aiDAPal Error: [do_google_ai_analysis] An unexpected error occurred during API interaction: {e}")
        traceback.print_exc()
        ida_kernwin.warning(f"aiDAPal: An unexpected error occurred during Google AI interaction: {e}")
        return None
# --- Asynchronous Task Handling --- | |
def do_show_ui(results_list, primary_func_ea):
    """
    Callback that shows the tabbed results UI; scheduled via execute_ui_requests.

    Args:
        results_list (list): The list of analysis result dicts from the AI.
        primary_func_ea (int): The EA of the function that triggered the analysis.

    Returns:
        bool: Always False (as required by execute_ui_requests).
    """
    # Compare against None explicitly: an EA of 0 is falsy but is still an
    # address, and must be logged as "0x0" rather than mislabelled "None".
    func_ea_str = f"0x{primary_func_ea:X}" if primary_func_ea is not None else "None"
    print(f"aiDAPal DEBUG: [do_show_ui] Entered. Will show multi-function UI triggered by {func_ea_str}.")

    if primary_func_ea is None or primary_func_ea == idaapi.BADADDR:
        print("aiDAPal Error: [do_show_ui] No valid primary function EA provided.")
        return False  # Must return bool

    print(f"aiDAPal DEBUG: [do_show_ui] Preparing to show UI for analysis triggered by {func_ea_str}")
    try:
        print(f"aiDAPal DEBUG: [do_show_ui] Instantiating aiDAPalUI for {func_ea_str}...")
        # Pass the list of results and the primary EA to the UI wrapper.
        aiDAPalUI(results_list, primary_func_ea)
        print("aiDAPal DEBUG: [do_show_ui] aiDAPalUI instantiation complete. UI display initiated.")
    except Exception as e:
        # A UI failure is logged but never propagated out of the callback.
        print(f"aiDAPal Error: [do_show_ui] Error creating or showing UI: {e}")
        traceback.print_exc()

    print(f"aiDAPal DEBUG: [do_show_ui] Exiting for {func_ea_str}. Returning False.")
    return False  # Required for execute_ui_requests
# --- Analysis Task Wrapper (for concurrency control) --- | |
# Wrapper remains the same, but async_call now handles multiple functions | |
def analysis_task_wrapper(primary_cfunc, model, analyze_only_current, context):
    """
    Thread entry point: run async_call and always clear the in-progress flag.

    Args:
        primary_cfunc: Decompiled object of the triggering function; its
            ``entry_ea`` is the key used in ``g_analysis_in_progress``.
        model (str): Model name forwarded to async_call.
        analyze_only_current (bool): Forwarded to async_call; selects
            SINGLE vs MULTI analysis.
        context: Extra context forwarded to async_call.
    """
    if primary_cfunc:
        trigger_ea = primary_cfunc.entry_ea
    else:
        trigger_ea = idaapi.BADADDR
    ea_is_valid = trigger_ea != idaapi.BADADDR
    ea_label = f"0x{trigger_ea:X}" if ea_is_valid else "BADADDR"

    mode_label = "MULTI"
    if analyze_only_current:
        mode_label = "SINGLE"
    print(f"aiDAPal DEBUG: [analysis_task_wrapper] Thread started for {mode_label} analysis triggered by {ea_label}, model {model}.")

    try:
        print(f"aiDAPal DEBUG: [analysis_task_wrapper] Calling async_call for {ea_label}...")
        # Hand over the primary cfunc together with the mode flag and context.
        async_call(primary_cfunc, model, analyze_only_current, context)
        print(f"aiDAPal DEBUG: [analysis_task_wrapper] async_call finished for {ea_label}.")
    except Exception as exc:
        print(f"aiDAPal Error: [analysis_task_wrapper] Exception during async_call for {ea_label}: {exc}")
        traceback.print_exc()
    finally:
        # The flag must be dropped whether async_call succeeded, failed,
        # or never got as far as showing the UI.
        if not ea_is_valid:
            print("aiDAPal DEBUG: [analysis_task_wrapper] Cannot clear flag, invalid primary func_ea.")
        else:
            print(f"aiDAPal DEBUG: [analysis_task_wrapper] Acquiring lock to clear analysis flag for {ea_label}...")
            with g_analysis_lock:
                g_analysis_in_progress.discard(trigger_ea)
                print(f"aiDAPal DEBUG: [analysis_task_wrapper] Analysis flag cleared for {ea_label}. Current in progress: {g_analysis_in_progress}")
        print(f"aiDAPal DEBUG: [analysis_task_wrapper] Thread finished for analysis triggered by {ea_label}.")
# --- MODIFIED ASYNC CALL (Using Name Mapping) --- | |
def _build_code_context(all_decompiled_codes, ea_to_original_name):
    """Join the decompiled functions (sorted by EA) into one prompt section, each under a normalized header."""
    code_context_list = []
    for ea in sorted(all_decompiled_codes):
        code_str = all_decompiled_codes[ea]
        original_name_in_map = ea_to_original_name.get(ea, f"sub_{ea:X}")
        # Rebuild the header from the name map so the name the AI echoes back
        # is exactly the one used later for the Name -> EA lookup.
        header = f"// --- Function: {original_name_in_map} (0x{ea:X}) ---"
        # partition() always yields a (possibly empty) remainder, so a code
        # string consisting of only the header line cannot raise IndexError.
        first_line, _, remainder = code_str.partition("\n")
        if first_line.startswith("// --- Function:"):
            code_context_list.append(header + "\n" + remainder)  # Replace header
        else:
            code_context_list.append(header + "\n" + code_str)  # Prepend header if missing
    return "\n\n---\n\n".join(code_context_list)


def _build_prompt_content(primary_func_name_for_prompt, primary_func_ea_str,
                          full_code_context_str, analyze_only_current, extra_context):
    """Assemble the full prompt text (persona, task, optional data context, code, instructions)."""
    prompt_parts = []
    prompt_parts.append("""Adopt an extremely precise, methodical academic writing style characterized by absolute scientific rigor.
Use dense, technical language with meticulously structured arguments. Prioritize objectivity, clarity, and empirical evidence.
Construct sentences with surgical precision, eliminating any potential ambiguity.
Ensure every statement is backed by verifiable research, with clear citations and logical progression of ideas.
Maintain a completely impersonal, detached tone that focuses exclusively on empirical observations and analytical reasoning.
Avoid any colloquial expressions, rhetorical flourishes, or subjective interpretations.
Each paragraph must demonstrate a clear logical structure with explicit connections between claims and supporting evidence.
I need you to always reply comprehensively, completely, without any omission, fully and with scientific rigor.""")

    code_section = f"\n--- START FUNCTION CODE CONTEXT ---\n{full_code_context_str}\n--- END FUNCTION CODE CONTEXT ---"

    if analyze_only_current:
        print(f"aiDAPal DEBUG: [async_call] Using SINGLE-FUNCTION analysis instructions for primary function {primary_func_name_for_prompt} ({primary_func_ea_str}).")
        prompt_parts.append(f"\nAnalyze ONLY the primary function '{primary_func_name_for_prompt}' (at address {primary_func_ea_str}) from the following C-like pseudocode functions obtained from IDA Pro. Use the other functions provided SOLELY as context for understanding the primary function's interactions.")
        if extra_context:
            print(f"aiDAPal DEBUG: [async_call] Adding data reference context (for primary func {primary_func_ea_str}, length {len(extra_context)}).")
            prompt_parts.append(f"\n/* Context: Data locations referenced by the *primary* function ({primary_func_name_for_prompt} at {primary_func_ea_str}) and their comments (if any):\n{extra_context}\n*/")
        prompt_parts.append(code_section)
        prompt_parts.append(f"""
Instructions for analyzing ONLY the primary function '{primary_func_name_for_prompt}' ({primary_func_ea_str}):
Focus your analysis exclusively on the function '{primary_func_name_for_prompt}' identified by its name and address {primary_func_ea_str}.
Provide the following analysis details ONLY for this primary function:
1. `original_function_name`: The original name of the primary function ('{primary_func_name_for_prompt}') EXACTLY as it appears in its '// --- Function: NAME (EA) ---' header. This is CRITICAL.
2. `observations`: An array of objects describing notable aspects of the primary function's logic, parameters, return value, or potential issues. Each object must have `observation` and `observation_impact` strings.
3. `function_name_reason`: A string explaining the reasoning behind the suggested function name for the primary function.
4. `function_name_reason_findings`: A string detailing specific code patterns or logic in the primary function that led to the name suggestion.
5. `function_name`: A concise and descriptive suggested function name for the primary function following C/C++ conventions.
6. `comment_reason`: A string explaining the reasoning behind the suggested comment for the primary function.
7. `comment_reason_findings`: A string detailing specific code patterns or logic in the primary function that informed the comment.
8. `comment`: A detailed multi-line comment (plain text, without /* */) explaining the primary function's purpose, parameters, return value, and logic.
9. `variables`: An array of objects for suggested variable renames within the primary function ONLY. Each object must have `rename_reason`, `rename_reason_findings`, `original_name`, and `new_name` strings. Only include variables where a rename significantly improves clarity.
Return the results as a single JSON object matching the standard multi-function schema, but the `function_analyses` array MUST contain EXACTLY ONE entry, corresponding ONLY to the primary function '{primary_func_name_for_prompt}' ({primary_func_ea_str}). Do NOT include analysis for any other functions, even though their code was provided for context.
JSON Response:""")
    else:
        print("aiDAPal DEBUG: [async_call] Using MULTI-FUNCTION analysis instructions.")
        prompt_parts.append(f"\nAnalyze ALL of the following C-like pseudocode functions obtained from IDA Pro. The analysis was triggered for the function '{primary_func_name_for_prompt}' at {primary_func_ea_str}.")
        if extra_context:
            print(f"aiDAPal DEBUG: [async_call] Adding data reference context (for primary func {primary_func_ea_str}, length {len(extra_context)}).")
            prompt_parts.append(f"\n/* Context: Data locations referenced by the *triggering* function ({primary_func_name_for_prompt} at {primary_func_ea_str}) and their comments (if any):\n{extra_context}\n*/")
        prompt_parts.append(code_section)
        prompt_parts.append("""
Instructions:
Analyze EACH function provided in the 'FUNCTION CODE CONTEXT' section above.
For EACH function, provide the following analysis details:
1. `original_function_name`: The original name of the function (e.g., 'sub_140001000' or 'MyFunction') EXACTLY as it appears in the '// --- Function: NAME (EA) ---' header provided in the context for that function. This is CRITICAL for mapping the results back.
2. `observations`: An array of objects, where each object has:
* `observation`: A string describing a notable aspect of the function's logic, parameters, return value, or potential issues.
* `observation_impact`: A string explaining the significance or impact of the observation.
3. `function_name_reason`: A string explaining the reasoning behind the suggested function name.
4. `function_name_reason_findings`: A string detailing specific code patterns or logic that led to the name suggestion.
5. `function_name`: A concise and descriptive suggested function name following C/C++ conventions.
6. `comment_reason`: A string explaining the reasoning behind the suggested comment.
7. `comment_reason_findings`: A string detailing specific code patterns or logic that informed the comment.
8. `comment`: A detailed multi-line comment (plain text, without /* */) explaining the function's purpose, parameters, return value, and logic.
9. `variables`: An array of objects for suggested variable renames within this function. Each object must have:
* `rename_reason`: A string explaining the reason for this specific variable rename.
* `rename_reason_findings`: A string detailing the specific code usage that led to the rename suggestion.
* `original_name`: The original variable name as seen in the pseudocode.
* `new_name`: The suggested new, more descriptive variable name.
* (Only include variables where a rename significantly improves clarity).
Return the results as a single JSON object matching the schema provided previously (containing a `function_analyses` array where each item has all the fields listed above, including `original_function_name`).
Provide analysis for ALL functions presented in the context.
JSON Response:""")
    return "\n".join(prompt_parts)


def _map_results_to_eas(results_list_with_names, ea_to_original_name, primary_func_ea,
                        primary_func_ea_str, primary_func_name_for_prompt, analyze_only_current):
    """Attach 'function_ea' to each AI result via its 'original_function_name'; drop results that cannot be mapped."""
    print(f"aiDAPal DEBUG: [async_call] Mapping {len(results_list_with_names)} results back to EAs...")
    # Reverse map (Name -> EA). On duplicate names the first EA encountered wins.
    name_to_ea = {}
    duplicate_names = set()
    for ea, name in ea_to_original_name.items():
        if name not in name_to_ea:
            name_to_ea[name] = ea
        elif name not in duplicate_names:
            print(f"aiDAPal Warning: [async_call] Duplicate function name '{name}' encountered in context. Mapping might be ambiguous.")
            duplicate_names.add(name)

    results_list_with_ea = []
    unmapped_count = 0
    for analysis in results_list_with_names:
        original_name = analysis.get("original_function_name")
        if not original_name:
            print(f"aiDAPal Warning: [async_call] Skipping analysis result missing 'original_function_name': {analysis.get('function_name', 'N/A')}")
            unmapped_count += 1
            continue
        found_ea = name_to_ea.get(original_name)
        if found_ea is None:
            print(f"aiDAPal Warning: [async_call] Could not map analysis result for name '{original_name}' back to an EA. Discarding result.")
            unmapped_count += 1
            continue
        # In single-function mode only a result for the primary EA is acceptable.
        if analyze_only_current and found_ea != primary_func_ea:
            print(f"aiDAPal Warning: [async_call] Requested single-function analysis for {primary_func_ea_str} ('{primary_func_name_for_prompt}'), but AI returned analysis for '{original_name}' (0x{found_ea:X}). Discarding.")
            unmapped_count += 1
            continue
        analysis['function_ea'] = found_ea  # Add the EA back to the dictionary
        results_list_with_ea.append(analysis)
    print(f"aiDAPal DEBUG: [async_call] Mapping complete. Mapped: {len(results_list_with_ea)}, Unmapped/Discarded: {unmapped_count}")
    return results_list_with_ea


def async_call(primary_cfunc, model_name, analyze_only_current=False, extra_context=None):
    """
    Run one AI analysis for the primary function and its callees.

    Called from the analysis thread. Steps: decompile the primary function and
    its callees (via execute_sync), build an EA -> name map (via execute_sync),
    build the prompt, call do_google_ai_analysis, map the returned results back
    to EAs by name, and schedule do_show_ui through execute_ui_requests.

    Args:
        primary_cfunc (ida_hexrays.cfunc_t): The decompiled object of the primary function.
        model_name (str): The Google AI model to use.
        analyze_only_current (bool): If True, instruct the AI to only analyze the primary function.
        extra_context (str | None): Optional string context (e.g., data references).

    Returns:
        None. Every failure is logged and ends the call early.
    """
    primary_func_ea = idaapi.BADADDR
    primary_func_name_for_prompt = "UNKNOWN_FUNCTION"  # Fallback
    if primary_cfunc:
        primary_func_ea = primary_cfunc.entry_ea
        name_container = [None]

        def get_name_main(ea, container):
            try:
                container[0] = ida_funcs.get_func_name(ea) or f"sub_{ea:X}"
                return 1
            except Exception:
                container[0] = f"sub_{ea:X}"  # Fallback
                return 0

        # Use execute_sync to safely get the name.
        name_sync_status = ida_kernwin.execute_sync(
            lambda: get_name_main(primary_func_ea, name_container),
            ida_kernwin.MFF_READ,
        )
        if name_sync_status == 1:
            primary_func_name_for_prompt = name_container[0]
        else:
            primary_func_name_for_prompt = f"sub_{primary_func_ea:X}"

    primary_func_ea_str = f"0x{primary_func_ea:X}" if primary_func_ea != idaapi.BADADDR else "BADADDR"
    analysis_type_str = "SINGLE-FUNCTION (with context)" if analyze_only_current else "MULTI-FUNCTION"
    print(f"aiDAPal DEBUG: [async_call] Starting {analysis_type_str} analysis in background thread triggered by {primary_func_ea_str} ({primary_func_name_for_prompt}) using Google AI model: {model_name}")
    if not primary_cfunc:
        print("aiDAPal Error: [async_call] No primary function context (cfunc object) provided.")
        return

    # --- Get Decompiled Code (RUN IN MAIN THREAD) ---
    all_decompiled_codes = None  # {ea: code_string}
    decomp_result_container = [None]
    try:
        max_callee_depth = 2
        print(f"aiDAPal DEBUG: [async_call] Preparing to run decompilation in main thread for {primary_func_ea_str}...")

        def run_decompilation_in_main_thread(container):
            print(f"aiDAPal DEBUG: [run_decompilation_in_main_thread] Executing in main thread for {primary_func_ea_str}...")
            try:
                ida_kernwin.show_wait_box(f"Decompiling primary & callees (depth {max_callee_depth}) for {model_name}...")
                container[0] = get_callee_decompilation(primary_func_ea, max_callee_depth)
                print(f"aiDAPal DEBUG: [run_decompilation_in_main_thread] Decompilation finished. Stored result type: {type(container[0])}")
                return 1
            except Exception as e_main:
                print(f"aiDAPal Error: [run_decompilation_in_main_thread] Exception: {e_main}")
                traceback.print_exc()
                container[0] = None
                return 0
            finally:
                # The wait box must be dismissed on success and failure alike.
                ida_kernwin.hide_wait_box()

        print("aiDAPal DEBUG: [async_call] Calling execute_sync for decompilation...")
        sync_status = ida_kernwin.execute_sync(
            lambda: run_decompilation_in_main_thread(decomp_result_container),
            ida_kernwin.MFF_WRITE,
        )
        print(f"aiDAPal DEBUG: [async_call] execute_sync for decompilation returned status: {sync_status}")
        if sync_status != 1:
            raise RuntimeError(f"Decompilation failed or was cancelled in main thread (status: {sync_status}).")
        all_decompiled_codes = decomp_result_container[0]
        if all_decompiled_codes is None:
            raise RuntimeError("Decompilation returned None from main thread.")
        if not isinstance(all_decompiled_codes, dict):
            raise RuntimeError(f"Unexpected type returned from decompilation: {type(all_decompiled_codes)}")
        print(f"aiDAPal DEBUG: [async_call] Retrieved {len(all_decompiled_codes)} decompiled functions via main thread.")
        if not all_decompiled_codes:
            print("aiDAPal Warning: [async_call] No decompiled code returned. Cannot proceed.")
            return
    except Exception as e:
        print(f"aiDAPal Error: [async_call] Failed during decompilation process orchestration for {primary_func_ea_str}: {e}")
        traceback.print_exc()
        return

    # --- Create EA <-> Name Mapping (RUN IN MAIN THREAD) ---
    ea_to_original_name = None  # {ea: name_string}
    name_map_container = [None]
    try:
        print("aiDAPal DEBUG: [async_call] Preparing to create EA->Name map in main thread...")
        target_eas = list(all_decompiled_codes.keys())

        def create_name_map_main(eas, container):
            print(f"aiDAPal DEBUG: [create_name_map_main] Executing in main thread for {len(eas)} EAs...")
            try:
                mapping = {ea: ida_funcs.get_func_name(ea) or f"sub_{ea:X}" for ea in eas}
                container[0] = mapping
                print(f"aiDAPal DEBUG: [create_name_map_main] Created map with {len(mapping)} entries.")
                return 1
            except Exception as e_map:
                print(f"aiDAPal Error: [create_name_map_main] Exception: {e_map}")
                traceback.print_exc()
                container[0] = None
                return 0

        print("aiDAPal DEBUG: [async_call] Calling execute_sync for name mapping...")
        sync_status_map = ida_kernwin.execute_sync(
            lambda: create_name_map_main(target_eas, name_map_container),
            ida_kernwin.MFF_READ,
        )
        print(f"aiDAPal DEBUG: [async_call] execute_sync for name mapping returned status: {sync_status_map}")
        if sync_status_map != 1:
            raise RuntimeError(f"Name mapping failed or was cancelled in main thread (status: {sync_status_map}).")
        ea_to_original_name = name_map_container[0]
        if not ea_to_original_name:  # Check if map is empty or None
            raise RuntimeError("Failed to create EA to Name map in main thread.")
        print(f"aiDAPal DEBUG: [async_call] Successfully created EA->Name map with {len(ea_to_original_name)} entries.")
    except Exception as e_map_orch:
        print(f"aiDAPal Error: [async_call] Failed during name mapping orchestration: {e_map_orch}")
        traceback.print_exc()
        return  # Cannot proceed without the map

    # --- Prepare Prompt ---
    # all_decompiled_codes is a non-empty dict here (empty was rejected above).
    print(f"aiDAPal DEBUG: [async_call] Formatting {len(all_decompiled_codes)} functions for prompt...")
    full_code_context_str = _build_code_context(all_decompiled_codes, ea_to_original_name)
    print(f"aiDAPal DEBUG: [async_call] Full code context string length: {len(full_code_context_str)}")
    prompt_content = _build_prompt_content(
        primary_func_name_for_prompt,
        primary_func_ea_str,
        full_code_context_str,
        analyze_only_current,
        extra_context,
    )
    print(f"aiDAPal DEBUG: [async_call] Final {analysis_type_str} prompt content length: {len(prompt_content)} chars for analysis triggered by {primary_func_ea_str}")

    # --- Call AI Analysis ---
    results_list_with_names = None  # Expecting list with names
    analysis_exception = None
    try:
        wait_box_msg = f"Asking Google AI ({model_name})... ({analysis_type_str})"
        print(f"aiDAPal DEBUG: [async_call] Showing AI wait box for {primary_func_ea_str}: '{wait_box_msg}'")
        ida_kernwin.execute_ui_requests([lambda: ida_kernwin.show_wait_box(wait_box_msg)])
        print(f"aiDAPal DEBUG: [async_call] Calling do_google_ai_analysis for {primary_func_ea_str}...")
        results_list_with_names = do_google_ai_analysis(prompt_content, model_name)  # Expects list or None
        print(f"aiDAPal DEBUG: [async_call] do_google_ai_analysis returned. Result is None: {results_list_with_names is None}. Count: {len(results_list_with_names) if results_list_with_names else 0}")
    except Exception as e:
        print(f"aiDAPal Error: [async_call] Exception during AI analysis call for {primary_func_ea_str}: {e}")
        analysis_exception = e
        traceback.print_exc()
        results_list_with_names = None
    finally:
        print(f"aiDAPal DEBUG: [async_call] Scheduling hide_wait_box for {primary_func_ea_str} via execute_ui_requests.")
        ida_kernwin.execute_ui_requests([ida_kernwin.hide_wait_box])
        print(f"aiDAPal DEBUG: [async_call] hide_wait_box request submitted for {primary_func_ea_str}.")

    # --- Map Results Back to EA ---
    results_list_with_ea = []
    if results_list_with_names is not None:
        results_list_with_ea = _map_results_to_eas(
            results_list_with_names,
            ea_to_original_name,
            primary_func_ea,
            primary_func_ea_str,
            primary_func_name_for_prompt,
            analyze_only_current,
        )

    # --- Schedule UI Update ---
    if results_list_with_ea:  # Only show UI if we successfully mapped at least one result
        print(f"aiDAPal DEBUG: [async_call] Analysis successful (mapped {len(results_list_with_ea)} items) for {primary_func_ea_str}. Preparing UI update.")
        call_do_show_ui = partial(do_show_ui, results_list_with_ea, primary_func_ea)
        print(f"aiDAPal DEBUG: [async_call] Scheduling UI update via execute_ui_requests for {primary_func_ea_str}.")
        ida_kernwin.execute_ui_requests([call_do_show_ui])
        print(f"aiDAPal DEBUG: [async_call] UI update request submitted for {primary_func_ea_str}.")
    else:
        # AI failed, returned nothing, or mapping failed for all items.
        print(f"aiDAPal DEBUG: [async_call] Analysis failed, returned no results, or failed to map results for {primary_func_ea_str}.")
        if analysis_exception:
            print(f"aiDAPal Error: [async_call] Analysis failed due to exception: {analysis_exception}")
        elif results_list_with_names is not None:
            # This code runs on the analysis thread, so the dialog is queued
            # for the UI thread the same way the wait-box calls above are,
            # instead of calling ida_kernwin.warning directly from here.
            def show_mapping_warning():
                ida_kernwin.warning("aiDAPal: Received analysis from AI, but could not map results back to functions based on names, or the AI analyzed the wrong function. Check AI response format or function names.")
                return False  # One-shot request; do not re-queue

            ida_kernwin.execute_ui_requests([show_mapping_warning])
    print(f"aiDAPal DEBUG: [async_call] Exiting background thread analysis function for {primary_func_ea_str}.")
# --- END MODIFIED ASYNC CALL --- | |
# --- IDA Plugin Integration (Action Handler, Hooks, Plugin Class) --- | |
# These parts remain largely the same, as the trigger point (right-click on primary function) | |
# and the mechanism for adding menu items don't change. | |
# The Action Handler now passes the primary cfunc to the wrapper. | |
class MyActionHandler(ida_kernwin.action_handler_t):
    """Handles the activation of the context menu actions.

    Each instance is bound to one model name and one analysis mode
    (multi-function, or single-function with callees as context).
    """

    def __init__(self, model_name, analyze_only_current=False):
        """
        Args:
            model_name (str): Model this handler instance triggers.
            analyze_only_current (bool): True for SINGLE analysis, False for MULTI.
        """
        self.model = model_name  # Store the specific model for this handler instance
        self.analyze_only_current = analyze_only_current  # Store the analysis type
        ida_kernwin.action_handler_t.__init__(self)
        analysis_type_str = "SINGLE" if analyze_only_current else "MULTI"
        print(f"aiDAPal DEBUG: [MyActionHandler.__init__] Action handler created for model '{self.model}', type: {analysis_type_str}")

    @staticmethod
    def _release_analysis_flag(concurrency_key):
        """Remove concurrency_key from the in-progress set under the lock."""
        with g_analysis_lock:
            g_analysis_in_progress.discard(concurrency_key)

    def activate(self, ctx):
        """
        Called when the context menu item is clicked.

        Validates the pseudocode context, marks the function as in progress,
        collects data-reference comments for it, and starts the analysis thread.

        Returns:
            int: Always 1, on every success and error path.
        """
        analysis_type_str = "SINGLE" if self.analyze_only_current else "MULTI"
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Action '{self.model}' ({analysis_type_str}) activated.")
        widget = ctx.widget
        widget_type = ida_kernwin.get_widget_type(widget)
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Context widget type: {widget_type}")
        if widget_type != ida_kernwin.BWN_PSEUDOCODE:
            print("aiDAPal DEBUG: [MyActionHandler.activate] Action activated outside pseudocode view. Ignoring.")
            return 1  # Nothing to do outside the pseudocode view

        print("aiDAPal DEBUG: [MyActionHandler.activate] Getting vdui from pseudocode widget...")
        vu = ida_hexrays.get_widget_vdui(widget)
        if not vu:
            print("aiDAPal Error: [MyActionHandler.activate] Could not get vdui for the pseudocode widget.")
            ida_kernwin.warning("aiDAPal: Could not get pseudocode view details (vdui).")
            return 1
        if not vu.cfunc:  # Check if cfunc exists (decompilation successful)
            print("aiDAPal Error: [MyActionHandler.activate] vdui exists, but vu.cfunc is None. Decompilation might have failed.")
            ida_kernwin.warning("aiDAPal: Decompilation failed or not available for this function.")
            return 1
        print("aiDAPal DEBUG: [MyActionHandler.activate] Got vdui and cfunc.")

        # This is the PRIMARY function the user clicked on.
        primary_cfunc = vu.cfunc
        primary_func_ea = primary_cfunc.entry_ea
        primary_func_ea_str = f"0x{primary_func_ea:X}"
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Target primary function EA: {primary_func_ea_str}")

        # --- Concurrency Check (using primary function EA) ---
        # Only one analysis per primary function may run at a time.
        concurrency_key = primary_func_ea
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Acquiring lock to check analysis status for {primary_func_ea_str}...")
        with g_analysis_lock:
            print(f"aiDAPal DEBUG: [MyActionHandler.activate] Lock acquired. Checking if {primary_func_ea_str} is in {g_analysis_in_progress}")
            already_running = concurrency_key in g_analysis_in_progress
            if not already_running:
                # Mark as in progress using the primary EA.
                g_analysis_in_progress.add(concurrency_key)
                print(f"aiDAPal DEBUG: [MyActionHandler.activate] Marked {primary_func_ea_str} as in progress. Current: {g_analysis_in_progress}")
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Lock released for {primary_func_ea_str}.")
        if already_running:
            # The modal dialog is shown only after the lock is released, so a
            # finishing analysis thread is never blocked on a pending click.
            print(f"aiDAPal DEBUG: [MyActionHandler.activate] Analysis already in progress for {primary_func_ea_str}. Showing warning.")
            ida_kernwin.warning(f"aiDAPal: Analysis already running for {ida_funcs.get_func_name(primary_func_ea)}. Please wait.")
            print(f"aiDAPal DEBUG: [MyActionHandler.activate] Exiting activate for {primary_func_ea_str}.")
            return 1  # Indicate action handled (by showing warning)

        # --- Get Context (Data References for the PRIMARY function - RUN IN MAIN THREAD) ---
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Preparing to get data reference comments for primary function {primary_func_ea_str} in main thread...")
        dref_comments = None  # Initialize actual result variable
        dref_result_container = [None]  # Mutable container to hold the result
        try:
            def get_dref_comments_main(container):
                print(f"aiDAPal DEBUG: [get_dref_comments_main] Running in main thread for {primary_func_ea_str}")
                try:
                    # Store the actual result (string or None) in the container.
                    container[0] = get_function_data_ref_comments(primary_func_ea)
                    print(f"aiDAPal DEBUG: [get_dref_comments_main] Stored result in container: Type={type(container[0])}")
                    return 1  # Indicate success to execute_sync
                except Exception as e_inner:
                    print(f"aiDAPal Error: [get_dref_comments_main] Exception inside main thread execution: {e_inner}")
                    traceback.print_exc()
                    container[0] = None  # Ensure container holds None on error
                    return 0  # Indicate failure to execute_sync

            sync_status = ida_kernwin.execute_sync(
                lambda: get_dref_comments_main(dref_result_container),
                ida_kernwin.MFF_READ
            )
            print(f"aiDAPal DEBUG: [MyActionHandler.activate] execute_sync for dref comments returned status: {sync_status}")
            if sync_status != 1:
                # Failure or cancelled: cannot proceed, so release the flag.
                print(f"aiDAPal Error: [MyActionHandler.activate] execute_sync for dref comments failed or was cancelled (status: {sync_status}).")
                self._release_analysis_flag(concurrency_key)
                return 1  # Indicate error occurred

            dref_comments = dref_result_container[0]
            print(f"aiDAPal DEBUG: [MyActionHandler.activate] Successfully retrieved dref comments from main thread. Type: {type(dref_comments)}")
            if dref_comments is not None:  # Check if it's not None before using len()
                print(f"aiDAPal DEBUG: [MyActionHandler.activate] Found data reference comments for {primary_func_ea_str} (length: {len(dref_comments)}).")
            else:
                print(f"aiDAPal DEBUG: [MyActionHandler.activate] No data reference comments found or returned as None for {primary_func_ea_str}.")
        except Exception as e_dref_sync:
            print(f"aiDAPal Error: [MyActionHandler.activate] Exception occurred during execute_sync call for dref comments: {e_dref_sync}")
            traceback.print_exc()
            self._release_analysis_flag(concurrency_key)
            return 1  # Indicate error occurred

        # --- Start Analysis Thread ---
        # dref_comments may be None; the wrapper forwards it as extra context.
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Preparing analysis thread ({analysis_type_str}) for {primary_func_ea_str} using model '{self.model}'.")
        caller = partial(analysis_task_wrapper, primary_cfunc, self.model, self.analyze_only_current, dref_comments)
        try:
            analysis_thread = threading.Thread(target=caller)
            analysis_thread.start()
        except Exception as e_thread:
            # If the thread never starts, analysis_task_wrapper's finally block
            # never runs, so the flag must be cleared here or this function
            # would stay "in progress" for the rest of the session.
            print(f"aiDAPal Error: [MyActionHandler.activate] Failed to start analysis thread for {primary_func_ea_str}: {e_thread}")
            traceback.print_exc()
            self._release_analysis_flag(concurrency_key)
            ida_kernwin.warning("aiDAPal: Could not start the analysis thread. Check Output window for details.")
            return 1
        print(f"aiDAPal DEBUG: [MyActionHandler.activate] Analysis thread started for {primary_func_ea_str}, model '{self.model}'. Exiting activate.")
        return 1  # Indicate action was initiated

    def update(self, ctx):
        """Enables the action only when in a pseudocode view with successful decompilation."""
        if ctx.widget_type == ida_kernwin.BWN_PSEUDOCODE:
            vu = ida_hexrays.get_widget_vdui(ctx.widget)
            if vu and vu.cfunc:
                return ida_kernwin.AST_ENABLE_FOR_WIDGET
        return ida_kernwin.AST_DISABLE_FOR_WIDGET
class MyHooks(ida_kernwin.UI_Hooks):
    """UI hook set that adds the aiDAPal entries to popup menus."""

    def finish_populating_widget_popup(self, widget, popup_handle, ctx=None):
        """Attach the aiDAPal actions to the popup of a Pseudocode widget.

        For every model in ``MODELS_TO_REGISTER`` the "multi" action and then
        the "single" action are attached under the "aiDAPal Analysis"
        submenu. If at least one attach succeeds, a "-" separator entry is
        attached to the same submenu. Widgets of any other type are ignored.
        Exceptions raised while attaching are printed, not propagated.
        """
        if ida_kernwin.get_widget_type(widget) != ida_kernwin.BWN_PSEUDOCODE:
            return

        try:
            submenu_name = "aiDAPal Analysis"
            menu_path = f"{submenu_name}/"
            attached_any = False

            for model_name in MODELS_TO_REGISTER:
                # Same order as the registration side: multi first, then single.
                for scope in ("multi", "single"):
                    action_name = f"aidapal:googleai:{scope}:{model_name}"
                    attached = ida_kernwin.attach_action_to_popup(
                        widget,
                        popup_handle,
                        action_name,
                        menu_path,
                        ida_kernwin.SETMENU_INS,
                    )
                    if attached:
                        attached_any = True
                    else:
                        print(f"aiDAPal Warning: [MyHooks.finish_populating_widget_popup] Failed to attach action '{action_name}' to popup.")

            if attached_any:
                # "-" is attached as a separator entry in the submenu.
                ida_kernwin.attach_action_to_popup(
                    widget,
                    popup_handle,
                    "-",
                    menu_path,
                    ida_kernwin.SETMENU_INS | ida_kernwin.SETMENU_FIRST,
                )
        except Exception as e:
            print(f"aiDAPal Error: [MyHooks.finish_populating_widget_popup] Exception attaching actions to popup: {e}")
            traceback.print_exc()
class aidapal_t(idaapi.plugin_t):
    """The main IDA Pro plugin class.

    ``init`` registers one "multi" and one "single" analysis action per
    entry of ``MODELS_TO_REGISTER`` and installs the ``MyHooks`` UI hook;
    ``term`` undoes both, closes the UI forms tracked in
    ``aiDAPalUI.open_forms`` and clears the shared concurrency state.
    """

    flags = idaapi.PLUGIN_PROC | idaapi.PLUGIN_HIDE
    comment = "aiDAPal: Google AI assistance for Hex-Rays (Multi/Single Function Analysis)"
    help = "Right-click in Pseudocode view to analyze the function (and optionally callees) with Google AI."
    wanted_name = "aiDAPal (Google AI - Multi/Single Func)"
    wanted_hotkey = ""
    hooks = None  # MyHooks instance while the UI hook is installed, else None
    registered_actions = []  # Names of registered actions; rebound to a fresh list in init()

    def init(self):
        """Called by IDA when loading the plugin.

        Returns:
            ``idaapi.PLUGIN_KEEP`` when at least one action was registered
            and the UI hook was installed; ``idaapi.PLUGIN_SKIP`` when
            Hex-Rays is unavailable, no action could be registered, the hook
            could not be installed, or an exception occurred (in which case
            already-registered actions and the hook are cleaned up).
        """
        print("-" * 60)
        print(f"aiDAPal DEBUG: {self.wanted_name} plugin initializing...")

        print("aiDAPal DEBUG: [init] Checking prerequisites...")
        # A missing key is only a warning: loading continues regardless.
        if not GOOGLE_AI_API_KEY or GOOGLE_AI_API_KEY == "YOUR_API_KEY_HERE":
            print("aiDAPal Warning: [init] GOOGLE_AI_API_KEY environment variable not set or is placeholder.")
            print(" Plugin actions will not function until the key is set and IDA is restarted.")

        print("aiDAPal DEBUG: [init] Initializing Hex-Rays...")
        if not ida_hexrays.init_hexrays_plugin():
            print("aiDAPal Error: [init] Hex-Rays decompiler is not available. Plugin cannot run.")
            print("-" * 60)
            return idaapi.PLUGIN_SKIP
        print("aiDAPal DEBUG: [init] Hex-Rays initialized successfully.")

        aidapal_t.registered_actions = []
        print("aiDAPal DEBUG: [init] Cleared registered actions list.")

        try:
            print(f"aiDAPal DEBUG: [init] Registering actions for models: {MODELS_TO_REGISTER}")
            for model_name in MODELS_TO_REGISTER:
                # Multi-function action (analyze_only_current=False).
                self._register_action(
                    model_name,
                    "multi",
                    f'Analyze Func & Callees with {model_name}',
                    f'Send current function and callees (depth 2) to Google AI ({model_name}) for analysis of ALL functions',
                    False,
                )
                # Single-function action (analyze_only_current=True).
                self._register_action(
                    model_name,
                    "single",
                    f'Analyze Current Func Only with {model_name}',
                    f'Send current function and callees (depth 2) to Google AI ({model_name}) for analysis of ONLY the current function',
                    True,
                )

            if not aidapal_t.registered_actions:
                print("aiDAPal Error: [init] No actions were registered successfully. Skipping plugin load.")
                print("-" * 60)
                return idaapi.PLUGIN_SKIP
            print(f"aiDAPal DEBUG: [init] Total actions registered: {len(aidapal_t.registered_actions)}")

            print("aiDAPal DEBUG: [init] Installing UI hooks...")
            aidapal_t.hooks = MyHooks()
            if aidapal_t.hooks.hook():
                print(" aiDAPal DEBUG: [init] UI hooks installed successfully.")
            else:
                print(" aiDAPal Error: [init] Failed to install UI hooks. Unregistering actions and skipping plugin load.")
                self._unregister_actions()
                aidapal_t.hooks = None
                print("-" * 60)
                return idaapi.PLUGIN_SKIP
        except Exception as e:
            print(f"aiDAPal Error: [init] Exception during initialization: {e}")
            traceback.print_exc()
            print("aiDAPal DEBUG: [init] Cleaning up due to exception...")
            self._unregister_actions()
            if aidapal_t.hooks:
                try:
                    aidapal_t.hooks.unhook()
                except Exception as unhook_e:
                    print(f"aiDAPal Error: [init] Exception during unhooking after error: {unhook_e}")
                aidapal_t.hooks = None
            print("-" * 60)
            return idaapi.PLUGIN_SKIP

        print(f"aiDAPal DEBUG: {self.wanted_name} plugin initialization complete.")
        print("-" * 60)
        return idaapi.PLUGIN_KEEP

    def _register_action(self, model_name, scope, label, tooltip, analyze_only_current):
        """Register one ``aidapal:googleai:<scope>:<model_name>`` action.

        Args:
            model_name: Model identifier passed to ``MyActionHandler`` and
                embedded in the action name.
            scope: ``"multi"`` or ``"single"``; the third component of the
                action name.
            label: Menu label for the action.
            tooltip: Tooltip text for the action.
            analyze_only_current: Forwarded to ``MyActionHandler``.

        Returns:
            True if ``ida_kernwin.register_action`` accepted the action (its
            name is then appended to ``aidapal_t.registered_actions``),
            False otherwise.
        """
        action_name = f"aidapal:googleai:{scope}:{model_name}"
        print(f"aiDAPal DEBUG: [init] Creating action description for '{action_name}'...")
        # The handler is built after the trace line above so the output
        # order matches the sequence of work.
        action_desc = ida_kernwin.action_desc_t(
            action_name,
            label,
            MyActionHandler(model_name, analyze_only_current=analyze_only_current),
            None,
            tooltip,
            199,
        )
        print(f"aiDAPal DEBUG: [init] Registering action '{action_name}'...")
        if ida_kernwin.register_action(action_desc):
            print(f" aiDAPal DEBUG: [init] Successfully registered action: '{action_name}'")
            aidapal_t.registered_actions.append(action_name)
            return True
        print(f" aiDAPal Error: [init] Failed to register action: '{action_name}'")
        return False

    def run(self, arg):
        """Called by IDA when running the plugin from the menu (not used here).

        Only prints a trace line and shows an info dialog pointing the user
        to the Pseudocode context menu.
        """
        print(f"aiDAPal DEBUG: {self.wanted_name} run() called (arg={arg}). This plugin uses the context menu.")
        ida_kernwin.info(f"{self.wanted_name}: Use the right-click context menu in the Pseudocode view to analyze functions.")

    def term(self):
        """Called by IDA when unloading the plugin.

        Uninstalls the UI hook, unregisters the actions, closes every form
        tracked in ``aiDAPalUI.open_forms`` and clears
        ``g_analysis_in_progress``. Exceptions from unhooking or from closing
        a form are printed and do not stop the remaining cleanup.
        """
        print("-" * 60)
        print(f"aiDAPal DEBUG: {self.wanted_name} plugin terminating...")

        print("aiDAPal DEBUG: [term] Uninstalling UI hooks...")
        try:
            if aidapal_t.hooks:
                aidapal_t.hooks.unhook()
                print(" aiDAPal DEBUG: [term] UI hooks uninstalled successfully.")
                aidapal_t.hooks = None
            else:
                print(" aiDAPal DEBUG: [term] No UI hooks instance to uninstall.")
        except Exception as e:
            print(f" aiDAPal Error: [term] Exception during unhooking: {e}")
            traceback.print_exc()

        print("aiDAPal DEBUG: [term] Unregistering actions...")
        self._unregister_actions()

        print(f"aiDAPal DEBUG: [term] Closing any tracked UI forms ({len(aiDAPalUI.open_forms)} found)...")
        # Iterate over a snapshot: the dictionary is modified in the error
        # path below while this loop is running.
        forms_to_close = list(aiDAPalUI.open_forms.items())
        for func_ea, form_instance in forms_to_close:
            func_ea_str = f"0x{func_ea:X}"
            print(f" aiDAPal DEBUG: [term] Attempting to close tracked UI form triggered by {func_ea_str}")
            try:
                form_instance.Close(0)
                print(f" aiDAPal DEBUG: [term] Close() called for form {func_ea_str}.")
            except Exception as e:
                print(f" aiDAPal Error: [term] Exception closing form for {func_ea_str}: {e}")
                aiDAPalUI.open_forms.pop(func_ea, None)  # Manual removal on error
        aiDAPalUI.open_forms.clear()
        print("aiDAPal DEBUG: [term] Cleared open_forms tracking dictionary.")

        print("aiDAPal DEBUG: [term] Clearing concurrency state...")
        with g_analysis_lock:
            g_analysis_in_progress.clear()
        print("aiDAPal DEBUG: [term] Concurrency state cleared.")

        print(f"aiDAPal DEBUG: {self.wanted_name} plugin termination complete.")
        print("-" * 60)

    def _unregister_actions(self):
        """Helper to unregister all actions this plugin registered.

        Every name in ``aidapal_t.registered_actions`` is passed to
        ``ida_kernwin.unregister_action``; failures and exceptions are
        printed and the remaining names are still processed. The list is
        reset to empty afterwards.
        """
        # Look the attribute up before using it, so the "missing attribute"
        # branch is reachable instead of failing with AttributeError.
        tracked_actions = getattr(aidapal_t, 'registered_actions', None)
        if tracked_actions is None:
            print("aiDAPal DEBUG: [_unregister_actions] No 'registered_actions' attribute found.")
            return

        print(f"aiDAPal DEBUG: [_unregister_actions] Unregistering {len(tracked_actions)} actions...")
        for action_name in list(tracked_actions):
            try:
                if not ida_kernwin.unregister_action(action_name):
                    print(f" aiDAPal Warning: [_unregister_actions] Failed to unregister action '{action_name}' (API returned False).")
            except Exception as e:
                print(f" aiDAPal Error: [_unregister_actions] Exception unregistering action '{action_name}': {e}")
        aidapal_t.registered_actions = []
        print("aiDAPal DEBUG: [_unregister_actions] Cleared registered actions list.")
# --- Plugin Entry Point ---
def PLUGIN_ENTRY():
    """Entry point required of IDA Pro plugins; returns a new ``aidapal_t``."""
    print("aiDAPal DEBUG: PLUGIN_ENTRY() called. Returning aidapal_t instance.")
    plugin_instance = aidapal_t()
    return plugin_instance
# --- End of Script ---
# Module-level trace line: printed each time this file is executed.
print("aiDAPal DEBUG: Script loaded.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment