Created
November 8, 2024 17:16
-
-
Save nullenc0de/77ba07d364e609dd8afaed3c69452ee3 to your computer and use it in GitHub Desktop.
netexec smb TARGET -u USER -p PASS -M sensitive_search -o MAX_DEPTH=5 EXTENSIONS=.txt,.log,.config OUTPUT_FILE=findings.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime
from typing import List, Dict
import re
from nxc.helpers.logger import highlight
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
class NXCModule:
    """
    Search readable SMB shares for files whose contents match sensitive-data
    regex patterns (passwords, API keys, private keys, connection strings, ...).

    Read-only: files are only opened for reading, nothing is written to the
    target, hence opsec_safe.
    """

    name = "sensitive_search"
    description = "Search for files containing sensitive data patterns in shares with custom regex support"
    supported_protocols = ["smb"]
    opsec_safe = True  # Read-only operations
    multiple_hosts = True

    def options(self, context, module_options):
        """
        PATTERNS File containing regex patterns to search for (default built-in patterns)
        MAX_SIZE Maximum file size to scan in bytes (default 1MB)
        MAX_DEPTH Maximum directory depth to search (default 3)
        SHARES Comma-separated list of shares to search (default all readable)
        THREADS Number of concurrent file searches (default 2)
        EXTENSIONS File extensions to search (default: .txt,.cfg,.config,.xml,.json,.ini)
        OUTPUT_FILE Save results to JSON file (optional)
        """
        self.max_size = int(module_options.get('MAX_SIZE', 1024 * 1024))  # 1MB default
        self.max_depth = int(module_options.get('MAX_DEPTH', 3))
        self.thread_count = int(module_options.get('THREADS', 2))
        self.output_file = module_options.get('OUTPUT_FILE', None)
        # Shares to search; an empty list means "enumerate all readable shares" later.
        shares_opt = module_options.get('SHARES', '')
        self.shares = [s.strip() for s in shares_opt.split(',')] if shares_opt else []
        # Extensions are matched against lowercased filenames, so normalize them
        # here too — otherwise EXTENSIONS=.TXT would silently match nothing.
        ext_opt = module_options.get('EXTENSIONS', '.txt,.cfg,.config,.xml,.json,.ini')
        self.extensions = [e.strip().lower() for e in ext_opt.split(',')]
        # Pass context along so pattern-file load errors can be logged (see load_patterns).
        self.patterns = self.load_patterns(module_options.get('PATTERNS', None), context)
        # Per-host findings, keyed by "hostname.domain".
        self.results: Dict[str, List[Dict]] = {}

    def load_patterns(self, pattern_file=None, context=None) -> Dict[str, str]:
        """Load "name: regex" patterns from pattern_file, or return built-in defaults.

        Blank lines and lines starting with '#' are ignored. On any read/parse
        error the defaults are returned; the error is logged when a context is
        available. (Previously this referenced an undefined `context` name in
        the except handler, turning a bad patterns file into a NameError.)
        """
        default_patterns = {
            'aws_key': r'(?i)aws[_\-\s]*key[_\-\s]*=\s*[\'"][\w]{20,}[\'"]',
            'password': r'(?i)password[_\-\s]*=\s*[\'"][^\'"]{8,}[\'"]',
            'api_key': r'(?i)api[_\-\s]*key[_\-\s]*=\s*[\'"][\w\-]{32,}[\'"]',
            'private_key': r'-----BEGIN (?:RSA|OPENSSH|DSA|EC) PRIVATE KEY-----',
            'connection_string': r'(?i)(?:server|database|uid|pwd)\s*=\s*[^;]+;',
            'secret_key': r'(?i)secret[_\-\s]*key[_\-\s]*=\s*[\'"][\w\-]{32,}[\'"]',
            'token': r'(?i)token[_\-\s]*=\s*[\'"][\w\-]{32,}[\'"]',
        }
        if pattern_file:
            try:
                with open(pattern_file, 'r') as f:
                    return {k.strip(): v.strip() for k, v in (line.split(':', 1)
                            for line in f if line.strip() and not line.startswith('#'))}
            except Exception as e:
                if context:
                    context.log.debug(f"Error loading patterns file: {e}. Using defaults.")
        return default_patterns

    def search_file(self, conn, share, path, patterns) -> List[Dict]:
        """Scan a single file on `share` for every regex in `patterns`.

        Reads at most self.max_size bytes, decodes as UTF-8 (undecodable bytes
        ignored), and returns a list of finding dicts with keys: pattern, path,
        context (20 chars around the match, newlines flattened) and line_number.
        Unreadable files yield an empty list.
        """
        findings = []
        try:
            with conn.openFile(share, path) as file_obj:
                content = file_obj.read(self.max_size).decode('utf-8', errors='ignore')
            for pattern_name, pattern in patterns.items():
                for match in re.finditer(pattern, content):
                    # Grab a little surrounding text so the hit is reviewable.
                    start = max(0, match.start() - 20)
                    end = min(len(content), match.end() + 20)
                    snippet = content[start:end].replace('\n', ' ')
                    findings.append({
                        'pattern': pattern_name,
                        'path': path,
                        'context': snippet,
                        'line_number': content[:match.start()].count('\n') + 1,
                    })
        except Exception:
            pass  # Best-effort: skip files we can't open or read
        return findings

    def search_directory(self, conn, share, path, depth=0):
        """Recursively scan `path` on `share`, fanning file scans out to a thread pool.

        Recursion stops beyond self.max_depth; unlistable directories are
        skipped. Returns the combined findings of this directory and children.
        """
        if depth > self.max_depth:
            return []
        findings = []
        try:
            entries = [e for e in conn.listPath(share, path)
                       if e.get_longname() not in ('.', '..')]
            with ThreadPoolExecutor(max_workers=self.thread_count) as executor:
                futures = []
                for entry in entries:
                    full_path = f"{path}\\{entry.get_longname()}"
                    if entry.is_directory():
                        # Depth-first descent; each level runs its own small pool.
                        findings.extend(self.search_directory(conn, share, full_path, depth + 1))
                    elif any(entry.get_longname().lower().endswith(ext) for ext in self.extensions):
                        futures.append(executor.submit(self.search_file, conn, share, full_path, self.patterns))
                for future in as_completed(futures):
                    findings.extend(future.result())
        except Exception:
            pass  # Best-effort: skip directories we can't access
        return findings

    def on_admin_login(self, context, connection):
        """Module entry point: choose shares, scan them, report and optionally save results."""
        context.log.display(f"Searching for sensitive data with {len(self.patterns)} patterns")
        shares_to_search = self.shares
        if not shares_to_search:
            # No explicit SHARES option: fall back to every share we can read.
            try:
                shares = connection.shares()
                # `access` is a collection of permissions; membership test also
                # covers the plain-string 'READ' case (== 'READ' missed shares
                # that are both readable and writable).
                shares_to_search = [share['name'] for share in shares if 'READ' in share['access']]
            except Exception as e:
                context.log.fail(f"Failed to enumerate shares: {e}")
                return
        host_findings = []
        for share in shares_to_search:
            try:
                context.log.display(f"Searching share: {share}")
                findings = self.search_directory(connection.conn, share, "\\")
                if findings:
                    host_findings.extend(findings)
            except Exception as e:
                context.log.debug(f"Error searching share {share}: {e}")
        # Process and display results
        if host_findings:
            hostname = f"{connection.hostname}.{connection.domain}"
            self.results[hostname] = host_findings
            context.log.success(f"Found {highlight(len(host_findings))} matches on {highlight(hostname)}")
            for finding in host_findings:
                context.log.highlight(
                    f"[{finding['pattern']}] {finding['path']} (line {finding['line_number']})\n"
                    f"Context: {finding['context']}\n"
                )
            # Save to output file if specified
            if self.output_file:
                try:
                    with open(self.output_file, 'w') as f:
                        json.dump(self.results, f, indent=2)
                    context.log.success(f"Results saved to {self.output_file}")
                except Exception as e:
                    context.log.debug(f"Failed to save results: {e}")
        else:
            context.log.display("No sensitive data patterns found")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment