Created
November 8, 2024 17:18
-
-
Save nullenc0de/8ffec84ee2a8822c6865f0048db32bad to your computer and use it in GitHub Desktop.
netexec smb TARGET -u ADMIN -p PASS -M task_explorer -o EXPORT_XML=True OUTPUT_DIR=./tasks SCAN_CREDS=True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import os
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Optional

from impacket.dcerpc.v5.dcom.wmi import WBEMSTATUS
from nxc.helpers.logger import highlight
class TaskVulnerability:
    """Plain record describing one security finding on a scheduled task.

    All fields are stored exactly as passed in; no validation is performed.
    """

    def __init__(self, name: str, path: str, command: str, author: str,
                 creation_date: str, vuln_type: str, description: str):
        """Store the task identity and the details of the finding.

        Args:
            name: Task name.
            path: Task path.
            command: Command associated with the task.
            author: Task author.
            creation_date: Task creation date, kept as the string it was given.
            vuln_type: Short category label for the finding.
            description: Human-readable explanation of the finding.
        """
        self.name = name
        self.path = path
        self.command = command
        self.author = author
        self.creation_date = creation_date
        self.vuln_type = vuln_type
        self.description = description

    def __repr__(self) -> str:
        """Return a compact, debug-friendly representation."""
        return (
            f"{type(self).__name__}(name={self.name!r}, path={self.path!r}, "
            f"vuln_type={self.vuln_type!r})"
        )
class NXCModule:
    """NetExec module that inspects Windows scheduled tasks for common weaknesses.

    Over SMB the task definitions are read from the ``C$`` share and parsed as
    XML; over WMI only the properties returned by the query are inspected.
    Findings are written to the module logger and, optionally, the task XML is
    saved locally.
    """

    name = "task_explorer"
    description = "Analyze scheduled tasks for vulnerabilities and privilege escalation opportunities"
    supported_protocols = ["smb", "wmi"]
    # NOTE(review): CHECK_PARENT writes and deletes a probe file on the target;
    # confirm that this is still considered "opsec safe".
    opsec_safe = True
    multiple_hosts = True

    # Remote location (relative to the C$ share) of the task definitions.
    _TASKS_ROOT = "\\Windows\\System32\\Tasks"

    # Spellings accepted for boolean module options (compared case-insensitively).
    _TRUE_STRINGS = frozenset({"true", "1", "yes", "y", "on"})
    _FALSE_STRINGS = frozenset({"false", "0", "no", "n", "off", ""})

    # Credential heuristics applied to the text found inside a task definition.
    _XML_CRED_PATTERNS = (
        r'(?i)password["\s]*=[\s"]*([^"]+)',
        r'(?i)user["\s]*=[\s"]*([^"]+)',
        r'(?:-p|-pass|-password)\s+([^\s]+)',
        r'(?:-u|-user|-username)\s+([^\s]+)',
    )
    # Credential heuristics applied to a command line obtained through WMI.
    _COMMAND_CRED_PATTERNS = (
        r'(?i)password["\s]*=[\s"]*([^"]+)',
        r'(?:-p|-pass|-password)\s+([^\s]+)',
    )

    def options(self, context, module_options):
        """
        EXPORT_XML      Export full task XML to file (True/False, default: False)
        OUTPUT_DIR      Directory to save results (default: current)
        IGNORE_SYSTEM   Ignore SYSTEM tasks (True/False, default: False)
        CHECK_PARENT    Check parent directory permissions (True/False, default: True)
        SCAN_CREDS      Look for credentials in task actions (True/False, default: True)
        """
        # Option values may arrive as strings, and bool("False") is True, so
        # every flag is parsed explicitly instead of being passed to bool().
        self.export_xml = self._parse_bool(module_options.get('EXPORT_XML'), False)
        self.output_dir = module_options.get('OUTPUT_DIR', '.')
        self.ignore_system = self._parse_bool(module_options.get('IGNORE_SYSTEM'), False)
        self.check_parent = self._parse_bool(module_options.get('CHECK_PARENT'), True)
        self.scan_creds = self._parse_bool(module_options.get('SCAN_CREDS'), True)
        if self.export_xml:
            os.makedirs(self.output_dir, exist_ok=True)

    @classmethod
    def _parse_bool(cls, value, default: bool) -> bool:
        """Interpret a module option as a boolean, falling back to *default*.

        ``None`` (option absent) and unrecognised spellings yield *default*.
        """
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in cls._TRUE_STRINGS:
            return True
        if text in cls._FALSE_STRINGS:
            return False
        return default

    @staticmethod
    def _local_name(tag) -> str:
        """Return an element tag without its ``{namespace}`` prefix."""
        if not isinstance(tag, str):
            return ""
        return tag.rsplit("}", 1)[-1]

    @staticmethod
    def _has_unquoted_space(command: str) -> bool:
        """Tell whether *command* contains a space and does not start with a quote."""
        return ' ' in command and not command.startswith(('"', "'"))

    @staticmethod
    def _decode_task_xml(raw: bytes) -> str:
        """Decode the bytes of a task definition into text.

        A byte-order mark decides the codec when present; otherwise a NUL in
        one of the first two bytes is taken as BOM-less UTF-16, and UTF-8 is
        tried before falling back to UTF-16.
        """
        if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
            return raw.decode("utf-16")
        if raw.startswith(b"\xef\xbb\xbf"):
            return raw.decode("utf-8-sig")
        if raw[1:2] == b"\x00":
            return raw.decode("utf-16-le")
        if raw[0:1] == b"\x00":
            return raw.decode("utf-16-be")
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return raw.decode("utf-16", errors="replace")

    def check_writable_path(self, connection, path: str) -> bool:
        """Check if path is writable by current user.

        A small probe file (``nxc_test.txt``) is uploaded into *path* on the
        ``C$`` share and then removed.

        Returns:
            True when the upload succeeded, False when it raised.
        """
        test_file = f"{path}\\nxc_test.txt"
        try:
            # putFile pulls its data through a read callback, not from bytes.
            connection.conn.putFile("C$", test_file, io.BytesIO(b"test").read)
        except Exception:
            return False
        try:
            connection.conn.deleteFile("C$", test_file)
        except Exception:
            # Best-effort cleanup: the write itself succeeded, which is what
            # this check reports. The probe file may be left behind.
            pass
        return True

    def analyze_task_security(self, task_xml: str) -> List[str]:
        """Analyze task XML for security issues.

        Args:
            task_xml: Text of a task definition.

        Returns:
            A list of human-readable findings (possibly empty). When the XML
            cannot be parsed the list contains a single parse-failure entry.
        """
        vulnerabilities = []
        try:
            root = ET.fromstring(task_xml)
        except ET.ParseError:
            vulnerabilities.append("Failed to parse task XML")
            return vulnerabilities

        # Tags are compared by local name so that documents with a default
        # namespace are matched as well as un-namespaced ones.
        commands = []
        principals = []
        for element in root.iter():
            local = self._local_name(element.tag)
            if local == "Exec":
                for child in element:
                    if self._local_name(child.tag) == "Command" and child.text:
                        commands.append(child.text)
            elif local == "Principal":
                principals.append(element)

        # Check for unquoted paths
        for command in commands:
            if self._has_unquoted_space(command):
                vulnerabilities.append(f"Unquoted command path: {command}")

        # Check for weak file permissions
        # NOTE(review): this is a substring heuristic on the command text; it
        # does not read any ACL and also matches paths such as C:\Users\...
        for command in commands:
            path = command.strip('"\'')
            if any(weak_perm in path for weak_perm in (
                "Everyone", "Authenticated Users", "Users"
            )):
                vulnerabilities.append(f"Weak file permissions in path: {path}")

        # Check for credential exposure
        if self.scan_creds:
            # Scan element text and attribute values rather than serialized
            # markup, so a match cannot run into the following closing tags.
            fragments = []
            for element in root.iter():
                if element.text and element.text.strip():
                    fragments.append(element.text)
                fragments.extend(value for value in element.attrib.values() if value)
            for pattern in self._XML_CRED_PATTERNS:
                for fragment in fragments:
                    for match in re.finditer(pattern, fragment):
                        vulnerabilities.append(
                            f"Potential credential exposure: {match.group()}"
                        )

        # Check for risky task settings
        for principal in principals:
            # Accept RunLevel both as an attribute and as a child element.
            run_level = principal.get('RunLevel')
            if run_level is None:
                for child in principal:
                    if self._local_name(child.tag) == "RunLevel":
                        run_level = (child.text or "").strip()
                        break
            if run_level == 'HighestAvailable':
                vulnerabilities.append(
                    "Task runs with highest available privileges"
                )
        return vulnerabilities

    def get_task_details(self, connection, task_path: str) -> Optional[Dict]:
        """Get detailed task information using WMI.

        Returns:
            A dict of task properties for the first query result, or None when
            nothing was returned or the lookup raised (best-effort: failures
            are deliberately not propagated).
        """
        # NOTE(review): the WMI class name, its properties and the shape of the
        # ExecQuery result are taken as-is from the original code - confirm
        # them against the WMI provider actually used by the connection.
        try:
            wmi = connection.getWmiConnection()
            # Backslash is the WQL escape character; escape it and the quote
            # so the value cannot terminate or alter the string literal.
            escaped = task_path.replace("\\", "\\\\").replace("'", "\\'")
            task_query = f"SELECT * FROM Win32_ScheduledTask WHERE TaskName = '{escaped}'"
            tasks = wmi.ExecQuery(task_query)
            if tasks:
                task = tasks[0]
                return {
                    "Name": task.TaskName,
                    "Path": task.TaskPath,
                    "Enabled": task.Enabled,
                    "LastRunTime": task.LastRunTime,
                    "NextRunTime": task.NextRunTime,
                    "Status": task.Status,
                    "Author": task.Author,
                    "CreationDate": task.CreationDate
                }
        except Exception:
            return None
        return None

    def _read_remote_file(self, connection, remote_path: str) -> bytes:
        """Download *remote_path* from the C$ share and return its bytes."""
        buffer = io.BytesIO()
        connection.conn.getFile("C$", remote_path, buffer.write)
        return buffer.getvalue()

    def _iter_task_files(self, context, connection, directory: str):
        """Yield ``(remote_path, name)`` for each file at or below *directory*.

        A failure to list *directory* itself propagates to the caller; a
        failure inside a subfolder is logged at debug level and skipped.
        """
        # The trailing wildcard asks for the directory's contents.
        for entry in connection.conn.listPath("C$", f"{directory}\\*"):
            entry_name = entry.get_longname()
            if entry_name in ('.', '..'):
                continue
            remote_path = f"{directory}\\{entry_name}"
            if entry.is_directory():
                # Path relative to the tasks root, e.g. "\Microsoft".
                logical = remote_path[len(self._TASKS_ROOT):]
                if self.ignore_system and (logical + "\\").startswith("\\Microsoft\\"):
                    continue
                try:
                    yield from self._iter_task_files(context, connection, remote_path)
                except Exception as e:
                    context.log.debug(
                        f"Error listing task folder {remote_path}: {str(e)}"
                    )
            else:
                yield remote_path, entry_name

    def _report(self, context, task_label, task_details, vulnerabilities):
        """Log a vulnerable task, its details (if any) and its findings."""
        context.log.success(f"Found vulnerable task: {highlight(task_label)}")
        if task_details:
            context.log.highlight("Task Details:")
            for key, value in task_details.items():
                context.log.highlight(f" {key}: {value}")
        context.log.highlight("Vulnerabilities Found:")
        for vuln in vulnerabilities:
            context.log.highlight(f" - {vuln}")

    def _export_task_xml(self, context, remote_path: str, raw_xml: bytes):
        """Save the task definition bytes under the configured output directory."""
        # Flatten the path below the tasks root so that tasks from different
        # folders do not collide; a top-level task keeps the name task_<name>.xml.
        # NOTE(review): the file name does not include the host, so results
        # from several hosts overwrite each other - confirm whether intended.
        flat_name = remote_path[len(self._TASKS_ROOT):].strip("\\").replace("\\", "_")
        output_file = os.path.join(self.output_dir, f"task_{flat_name}.xml")
        # Written as bytes so the saved copy is identical to the remote file.
        with open(output_file, 'wb') as f:
            f.write(raw_xml)
        context.log.success(f"Exported task XML to: {output_file}")

    def _analyze_via_smb(self, context, connection):
        """Read each task definition over SMB, analyze it and report findings."""
        # Each directory is probed for writability at most once.
        writable_dirs = {}
        for task_path, task_name in self._iter_task_files(
            context, connection, self._TASKS_ROOT
        ):
            try:
                raw_xml = self._read_remote_file(connection, task_path)
                task_xml = self._decode_task_xml(raw_xml)
                # Get task details via WMI
                task_details = self.get_task_details(connection, task_path)
                # Analyze security
                vulnerabilities = self.analyze_task_security(task_xml)
                # Check parent directory permissions if enabled
                if self.check_parent:
                    # Remote paths use backslashes, which os.path.dirname
                    # does not split on when running on POSIX.
                    parent_dir = task_path.rpartition("\\")[0]
                    if parent_dir not in writable_dirs:
                        writable_dirs[parent_dir] = self.check_writable_path(
                            connection, parent_dir
                        )
                    if writable_dirs[parent_dir]:
                        vulnerabilities.append(
                            f"Parent directory is writable: {parent_dir}"
                        )
                if not vulnerabilities:
                    continue
                self._report(context, task_name, task_details, vulnerabilities)
                # NOTE(review): the original indentation was lost; export is
                # assumed to apply only to tasks that have findings.
                if self.export_xml:
                    self._export_task_xml(context, task_path, raw_xml)
            except Exception as e:
                context.log.debug(
                    f"Error analyzing task {task_path}: {str(e)}"
                )

    def _analyze_via_wmi(self, context, connection):
        """Inspect the tasks returned by a WMI query and report findings."""
        detail_props = (
            'TaskName', 'TaskPath', 'Enabled', 'LastRunTime',
            'NextRunTime', 'Status', 'Author', 'CreationDate'
        )
        wmi = connection.getWmiConnection()
        tasks = wmi.ExecQuery("SELECT * FROM Win32_ScheduledTask")
        for task in tasks:
            task_folder = getattr(task, 'TaskPath', None) or ""
            if self.ignore_system and task_folder.startswith("\\Microsoft\\"):
                continue
            # WMI doesn't give us the XML directly, so we focus on other checks
            vulnerabilities = []
            # A missing or empty command must not abort the whole enumeration.
            command = getattr(task, 'TaskCommand', None)
            if isinstance(command, str) and command:
                if self._has_unquoted_space(command):
                    vulnerabilities.append(f"Unquoted command path: {command}")
                # Reported once, however many patterns match.
                if self.scan_creds and any(
                    re.search(pattern, command)
                    for pattern in self._COMMAND_CRED_PATTERNS
                ):
                    vulnerabilities.append(
                        "Potential credential exposure in command"
                    )
            if vulnerabilities:
                details = {
                    prop: getattr(task, prop)
                    for prop in detail_props
                    if hasattr(task, prop)
                }
                self._report(context, task.TaskName, details, vulnerabilities)

    def on_admin_login(self, context, connection):
        """Run the analysis that matches the connection's protocol.

        Enumeration failures are logged through ``context.log.fail`` and are
        not raised to the caller.
        """
        context.log.display("Starting scheduled task analysis...")
        if connection.protocol == "smb":
            try:
                self._analyze_via_smb(context, connection)
            except Exception as e:
                context.log.fail(f"Failed to enumerate tasks: {str(e)}")
        elif connection.protocol == "wmi":
            try:
                self._analyze_via_wmi(context, connection)
            except Exception as e:
                context.log.fail(f"Failed to enumerate tasks via WMI: {str(e)}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment