Skip to content

Instantly share code, notes, and snippets.

@nullenc0de
Created November 8, 2024 17:20
Show Gist options
  • Save nullenc0de/8982abd3bb7b479a7ff72f59d5e49467 to your computer and use it in GitHub Desktop.
Save nullenc0de/8982abd3bb7b479a7ff72f59d5e49467 to your computer and use it in GitHub Desktop.
# Usage: netexec smb TARGET -u ADMIN -p PASS -M dll_hijack_hunter -o CHECK_PERMISSIONS=True EXPORT_RESULTS=True OUTPUT_FILE=results.json
import io
import json
import ntpath
import os
import re
import threading
from datetime import datetime
from queue import Empty, Queue
from typing import Dict, List, Optional

from nxc.helpers.logger import highlight
class ServiceInfo:
    """Plain record describing one Windows service and the findings made on it.

    The constructor stores the service metadata it is given and initialises
    three empty, per-instance lists that the analysis code fills in later.
    """

    def __init__(self, name: str, display_name: str, binary_path: str,
                 start_type: str, account: str,
                 description: Optional[str] = None):
        """Store the service metadata and start with no findings.

        Args:
            name: Service key name.
            display_name: Human readable service name.
            binary_path: Service command line (executable plus arguments).
            start_type: Start mode of the service, as text.
            account: Account the service runs as.
            description: Optional free-text description.
        """
        self.name = name
        self.display_name = display_name
        self.binary_path = binary_path
        self.start_type = start_type
        self.account = account
        self.description = description
        # Findings; created per instance so services never share a list.
        self.vulnerabilities: List[str] = []
        self.missing_dlls: List[str] = []
        self.writable_paths: List[str] = []

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(name={self.name!r}, "
            f"binary_path={self.binary_path!r}, account={self.account!r})"
        )
class NXCModule:
    """NetExec module that looks for service DLL hijacking opportunities.

    For every service it can enumerate, the module derives the executable
    path from the service command line, optionally probes whether the
    executable's directory is writable, optionally looks for referenced DLL
    names that cannot be found, and flags unquoted paths and built-in
    service accounts. Findings are logged and can be exported as JSON.

    NOTE(review): file operations assume ``connection.conn`` behaves like an
    impacket ``SMBConnection`` (putFile/deleteFile/listPath/connectTree/
    openFile/readFile/closeFile/disconnectTree) -- confirm for the "wmi"
    protocol.
    NOTE(review): ``permission_queue`` and ``results`` live on the module
    instance; if the framework shares one instance between hosts that are
    processed concurrently, queued checks could be picked up with another
    host's connection -- confirm against the framework's threading model.
    """

    name = "dll_hijack_hunter"
    description = "Hunt for service DLL hijacking opportunities and vulnerable service configurations"
    supported_protocols = ["smb", "wmi"]
    # NOTE(review): the permission check uploads and deletes a probe file on
    # the target; confirm that this still qualifies as "opsec safe".
    opsec_safe = True
    multiple_hosts = True

    # Accepted spellings for boolean module options (compared lower-case).
    _TRUE_WORDS = frozenset({"1", "true", "yes", "y", "on"})
    _FALSE_WORDS = frozenset({"0", "false", "no", "n", "off"})
    # Anything that looks like a DLL file name inside the binary's bytes.
    _DLL_NAME = re.compile(rb"[a-zA-Z0-9_-]+\.dll", re.IGNORECASE)
    # Shortest prefix of an unquoted command line that ends in a binary
    # extension and is followed by whitespace or the end of the string.
    _UNQUOTED_EXE = re.compile(
        r"^(.+?\.(?:exe|com|bat|cmd|sys|dll))(?=\s|$)", re.IGNORECASE
    )
    _SYSTEM_ACCOUNTS = frozenset({"localsystem", "networkservice", "localservice"})
    # Virtual "API set" names are resolved by the loader, not found on disk.
    _API_SET_PREFIXES = ("api-ms-", "ext-ms-")
    _FILE_READ_DATA = 0x00000001  # read-only access mask for openFile
    _MAX_BINARY_READ = 2 * 1024 * 1024  # only the first 2 MiB are scanned

    def options(self, context, module_options):
        """
        CHECK_PERMISSIONS   Check write permissions (True/False)
        EXPORT_RESULTS      Export results to JSON file (True/False)
        OUTPUT_FILE         Path to output JSON file
        MAX_THREADS         Maximum number of threads for permission checks (default: 4)
        ANALYZE_DLLS        Analyze DLL dependencies (True/False)
        SKIP_MICROSOFT      Skip Microsoft services (True/False)
        """
        # Option values may arrive as strings, and bool("False") is True, so
        # they are parsed explicitly instead of being passed to bool().
        self.check_perms = self._parse_bool(module_options.get('CHECK_PERMISSIONS'), True)
        # Stored as ``export_enabled`` so the flag does not shadow the
        # ``export_results`` method on the instance.
        self.export_enabled = self._parse_bool(module_options.get('EXPORT_RESULTS'), False)
        self.output_file = module_options.get('OUTPUT_FILE', 'dll_hijack_results.json')
        self.max_threads = max(1, int(module_options.get('MAX_THREADS', 4)))
        self.analyze_dlls = self._parse_bool(module_options.get('ANALYZE_DLLS'), True)
        self.skip_microsoft = self._parse_bool(module_options.get('SKIP_MICROSOFT'), False)

        # Known DLL search order directories (drive-less, so they resolve
        # against the default C$ share).
        self.dll_search_paths = [
            "\\Windows\\System32",
            "\\Windows\\SysWOW64",
            "\\Windows",
            "\\Windows\\System32\\wbem",
            "\\Windows\\System32\\WindowsPowerShell\\v1.0",
        ]

        # Initialize results
        self.results = {}
        self.permission_queue = Queue()
        self.results_lock = threading.Lock()

    @classmethod
    def _parse_bool(cls, value, default: bool) -> bool:
        """Interpret an option value as a boolean, falling back to *default*."""
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in cls._TRUE_WORDS:
            return True
        if text in cls._FALSE_WORDS:
            return False
        return default

    @staticmethod
    def _debug(context, message: str) -> None:
        """Write *message* to the debug log when a context is available."""
        if context is not None:
            context.log.debug(message)

    @staticmethod
    def _share_location(path: str):
        """Split a Windows path into ``(admin share, share-relative path)``.

        A leading drive letter selects the matching admin share (``D:`` gives
        ``D$``); paths without a drive letter default to ``C$``.
        """
        drive, remainder = ntpath.splitdrive(path)
        if len(drive) == 2 and drive[1] == ":":
            share = f"{drive[0].upper()}$"
        else:
            share = "C$"
        return share, remainder.lstrip("\\/")

    @classmethod
    def _executable_from_command(cls, command) -> str:
        """Return the executable part of a service command line.

        A quoted command yields the text between the first pair of quotes.
        An unquoted command yields everything up to the first binary
        extension that is followed by whitespace (a heuristic, because the
        executable path may itself contain spaces); failing that, the first
        whitespace-separated token. Returns "" for an empty command.
        """
        command = (command or "").strip()
        if not command:
            return ""
        if command.startswith('"'):
            closing = command.find('"', 1)
            return command[1:closing] if closing != -1 else command[1:]
        match = cls._UNQUOTED_EXE.match(command)
        if match:
            return match.group(1)
        return command.split()[0]

    def is_path_writable(self, connection, path: str) -> bool:
        """Check if a path is writable by the current user.

        Uploads a small probe file into *path* and removes it again. Returns
        True when the upload succeeded, False when it failed or *path* is
        empty.
        """
        if not path:
            return False
        share, directory = self._share_location(path)
        probe = ntpath.join(directory, f"nxc_test_{threading.get_ident()}.tmp")
        try:
            # putFile expects a read(n)-style callback, not a bytes payload.
            connection.conn.putFile(share, probe, io.BytesIO(b"test").read)
        except Exception:
            return False
        try:
            connection.conn.deleteFile(share, probe)
        except Exception:
            # The upload already proved the directory is writable; a failed
            # cleanup must not turn the result into a false negative.
            pass
        return True

    def check_path_permissions_worker(self, connection, context):
        """Worker thread for checking path permissions.

        Drains ``permission_queue`` and returns once it is empty, so all work
        must be queued before the worker is started. Each writable path is
        recorded once per service.
        """
        while True:
            try:
                service_info, path = self.permission_queue.get_nowait()
            except Empty:
                return
            # task_done() is only reached for items that were really dequeued.
            try:
                if self.is_path_writable(connection, path):
                    with self.results_lock:
                        if path not in service_info.writable_paths:
                            service_info.writable_paths.append(path)
                            service_info.vulnerabilities.append(
                                f"Writable path found: {path}"
                            )
            except Exception as e:
                context.log.debug(f"Error checking permissions: {str(e)}")
            finally:
                self.permission_queue.task_done()

    def _read_file_head(self, connection, path: str, limit: int) -> bytes:
        """Read up to *limit* bytes from the start of a remote file."""
        share, relative = self._share_location(path)
        smb = connection.conn
        tree_id = smb.connectTree(share)
        try:
            file_id = smb.openFile(
                tree_id, relative, desiredAccess=self._FILE_READ_DATA
            )
            try:
                # singleCall=False keeps reading until *limit* or end of file.
                return smb.readFile(tree_id, file_id, 0, limit, singleCall=False)
            finally:
                smb.closeFile(tree_id, file_id)
        finally:
            smb.disconnectTree(tree_id)

    def _remote_file_exists(self, connection, path: str) -> bool:
        """Return True when listing *path* on its share succeeds."""
        share, relative = self._share_location(path)
        try:
            connection.conn.listPath(share, relative)
        except Exception:
            # Best effort: any failure to list is treated as "not found".
            return False
        return True

    def extract_dlls_from_binary(self, connection, path: str) -> List[str]:
        """Extract DLL dependencies from a binary file.

        Scans the first 2 MiB of the file for byte sequences that look like
        DLL file names. This is a plain pattern match over the raw bytes, not
        an import-table parse. Names are de-duplicated case-insensitively and
        returned in first-seen order; an unreadable file yields an empty list.
        """
        try:
            data = self._read_file_head(connection, path, self._MAX_BINARY_READ)
        except Exception:
            return []
        seen = set()
        dlls = []
        for match in self._DLL_NAME.finditer(data):
            dll_name = match.group().decode('ascii')
            key = dll_name.lower()
            if key not in seen:
                seen.add(key)
                dlls.append(dll_name)
        return dlls

    def _find_missing_dlls(self, connection, binary_path: str,
                           binary_dir: str) -> List[str]:
        """Return referenced DLL names found in none of the searched directories.

        The executable's own directory is searched first, followed by
        ``dll_search_paths``. API-set names are skipped.
        """
        search_dirs = list(self.dll_search_paths)
        if binary_dir:
            search_dirs.insert(0, binary_dir)
        missing = []
        for dll in self.extract_dlls_from_binary(connection, binary_path):
            if dll.lower().startswith(self._API_SET_PREFIXES):
                continue
            found = any(
                self._remote_file_exists(connection, ntpath.join(folder, dll))
                for folder in search_dirs
            )
            if not found:
                missing.append(dll)
        return missing

    def analyze_service(self, connection, service_info: "ServiceInfo") -> None:
        """Analyze a service for potential DLL hijacking vulnerabilities.

        Appends findings to ``service_info.vulnerabilities``, sets
        ``service_info.missing_dlls`` and, when permission checks are
        enabled, queues the executable's directory on ``permission_queue``.
        """
        try:
            raw_command = service_info.binary_path or ""
            binary_path = self._executable_from_command(raw_command)
            if not binary_path:
                service_info.vulnerabilities.append("Unable to analyze service binary")
                return
            # ntpath, not os.path: the path is a Windows path even when this
            # code runs on a POSIX host.
            binary_dir = ntpath.dirname(binary_path)

            # Queue the binary directory once for the writability probe.
            if self.check_perms and binary_dir:
                self.permission_queue.put((service_info, binary_dir))

            # Analyze DLL dependencies
            if self.analyze_dlls:
                missing_dlls = self._find_missing_dlls(
                    connection, binary_path, binary_dir
                )
                service_info.missing_dlls = missing_dlls
                if missing_dlls:
                    service_info.vulnerabilities.append(
                        f"Missing DLLs that could be hijacked: {', '.join(missing_dlls)}"
                    )

            # Check for unquoted service paths: the original command line is
            # not quoted and the executable path itself contains a space.
            if not raw_command.lstrip().startswith('"') and ' ' in binary_path:
                service_info.vulnerabilities.append(
                    "Unquoted service path could allow DLL hijacking"
                )

            # Check service account; drop any "DOMAIN\" style prefix first.
            account = (service_info.account or "").rsplit("\\", 1)[-1].lower()
            if account in self._SYSTEM_ACCOUNTS:
                service_info.vulnerabilities.append(
                    f"Service runs as {service_info.account}"
                )
        except Exception:
            service_info.vulnerabilities.append("Unable to analyze service binary")

    def get_services_wmi(self, connection, context=None) -> List["ServiceInfo"]:
        """Get services information using WMI.

        Returns the services collected so far; on failure the error is
        written to the debug log (when *context* is given) and the partial
        list is returned.

        NOTE(review): ``connection.getWmiConnection()`` and ``ExecQuery`` are
        used as the original code wrote them -- confirm they exist on the
        connection object.
        NOTE(review): the SKIP_MICROSOFT filter relies on a ``Manufacturer``
        property of Win32_Service -- verify the class exposes it, otherwise
        the filtered query fails and no services are returned.
        """
        services = []
        try:
            wmi = connection.getWmiConnection()
            query = "SELECT * FROM Win32_Service"
            if self.skip_microsoft:
                query += " WHERE Manufacturer != 'Microsoft Corporation'"
            for svc in wmi.ExecQuery(query):
                services.append(ServiceInfo(
                    name=svc.Name,
                    display_name=svc.DisplayName,
                    binary_path=svc.PathName,
                    start_type=svc.StartMode,
                    account=svc.StartName,
                    description=svc.Description,
                ))
        except Exception as e:
            self._debug(context, f"WMI service enumeration failed: {e}")
        return services

    def get_services_smb(self, connection, context=None) -> List["ServiceInfo"]:
        """Get services information using registry access via SMB.

        Services without an ``ImagePath`` value are skipped. SKIP_MICROSOFT
        is not applied on this path.

        NOTE(review): ``connection.conn.enumKey`` / ``enumValues`` are used as
        the original code wrote them -- confirm the connection object
        provides registry enumeration under these names.
        """
        services = []
        try:
            # Read services from registry
            key = "SYSTEM\\CurrentControlSet\\Services"
            for svc in connection.conn.enumKey(key):
                try:
                    service_key = f"{key}\\{svc}"
                    # One pass into a dict; also safe if enumValues returns
                    # a one-shot iterator.
                    values = {
                        entry['name']: entry['value']
                        for entry in connection.conn.enumValues(service_key)
                    }
                    image_path = values.get('ImagePath')
                    if not image_path:
                        continue
                    services.append(ServiceInfo(
                        name=svc,
                        display_name=values.get('DisplayName', svc),
                        binary_path=image_path,
                        start_type=str(values.get('Start')),
                        account=values.get('ObjectName') or 'LocalSystem',
                    ))
                except Exception as e:
                    self._debug(context, f"Skipping service {svc}: {e}")
                    continue
        except Exception as e:
            self._debug(context, f"Registry service enumeration failed: {e}")
        return services

    def export_results(self, hostname: str) -> bool:
        """Export results to JSON file.

        Writes the findings stored for *hostname* to ``output_file``.

        Returns:
            True when the file was written, False when exporting is disabled
            or writing failed.
        """
        if not self.export_enabled:
            return False
        output = {
            'scan_time': datetime.now().isoformat(),
            'hostname': hostname,
            'services': [
                {
                    'name': service.name,
                    'display_name': service.display_name,
                    'binary_path': service.binary_path,
                    'start_type': service.start_type,
                    'account': service.account,
                    'description': service.description,
                    'vulnerabilities': service.vulnerabilities,
                    'missing_dlls': service.missing_dlls,
                    'writable_paths': service.writable_paths,
                }
                for service in self.results.get(hostname, [])
            ],
        }
        try:
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(output, f, indent=2)
        except (OSError, TypeError, ValueError):
            return False
        return True

    def _run_permission_checks(self, connection, context) -> None:
        """Drain the already-filled permission queue with worker threads.

        NOTE(review): all workers share one connection object -- confirm it
        tolerates concurrent use, or set MAX_THREADS=1.
        """
        worker_count = min(self.max_threads, self.permission_queue.qsize())
        workers = [
            threading.Thread(
                target=self.check_path_permissions_worker,
                args=(connection, context),
                daemon=True,
            )
            for _ in range(worker_count)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

    def on_admin_login(self, context, connection):
        """Enumerate, analyze and report services for one host."""
        hostname = f"{connection.hostname}.{connection.domain}"
        context.log.display(f"Starting DLL hijacking analysis on {hostname}")

        # Get services based on protocol
        if connection.protocol == "wmi":
            services = self.get_services_wmi(connection, context)
        else:
            services = self.get_services_smb(connection, context)
        if not services:
            context.log.fail("No services found")
            return
        context.log.success(f"Found {len(services)} services to analyze")

        # Analyze each service; this is what fills the permission queue.
        for service in services:
            self.analyze_service(connection, service)

        # Workers exit as soon as the queue is empty, so they are started
        # only after every check has been queued.
        if self.check_perms:
            self._run_permission_checks(connection, context)

        # Store results
        self.results[hostname] = []
        for service in services:
            if not service.vulnerabilities:
                continue
            self.results[hostname].append(service)
            context.log.success(
                f"Found vulnerable service: {highlight(service.name)}"
            )
            context.log.highlight(f"Binary Path: {service.binary_path}")
            context.log.highlight(f"Account: {service.account}")
            context.log.highlight("Vulnerabilities Found:")
            for vuln in service.vulnerabilities:
                context.log.highlight(f" - {vuln}")
            if service.writable_paths:
                context.log.highlight("Writable Paths:")
                for path in service.writable_paths:
                    context.log.highlight(f" - {path}")

        # Export results if configured
        if self.export_enabled:
            if self.export_results(hostname):
                context.log.success(f"Results exported to {self.output_file}")
            else:
                context.log.fail(f"Could not export results to {self.output_file}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment