Skip to content

Instantly share code, notes, and snippets.

@nullenc0de
Created November 8, 2024 17:18
Show Gist options
  • Save nullenc0de/8ffec84ee2a8822c6865f0048db32bad to your computer and use it in GitHub Desktop.
Save nullenc0de/8ffec84ee2a8822c6865f0048db32bad to your computer and use it in GitHub Desktop.
# Usage: netexec smb TARGET -u ADMIN -p PASS -M task_explorer -o EXPORT_XML=True OUTPUT_DIR=./tasks SCAN_CREDS=True
import io
import os
import re
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import Dict, List, Optional

from impacket.dcerpc.v5.dcom.wmi import WBEMSTATUS
from nxc.helpers.logger import highlight
class TaskVulnerability:
    """Plain record describing one finding on one scheduled task.

    All fields are stored exactly as passed in; no validation or
    normalisation is performed.
    """

    def __init__(self, name: str, path: str, command: str, author: str,
                 creation_date: str, vuln_type: str, description: str):
        """Store the task identity and the finding on the instance.

        Args:
            name: Task name.
            path: Task path.
            command: Command associated with the task.
            author: Task author.
            creation_date: Task creation date, kept as the string supplied.
            vuln_type: Short category label for the finding.
            description: Human-readable description of the finding.
        """
        self.name = name
        self.path = path
        self.command = command
        self.author = author
        self.creation_date = creation_date
        self.vuln_type = vuln_type
        self.description = description

    def __repr__(self) -> str:
        """Return a constructor-style representation for logs and debugging."""
        return (
            f"{type(self).__name__}("
            f"name={self.name!r}, path={self.path!r}, "
            f"command={self.command!r}, author={self.author!r}, "
            f"creation_date={self.creation_date!r}, "
            f"vuln_type={self.vuln_type!r}, "
            f"description={self.description!r})"
        )
class NXCModule:
    """NetExec module that inspects Windows scheduled tasks for weaknesses.

    Over SMB the task definition files under ``C$\\Windows\\System32\\Tasks``
    are downloaded and their XML is analysed; over WMI only the properties
    returned by the query are inspected. Findings are written to the
    NetExec logger supplied as ``context.log``.
    """

    name = "task_explorer"
    description = "Analyze scheduled tasks for vulnerabilities and privilege escalation opportunities"
    supported_protocols = ["smb", "wmi"]
    # NOTE(review): CHECK_PARENT (enabled by default) writes and deletes a
    # probe file on the target -- confirm this is compatible with
    # opsec_safe = True.
    opsec_safe = True
    multiple_hosts = True

    # Share-relative folder that holds the task definition files.
    _TASKS_ROOT = "\\Windows\\System32\\Tasks"
    # Option values (lower-cased) that are read as "enabled".
    _TRUE_STRINGS = frozenset({"1", "true", "yes", "y", "on"})
    # Substrings whose presence in a command path is reported.
    _WEAK_PATH_MARKERS = ("Everyone", "Authenticated Users", "Users")
    # Credential heuristics applied to the serialised task XML.
    _XML_CRED_PATTERNS = tuple(re.compile(p) for p in (
        r'(?i)password["\s]*=[\s"]*([^"]+)',
        r'(?i)user["\s]*=[\s"]*([^"]+)',
        r'(?:-p|-pass|-password)\s+([^\s]+)',
        r'(?:-u|-user|-username)\s+([^\s]+)',
    ))
    # Credential heuristics applied to a bare command line (WMI path).
    _CMD_CRED_PATTERNS = tuple(re.compile(p) for p in (
        r'(?i)password["\s]*=[\s"]*([^"]+)',
        r'(?:-p|-pass|-password)\s+([^\s]+)',
    ))
    # Properties echoed for a WMI task that has findings.
    _WMI_DETAIL_PROPS = (
        'TaskName', 'TaskPath', 'Enabled', 'LastRunTime',
        'NextRunTime', 'Status', 'Author', 'CreationDate',
    )

    @staticmethod
    def _parse_bool(value, default: bool) -> bool:
        """Interpret a module option value as a boolean.

        Option values arrive as strings, so ``bool("False")`` would be
        ``True``; this compares against an explicit set of truthy spellings.
        ``None`` (option not supplied) yields *default*.
        """
        if value is None:
            return default
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in NXCModule._TRUE_STRINGS

    @staticmethod
    def _local_name(tag) -> str:
        """Return an element tag without its ``{namespace}`` prefix."""
        if not isinstance(tag, str):
            return ""
        return tag.rsplit("}", 1)[-1]

    @staticmethod
    def _decode_task_xml(raw: bytes) -> str:
        """Decode a downloaded task file, honouring a byte-order mark.

        Falls back to UTF-8 when no UTF-16 BOM is present; undecodable bytes
        are replaced rather than aborting the analysis of that task.
        """
        if raw.startswith((b"\xff\xfe", b"\xfe\xff")):
            return raw.decode("utf-16", errors="replace")
        return raw.decode("utf-8-sig", errors="replace")

    @staticmethod
    def _read_remote_file(connection, remote_path: str) -> bytes:
        """Download ``C$<remote_path>`` into memory and return its bytes."""
        buffer = io.BytesIO()
        connection.conn.getFile("C$", remote_path, buffer.write)
        return buffer.getvalue()

    def options(self, context, module_options):
        """
        EXPORT_XML      Export full task XML to file (True/False, default: False)
        OUTPUT_DIR      Directory to save results (default: current)
        IGNORE_SYSTEM   Ignore SYSTEM tasks (True/False, default: False)
        CHECK_PARENT    Check parent directory permissions (True/False, default: True)
        SCAN_CREDS      Look for credentials in task actions (True/False, default: True)
        """
        self.export_xml = self._parse_bool(module_options.get('EXPORT_XML'), False)
        self.output_dir = module_options.get('OUTPUT_DIR', '.')
        self.ignore_system = self._parse_bool(module_options.get('IGNORE_SYSTEM'), False)
        self.check_parent = self._parse_bool(module_options.get('CHECK_PARENT'), True)
        self.scan_creds = self._parse_bool(module_options.get('SCAN_CREDS'), True)
        if self.export_xml:
            os.makedirs(self.output_dir, exist_ok=True)

    def check_writable_path(self, connection, path: str, context=None) -> bool:
        """Check if path is writable by current user.

        A small probe file is uploaded to ``C$<path>\\nxc_test.txt`` and then
        removed again.

        Args:
            connection: Connection object exposing the SMB session as ``.conn``.
            path: Share-relative directory, backslash separated.
            context: Optional module context; when given, a failure to remove
                the probe file is reported through ``context.log``.

        Returns:
            True when the upload succeeded, False otherwise.
        """
        probe_file = f"{path}\\nxc_test.txt"
        try:
            # putFile expects a read callback, not a bytes object.
            connection.conn.putFile("C$", probe_file, io.BytesIO(b"test").read)
        except Exception:
            # Any failure to upload is treated as "not writable".
            return False
        try:
            connection.conn.deleteFile("C$", probe_file)
        except Exception as e:
            # The write already succeeded, so the directory is writable;
            # surface the leftover probe instead of hiding it.
            if context is not None:
                context.log.fail(
                    f"Could not remove probe file {probe_file}: {str(e)}"
                )
        return True

    def analyze_task_security(self, task_xml: str) -> List[str]:
        """Analyze task XML for security issues.

        Checks performed, in this order: unquoted command paths containing a
        space, command paths containing one of the weak-permission marker
        strings, credential-looking text anywhere in the task (only when
        ``self.scan_creds`` is set) and principals whose run level is
        ``HighestAvailable``. Element names are matched without regard to
        XML namespace.

        Args:
            task_xml: Task definition as XML text.

        Returns:
            Human-readable finding strings; a single parse-failure entry when
            the XML is not well formed.
        """
        vulnerabilities = []
        try:
            root = ET.fromstring(task_xml)
        except ET.ParseError:
            vulnerabilities.append("Failed to parse task XML")
            return vulnerabilities

        local = self._local_name
        # Text of every <Command> that is a direct child of an <Exec>.
        commands = [
            child.text
            for element in root.iter()
            if local(element.tag) == "Exec"
            for child in element
            if local(child.tag) == "Command" and child.text
        ]

        # Check for unquoted paths
        for command in commands:
            if ' ' in command and not command.startswith(('"', "'")):
                vulnerabilities.append(f"Unquoted command path: {command}")

        # Check for weak file permissions
        for command in commands:
            path = command.strip('"\'')
            if any(marker in path for marker in self._WEAK_PATH_MARKERS):
                vulnerabilities.append(f"Weak file permissions in path: {path}")

        # Check for credential exposure
        if self.scan_creds:
            task_str = ET.tostring(root, encoding='unicode')
            for pattern in self._XML_CRED_PATTERNS:
                for match in pattern.finditer(task_str):
                    vulnerabilities.append(
                        f"Potential credential exposure: {match.group()}"
                    )

        # Check for risky task settings
        for element in root.iter():
            if local(element.tag) != "Principal":
                continue
            # RunLevel is normally a child element; the attribute form is
            # still honoured for backward compatibility.
            run_levels = [element.get('RunLevel')]
            run_levels.extend(
                (child.text or "").strip()
                for child in element
                if local(child.tag) == "RunLevel"
            )
            if 'HighestAvailable' in run_levels:
                vulnerabilities.append(
                    "Task runs with highest available privileges"
                )
        return vulnerabilities

    def get_task_details(self, connection, task_path: str) -> Optional[Dict]:
        """Get detailed task information using WMI.

        Best effort: any error while connecting or querying yields ``None``.

        NOTE(review): relies on ``connection.getWmiConnection()`` and a
        ``Win32_ScheduledTask`` class being available -- confirm both against
        the target environment.

        Args:
            connection: Connection object used to obtain the WMI handle.
            task_path: Value compared against ``TaskName`` in the query.

        Returns:
            Dict of selected task properties, or None when nothing matched
            or the lookup failed.
        """
        try:
            wmi = connection.getWmiConnection()
            # Escape backslashes and quotes so the value cannot break out of
            # the WQL string literal.
            safe_name = task_path.replace("\\", "\\\\").replace("'", "\\'")
            task_query = f"SELECT * FROM Win32_ScheduledTask WHERE TaskName = '{safe_name}'"
            tasks = wmi.ExecQuery(task_query)
            if tasks:
                task = tasks[0]
                return {
                    "Name": task.TaskName,
                    "Path": task.TaskPath,
                    "Enabled": task.Enabled,
                    "LastRunTime": task.LastRunTime,
                    "NextRunTime": task.NextRunTime,
                    "Status": task.Status,
                    "Author": task.Author,
                    "CreationDate": task.CreationDate
                }
        except Exception:
            # Details are optional extras; the analysis continues without them.
            pass
        return None

    def on_admin_login(self, context, connection):
        """Entry point: run the analysis that matches the connection protocol."""
        context.log.display("Starting scheduled task analysis...")
        if connection.protocol == "smb":
            self._scan_via_smb(context, connection)
        elif connection.protocol == "wmi":
            self._scan_via_wmi(context, connection)

    def _report(self, context, task_name, details, vulnerabilities):
        """Log one vulnerable task: its name, optional details, then findings.

        ``details`` is an iterable of ``(label, value)`` pairs, or None to
        omit the details section entirely.
        """
        context.log.success(f"Found vulnerable task: {highlight(task_name)}")
        if details is not None:
            context.log.highlight("Task Details:")
            for label, value in details:
                context.log.highlight(f"  {label}: {value}")
        context.log.highlight("Vulnerabilities Found:")
        for vuln in vulnerabilities:
            context.log.highlight(f"  - {vuln}")

    def _iter_task_files(self, context, connection, directory: str, prefix: str):
        """Yield ``(relative_name, share_path)`` for every task file below *directory*.

        Sub-folders are walked recursively. With IGNORE_SYSTEM set, the
        top-level ``Microsoft`` folder is skipped. A sub-folder that cannot
        be listed is logged at debug level and skipped.
        """
        # listPath needs a wildcard to return the directory's contents.
        for entry in connection.conn.listPath("C$", f"{directory}\\*"):
            entry_name = entry.get_longname()
            if entry_name in ('.', '..'):
                continue
            relative_name = f"{prefix}{entry_name}"
            share_path = f"{directory}\\{entry_name}"
            if not entry.is_directory():
                yield relative_name, share_path
                continue
            # Skip system tasks if configured
            if self.ignore_system and not prefix and entry_name.lower() == "microsoft":
                continue
            try:
                yield from self._iter_task_files(
                    context, connection, share_path, f"{relative_name}\\"
                )
            except Exception as e:
                context.log.debug(
                    f"Error listing task folder {share_path}: {str(e)}"
                )

    def _scan_via_smb(self, context, connection):
        """Download and analyse every task definition file over SMB."""
        # Directory -> probe result, so each folder is probed only once.
        writable_cache = {}
        try:
            for relative_name, task_path in self._iter_task_files(
                context, connection, self._TASKS_ROOT, ""
            ):
                try:
                    self._analyze_smb_task(
                        context, connection, relative_name, task_path,
                        writable_cache,
                    )
                except Exception as e:
                    context.log.debug(
                        f"Error analyzing task {task_path}: {str(e)}"
                    )
        except Exception as e:
            context.log.fail(f"Failed to enumerate tasks: {str(e)}")

    def _analyze_smb_task(self, context, connection, relative_name: str,
                          task_path: str, writable_cache: dict):
        """Analyse, report and optionally export a single task file."""
        # Read task XML
        raw_xml = self._read_remote_file(connection, task_path)
        task_xml = self._decode_task_xml(raw_xml)
        leaf_name = relative_name.rpartition("\\")[2]

        # Get task details via WMI (the query filters on TaskName, so the
        # task's own name is passed rather than the SMB file path).
        task_details = self.get_task_details(connection, leaf_name)

        # Analyze security
        vulnerabilities = self.analyze_task_security(task_xml)

        # Check parent directory permissions if enabled
        if self.check_parent:
            # Split on backslash explicitly: os.path.dirname does not treat
            # "\\" as a separator on POSIX hosts.
            parent_dir = task_path.rpartition("\\")[0]
            if parent_dir not in writable_cache:
                writable_cache[parent_dir] = self.check_writable_path(
                    connection, parent_dir, context
                )
            if writable_cache[parent_dir]:
                vulnerabilities.append(
                    f"Parent directory is writable: {parent_dir}"
                )

        if vulnerabilities:
            details = task_details.items() if task_details else None
            self._report(context, relative_name, details, vulnerabilities)

        # Export XML if configured
        # NOTE(review): the original nesting was lost, so it is unclear
        # whether export was limited to tasks with findings; it is done for
        # every task here, matching the option description.
        # NOTE(review): file names do not include the host, so runs against
        # several hosts sharing OUTPUT_DIR overwrite each other.
        if self.export_xml:
            flat_name = relative_name.replace("\\", "_")
            output_file = os.path.join(self.output_dir, f"task_{flat_name}.xml")
            # Write the bytes exactly as downloaded so the file's encoding
            # still matches its XML declaration.
            with open(output_file, 'wb') as f:
                f.write(raw_xml)
            context.log.success(f"Exported task XML to: {output_file}")

    def _scan_via_wmi(self, context, connection):
        """Inspect the tasks returned by the WMI query.

        WMI doesn't give us the XML directly, so only the command line is
        checked (unquoted path, credential-looking arguments).
        """
        try:
            wmi = connection.getWmiConnection()
            tasks = wmi.ExecQuery("SELECT * FROM Win32_ScheduledTask")
            for task in tasks:
                task_folder = getattr(task, 'TaskPath', None) or ""
                if self.ignore_system and task_folder.startswith("\\Microsoft\\"):
                    continue

                vulnerabilities = []
                # Check for concerning patterns in the command
                command = getattr(task, 'TaskCommand', None)
                if isinstance(command, str) and command:
                    if ' ' in command and not command.startswith(('"', "'")):
                        vulnerabilities.append(
                            f"Unquoted command path: {command}"
                        )
                    # Report once even when several patterns match.
                    if self.scan_creds and any(
                        pattern.search(command)
                        for pattern in self._CMD_CRED_PATTERNS
                    ):
                        vulnerabilities.append(
                            "Potential credential exposure in command"
                        )

                if vulnerabilities:
                    details = [
                        (prop, getattr(task, prop))
                        for prop in self._WMI_DETAIL_PROPS
                        if hasattr(task, prop)
                    ]
                    self._report(context, task.TaskName, details, vulnerabilities)
        except Exception as e:
            context.log.fail(f"Failed to enumerate tasks via WMI: {str(e)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment