Created
November 8, 2024 17:20
-
-
Save nullenc0de/8982abd3bb7b479a7ff72f59d5e49467 to your computer and use it in GitHub Desktop.
netexec smb TARGET -u ADMIN -p PASS -M dll_hijack_hunter -o CHECK_PERMISSIONS=True EXPORT_RESULTS=True OUTPUT_FILE=results.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import json
import ntpath
import os
import re
import threading
from datetime import datetime
from queue import Empty, Queue
from typing import Dict, List, Optional

from nxc.helpers.logger import highlight
class ServiceInfo:
    """Details of one Windows service plus the findings recorded for it.

    The first six attributes are copied from the constructor arguments.
    The three list attributes start empty and are filled in during analysis.
    """

    def __init__(self, name: str, display_name: str, binary_path: str,
                 start_type: str, account: str,
                 description: Optional[str] = None):
        """Store the service attributes and create empty finding lists.

        Args:
            name: Service key name.
            display_name: Human-readable service name.
            binary_path: Raw image path / command line of the service.
            start_type: Start mode of the service, as a string.
            account: Account the service runs as.
            description: Optional free-text description.
        """
        self.name = name
        self.display_name = display_name
        self.binary_path = binary_path
        self.start_type = start_type
        self.account = account
        self.description = description
        # Findings; each instance gets its own fresh lists.
        self.vulnerabilities: List[str] = []
        self.missing_dlls: List[str] = []
        self.writable_paths: List[str] = []

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"binary_path={self.binary_path!r}, account={self.account!r})"
        )
class NXCModule:
    """NetExec module that hunts for service DLL hijacking opportunities.

    For every service found on the target the module can:
      * probe whether the directory holding the service binary is writable,
      * scan the binary for referenced DLL names and report those that are
        found neither next to the binary nor in the standard search paths,
      * flag unquoted service paths that contain spaces,
      * flag services running under built-in privileged accounts.

    Remote paths are assumed to live on a drive exposed through its
    administrative share (``C:\\...`` -> ``C$``). Paths without a drive letter
    are resolved against ``C$``.
    """

    name = "dll_hijack_hunter"
    description = "Hunt for service DLL hijacking opportunities and vulnerable service configurations"
    supported_protocols = ["smb", "wmi"]
    # NOTE(review): the permission check writes (and then deletes) a temporary
    # file on the target, which is arguably not "opsec safe" - confirm.
    opsec_safe = True
    multiple_hosts = True

    # Option strings that are interpreted as boolean True.
    _TRUE_STRINGS = frozenset({"1", "true", "yes", "y", "on"})
    # Leading drive letter of a Windows path, e.g. "C:".
    _DRIVE_RE = re.compile(r"^([A-Za-z]):")
    # Executable part of an unquoted command line (shortest prefix ending in
    # a known extension that is followed by whitespace or end of string).
    _UNQUOTED_EXE_RE = re.compile(
        r"^(.*?\.(?:exe|sys|dll|com|bat|cmd))(?=\s|$)", re.IGNORECASE
    )
    # DLL file names as they appear in a binary's ASCII strings.
    _DLL_NAME_RE = re.compile(rb"[a-zA-Z0-9_-]+\.dll", re.IGNORECASE)
    # API-set "DLLs" are virtual contracts resolved by the loader; they never
    # exist on disk and must not be reported as missing.
    _VIRTUAL_DLL_PREFIXES = ("api-ms-", "ext-ms-")
    # Only the head of a binary is scanned for DLL names.
    _READ_LIMIT = 2 * 1024 * 1024
    # FILE_READ_DATA access mask; requesting write access on a running
    # service binary would fail with a sharing violation.
    _FILE_READ_DATA = 0x00000001
    # Assumed Windows directory, consistent with the C$-relative search paths.
    _SYSTEM_ROOT = "C:\\Windows"
    _PRIVILEGED_ACCOUNTS = frozenset(
        {"localsystem", "system", "networkservice", "localservice"}
    )

    def options(self, context, module_options):
        """
        CHECK_PERMISSIONS    Check write permissions (True/False)
        EXPORT_RESULTS       Export results to JSON file (True/False)
        OUTPUT_FILE          Path to output JSON file
        MAX_THREADS          Maximum number of threads for permission checks (default: 4)
        ANALYZE_DLLS         Analyze DLL dependencies (True/False)
        SKIP_MICROSOFT       Skip Microsoft services (True/False)
        """
        get = module_options.get
        # Module options arrive as strings, so bool("False") would be True;
        # parse the text instead.
        self.check_perms = self._parse_bool(get("CHECK_PERMISSIONS"), True)
        # Stored as export_enabled so the flag does not shadow the
        # export_results() method on the instance.
        self.export_enabled = self._parse_bool(get("EXPORT_RESULTS"), False)
        self.output_file = get("OUTPUT_FILE", "dll_hijack_results.json")
        try:
            self.max_threads = max(1, int(get("MAX_THREADS", 4)))
        except (TypeError, ValueError):
            self._log_debug(context, "Invalid MAX_THREADS value, using 4")
            self.max_threads = 4
        self.analyze_dlls = self._parse_bool(get("ANALYZE_DLLS"), True)
        self.skip_microsoft = self._parse_bool(get("SKIP_MICROSOFT"), False)

        # Known DLL search order directories (relative to the C$ share)
        self.dll_search_paths = [
            "\\Windows\\System32",
            "\\Windows\\SysWOW64",
            "\\Windows",
            "\\Windows\\System32\\wbem",
            "\\Windows\\System32\\WindowsPowerShell\\v1.0",
        ]

        # Initialize results
        self.results = {}
        # NOTE(review): queue and lock live on the module instance; if the
        # framework runs one instance against several hosts concurrently the
        # queue is shared between them - confirm.
        self.permission_queue = Queue()
        self.results_lock = threading.Lock()

    # ------------------------------------------------------------------ helpers

    @classmethod
    def _parse_bool(cls, value, default: bool) -> bool:
        """Interpret an option value ("True", "false", "1", ...) as a bool."""
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in cls._TRUE_STRINGS

    @staticmethod
    def _log_debug(context, message: str) -> None:
        """Write a debug message when a context with a logger is available."""
        log = getattr(context, "log", None)
        if log is not None:
            log.debug(message)

    @classmethod
    def _share_and_path(cls, path: str):
        """Split a Windows path into (admin share, share-relative path)."""
        match = cls._DRIVE_RE.match(path)
        if match:
            share = f"{match.group(1).upper()}$"
            return share, path[match.end():].lstrip("\\")
        return "C$", path.lstrip("\\")

    @classmethod
    def _expand_system_root(cls, path: str) -> str:
        """Resolve the common system-root spellings used in image paths."""
        lowered = path.lower()
        if lowered.startswith("\\??\\"):
            return path[4:]
        for prefix in ("%systemroot%", "%windir%", "\\systemroot"):
            if lowered.startswith(prefix):
                return cls._SYSTEM_ROOT + path[len(prefix):]
        if lowered.startswith("system32\\"):
            return f"{cls._SYSTEM_ROOT}\\{path}"
        return path

    @classmethod
    def _parse_image_path(cls, raw):
        """Return (executable path, was_quoted) for a service command line."""
        raw = (raw or "").strip()
        if raw.startswith('"'):
            closing = raw.find('"', 1)
            exe = raw[1:closing] if closing != -1 else raw[1:]
            return cls._expand_system_root(exe), True
        match = cls._UNQUOTED_EXE_RE.match(raw)
        if match:
            exe = match.group(1)
        elif raw:
            exe = raw.split(None, 1)[0]
        else:
            exe = ""
        return cls._expand_system_root(exe), False

    @classmethod
    def _is_windows_binary(cls, raw_path) -> bool:
        """Heuristic used by SKIP_MICROSOFT: binary lives under the Windows directory."""
        exe, _ = cls._parse_image_path(raw_path)
        return exe.lower().startswith(cls._SYSTEM_ROOT.lower() + "\\")

    def _remote_file_exists(self, connection, path: str) -> bool:
        """Return True when *path* can be listed on the target."""
        share, relative = self._share_and_path(path)
        try:
            connection.conn.listPath(share, relative)
        except Exception:
            # The SMB library's error type is not imported in this file, so
            # any failure (typically "object not found") counts as absent.
            return False
        return True

    # ------------------------------------------------------------- permissions

    def is_path_writable(self, connection, path: str) -> bool:
        """Check if a path is writable by the current user.

        A small probe file is uploaded into *path* and removed again. The
        path counts as writable as soon as the upload succeeds; a failed
        clean-up does not change the verdict.
        """
        share, directory = self._share_and_path(path)
        probe_name = f"nxc_test_{threading.get_ident()}.tmp"
        probe = f"{directory}\\{probe_name}" if directory else probe_name
        try:
            # putFile pulls the content through a read(size)-style callback.
            connection.conn.putFile(share, probe, io.BytesIO(b"test").read)
        except Exception:
            return False
        try:
            connection.conn.deleteFile(share, probe)
        except Exception:
            # Best-effort clean-up; the write itself already succeeded.
            pass
        return True

    def check_path_permissions_worker(self, connection, context):
        """Worker thread for checking path permissions.

        Drains ``self.permission_queue`` of ``(service_info, path)`` items
        and records writable paths on the service. Returns once the queue
        is empty, so it must be started after the queue has been filled.
        """
        while True:
            try:
                service_info, path = self.permission_queue.get_nowait()
            except Empty:
                # Nothing was fetched, so there is no task to mark as done.
                return
            try:
                if self.is_path_writable(connection, path):
                    with self.results_lock:
                        if path not in service_info.writable_paths:
                            service_info.writable_paths.append(path)
                            service_info.vulnerabilities.append(
                                f"Writable path found: {path}"
                            )
            except Exception as e:
                context.log.debug(f"Error checking permissions: {str(e)}")
            finally:
                self.permission_queue.task_done()

    # ---------------------------------------------------------------- analysis

    def extract_dlls_from_binary(self, connection, path: str) -> List[str]:
        """Extract DLL dependencies from a binary file.

        Reads up to the first 2 MB of the remote file and returns every
        ``*.dll`` name found in it, de-duplicated case-insensitively in
        order of first appearance. Returns an empty list when the file
        cannot be read.
        """
        share, relative = self._share_and_path(path)
        conn = connection.conn
        try:
            tree_id = conn.connectTree(share)
            try:
                file_id = conn.openFile(
                    tree_id, relative, desiredAccess=self._FILE_READ_DATA
                )
                try:
                    # singleCall=False keeps reading until the limit or EOF.
                    data = conn.readFile(
                        tree_id, file_id, 0, self._READ_LIMIT, singleCall=False
                    )
                finally:
                    conn.closeFile(tree_id, file_id)
            finally:
                conn.disconnectTree(tree_id)
        except Exception:
            # Unreadable binaries simply yield no dependencies.
            return []

        dlls = []
        seen = set()
        for match in self._DLL_NAME_RE.finditer(data or b""):
            dll_name = match.group().decode("ascii")
            key = dll_name.lower()
            if key not in seen:
                seen.add(key)
                dlls.append(dll_name)
        return dlls

    def analyze_service(self, connection, service_info: "ServiceInfo") -> None:
        """Analyze a service for potential DLL hijacking vulnerabilities.

        Findings are appended to ``service_info.vulnerabilities``; the
        binary directory is queued for a later write-permission check when
        CHECK_PERMISSIONS is enabled.
        """
        try:
            binary_path, quoted = self._parse_image_path(service_info.binary_path)
            if not binary_path:
                raise ValueError("empty binary path")
            # ntpath understands backslash paths regardless of the local OS.
            binary_dir = ntpath.dirname(binary_path)

            # Queue the binary directory once for the permission workers
            if self.check_perms and binary_dir:
                self.permission_queue.put((service_info, binary_dir))

            # Analyze DLL dependencies
            if self.analyze_dlls:
                # The application directory is searched before system paths.
                search_dirs = ([binary_dir] if binary_dir else []) + list(
                    self.dll_search_paths
                )
                missing_dlls = []
                for dll in self.extract_dlls_from_binary(connection, binary_path):
                    if dll.lower().startswith(self._VIRTUAL_DLL_PREFIXES):
                        continue
                    found = any(
                        self._remote_file_exists(connection, f"{directory}\\{dll}")
                        for directory in search_dirs
                    )
                    if not found:
                        missing_dlls.append(dll)
                service_info.missing_dlls = missing_dlls
                if missing_dlls:
                    service_info.vulnerabilities.append(
                        f"Missing DLLs that could be hijacked: {', '.join(missing_dlls)}"
                    )

            # Check for unquoted service paths
            if not quoted and " " in binary_path:
                service_info.vulnerabilities.append(
                    "Unquoted service path could allow DLL hijacking"
                )

            # Check service account ("NT AUTHORITY\\LocalService" -> "localservice")
            account = (service_info.account or "").split("\\")[-1]
            if account.replace(" ", "").lower() in self._PRIVILEGED_ACCOUNTS:
                service_info.vulnerabilities.append(
                    f"Service runs as {service_info.account}"
                )
        except Exception:
            service_info.vulnerabilities.append("Unable to analyze service binary")

    # ------------------------------------------------------------- enumeration

    def get_services_wmi(self, connection, context=None) -> List["ServiceInfo"]:
        """Get services information using WMI.

        Returns an empty list when the query fails; the failure is logged
        at debug level if *context* is given.
        """
        services = []
        try:
            # NOTE(review): getWmiConnection()/ExecQuery() are assumed to be
            # provided by the connection object - confirm against the nxc
            # WMI protocol implementation.
            wmi = connection.getWmiConnection()
            # Win32_Service has no Manufacturer property, so SKIP_MICROSOFT
            # cannot be expressed in WQL; it is applied by the caller.
            query = "SELECT * FROM Win32_Service"
            for svc in wmi.ExecQuery(query):
                services.append(
                    ServiceInfo(
                        name=svc.Name,
                        display_name=svc.DisplayName,
                        binary_path=svc.PathName,
                        start_type=svc.StartMode,
                        account=svc.StartName,
                        description=svc.Description,
                    )
                )
        except Exception as e:
            self._log_debug(context, f"WMI service enumeration failed: {e}")
        return services

    def get_services_smb(self, connection, context=None) -> List["ServiceInfo"]:
        """Get services information using registry access via SMB.

        Services without an ImagePath value are skipped. Returns an empty
        list when enumeration fails; the failure is logged at debug level
        if *context* is given.
        """
        services = []
        try:
            # Read services from registry
            # NOTE(review): enumKey()/enumValues() are assumed to exist on
            # connection.conn - confirm; remote registry access may need a
            # dedicated RPC helper instead.
            key = "SYSTEM\\CurrentControlSet\\Services"
            for svc in connection.conn.enumKey(key):
                try:
                    entries = connection.conn.enumValues(f"{key}\\{svc}")
                    # First occurrence of a value name wins.
                    values = {}
                    for entry in entries:
                        values.setdefault(entry["name"], entry["value"])
                    image_path = values.get("ImagePath")
                    if not image_path:
                        continue
                    services.append(
                        ServiceInfo(
                            name=svc,
                            display_name=values.get("DisplayName", svc),
                            binary_path=image_path,
                            start_type=str(values.get("Start")),
                            account=values.get("ObjectName") or "LocalSystem",
                        )
                    )
                except Exception as e:
                    self._log_debug(context, f"Skipping service {svc}: {e}")
                    continue
        except Exception as e:
            self._log_debug(context, f"Registry service enumeration failed: {e}")
        return services

    # ------------------------------------------------------------------ output

    def export_results(self, hostname: str) -> bool:
        """Export results to JSON file.

        Writes the findings stored for *hostname* to ``self.output_file``.

        Returns:
            True when the file was written, False when exporting is
            disabled or writing failed.
        """
        if not getattr(self, "export_enabled", False):
            return False
        output = {
            "scan_time": datetime.now().isoformat(),
            "hostname": hostname,
            "services": [
                {
                    "name": service.name,
                    "display_name": service.display_name,
                    "binary_path": service.binary_path,
                    "start_type": service.start_type,
                    "account": service.account,
                    "description": service.description,
                    "vulnerabilities": service.vulnerabilities,
                    "missing_dlls": service.missing_dlls,
                    "writable_paths": service.writable_paths,
                }
                for service in self.results.get(hostname, [])
            ],
        }
        try:
            # Serialize first so a failure cannot leave a truncated file.
            payload = json.dumps(output, indent=2)
            # NOTE(review): every host writes to the same file, so in a
            # multi-host run the last host wins - confirm this is intended.
            with open(self.output_file, "w", encoding="utf-8") as f:
                f.write(payload)
        except (OSError, TypeError, ValueError):
            return False
        return True

    # ------------------------------------------------------------- entry point

    def on_admin_login(self, context, connection):
        """Enumerate, analyze and report the services of one target."""
        hostname = f"{connection.hostname}.{connection.domain}"
        context.log.display(f"Starting DLL hijacking analysis on {hostname}")

        # Get services based on protocol
        if getattr(connection, "protocol", "smb") == "wmi":
            services = self.get_services_wmi(connection, context)
        else:
            services = self.get_services_smb(connection, context)

        if self.skip_microsoft:
            services = [
                svc for svc in services
                if not self._is_windows_binary(svc.binary_path)
            ]

        if not services:
            context.log.fail("No services found")
            return
        context.log.success(f"Found {len(services)} services to analyze")

        # Analyze each service; this also fills the permission queue
        for service in services:
            self.analyze_service(connection, service)

        # Workers exit as soon as the queue is empty, so they are started
        # only after every path has been queued.
        if self.check_perms and not self.permission_queue.empty():
            # NOTE(review): all workers share one SMB connection - confirm
            # that the underlying library tolerates concurrent use.
            workers = [
                threading.Thread(
                    target=self.check_path_permissions_worker,
                    args=(connection, context),
                    daemon=True,
                )
                for _ in range(self.max_threads)
            ]
            for worker in workers:
                worker.start()
            # Wait for permission checks to complete
            self.permission_queue.join()

        # Store and report results
        vulnerable = [svc for svc in services if svc.vulnerabilities]
        self.results[hostname] = vulnerable
        for service in vulnerable:
            context.log.success(
                f"Found vulnerable service: {highlight(service.name)}"
            )
            context.log.highlight("Binary Path: " + str(service.binary_path))
            context.log.highlight("Account: " + str(service.account))
            context.log.highlight("Vulnerabilities Found:")
            for vuln in service.vulnerabilities:
                context.log.highlight(f"  - {vuln}")
            if service.writable_paths:
                context.log.highlight("Writable Paths:")
                for path in service.writable_paths:
                    context.log.highlight(f"  - {path}")

        # Export results if configured
        if self.export_enabled:
            if self.export_results(hostname):
                context.log.success(f"Results exported to {self.output_file}")
            else:
                context.log.fail(f"Could not export results to {self.output_file}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment