@joshfinley · Last active March 4, 2025

import boto3
import re
import json
import argparse
import mimetypes
import os
import yaml


class SecretScanner:
    """
    Advanced secret scanner with flexible rule-based detection
    """

    def __init__(self, rules_file=None):
        """
        Initialize the secret scanner with optional custom rules

        :param rules_file: Path to YAML file with secret detection rules
        """
        # Refined default rules with more precise secret detection
        self.default_rules = [
            # AWS Credentials
            {
                'id': 'aws-access-key',
                'regex': r'\b(AKIA[A-Z0-9]{16}|ASIA[A-Z0-9]{16})\b',
                'description': 'AWS Access Key ID',
                'severity': 'high'
            },
            # GitHub Personal Access Tokens
            {
                'id': 'github-pat',
                'regex': r'\bghp_[a-zA-Z0-9]{36}\b',
                'description': 'GitHub Personal Access Token',
                'severity': 'high'
            },
            # Stripe Secret Keys (specific format)
            {
                'id': 'stripe-secret-key',
                'regex': r'\bsk_live_[0-9a-zA-Z]{24}\b',
                'description': 'Stripe Secret Key',
                'severity': 'high'
            },
            # Slack Tokens (specific pattern)
            {
                'id': 'slack-token',
                'regex': r'\bxox[pboa]-[0-9]{12}-[0-9]{12}-[0-9]{12}-[a-z0-9]{32}\b',
                'description': 'Slack Token',
                'severity': 'high'
            },
            # MongoDB Connection Strings (plain and +srv forms)
            {
                'id': 'mongodb-connection',
                'regex': r'mongodb(?:\+srv)?://[^:@\s]+:[^@\s]+@[^/\s]+/[^\s]+',
                'description': 'MongoDB Connection String',
                'severity': 'high'
            },
            # PostgreSQL Connection Strings
            {
                'id': 'postgres-connection',
                'regex': r'postgresql://[^:@\s]+:[^@\s]+@[^/\s]+/[^\s]+',
                'description': 'PostgreSQL Connection String',
                'severity': 'high'
            },
            # Generic API Key Patterns
            {
                'id': 'generic-api-key',
                'regex': r'\b(?:api_?key\s*[=:]\s*)[\'"]?[a-zA-Z0-9_-]{32,}\b',
                'description': 'Potential API Key',
                'severity': 'medium'
            }
        ]

        # Load custom rules if provided
        self.rules = self.default_rules.copy()
        if rules_file and os.path.exists(rules_file):
            try:
                with open(rules_file, 'r') as f:
                    custom_rules = yaml.safe_load(f)
                # Merge custom rules with the defaults
                if custom_rules:
                    self.rules.extend(custom_rules)
            except Exception as e:
                print(f"[WARNING] Could not load custom rules: {e}")

        # Compile regex patterns
        self.compiled_rules = []
        for rule in self.rules:
            try:
                compiled_rule = {
                    'regex': re.compile(rule['regex']),
                    'id': rule.get('id', 'unknown'),
                    'description': rule.get('description', 'Potential secret'),
                    'severity': rule.get('severity', 'medium')
                }
                self.compiled_rules.append(compiled_rule)
            except Exception as e:
                print(f"[WARNING] Could not compile regex for rule {rule.get('id', 'unknown')}: {e}")

    def scan_content(self, content):
        """
        Scan content for potential secrets

        :param content: Text content to scan
        :return: List of detected secrets
        """
        detected_secrets = []
        for rule in self.compiled_rules:
            # finditer yields match objects, so the full matched text and
            # its exact offset are available even when a pattern contains
            # capture groups (findall would return only the groups)
            for match in rule['regex'].finditer(content):
                matched_text = match.group(0)
                context = self._extract_match_context(
                    content, matched_text, match.start()
                )
                detected_secrets.append({
                    'match': matched_text,
                    'rule_id': rule['id'],
                    'description': rule['description'],
                    'severity': rule['severity'],
                    'context': context
                })
        return detected_secrets
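
    # Quick sanity check (illustrative; the key below is a fabricated
    # value in the AKIA + 16-character format, not a real credential):
    #
    #   scanner = SecretScanner()
    #   scanner.scan_content('aws_key = AKIAABCDEFGHIJKLMNOP')
    #   -> one 'aws-access-key' finding with surrounding context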

    def _extract_match_context(self, content, match, match_pos, context_chars=100):
        """
        Extract context around the secret match

        :param content: Full content
        :param match: Matched secret text
        :param match_pos: Offset of the match within the content
        :param context_chars: Number of characters to show around the match
        :return: Context dictionary
        """
        # Calculate context start and end around the known match offset
        start = max(0, match_pos - context_chars)
        end = min(len(content), match_pos + len(match) + context_chars)
        return {
            'full_context': content[start:end],
            'context_start': start,
            'context_end': end
        }


def should_skip_file(file_key, skip_extensions=None, skip_mimetypes=None):
    """
    Determine if a file should be skipped based on file extension or MIME type

    :param file_key: S3 object key
    :param skip_extensions: List of file extensions to skip
    :param skip_mimetypes: List of MIME types to skip
    :return: Boolean indicating whether to skip the file
    """
    if not skip_extensions and not skip_mimetypes:
        return False

    # Get file extension
    file_ext = os.path.splitext(file_key)[1].lower().lstrip('.')
    # Guess MIME type
    mime_type, _ = mimetypes.guess_type(file_key)

    # Convert skip lists to lowercase
    skip_extensions = [ext.lower().lstrip('.') for ext in (skip_extensions or [])]
    skip_mimetypes = [mime.lower() for mime in (skip_mimetypes or [])]

    # Skip if extension matches
    if file_ext in skip_extensions:
        return True
    # Skip if MIME type matches
    if mime_type and mime_type.lower() in skip_mimetypes:
        return True
    return False
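
# For example (illustrative key names):
#   should_skip_file('photos/cat.jpg', skip_extensions=['jpg'])  -> True
#   should_skip_file('config/app.txt', skip_extensions=['jpg'])  -> False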


def scan_s3_for_secrets(
    profile_name=None,
    specific_bucket=None,
    skip_extensions=None,
    skip_mimetypes=None,
    rules_file=None
):
    """
    Scan S3 buckets for potential secrets

    :param profile_name: AWS profile to use
    :param specific_bucket: Specific bucket to scan
    :param skip_extensions: File extensions to skip
    :param skip_mimetypes: MIME types to skip
    :param rules_file: Custom rules file path
    :return: Dictionary of findings
    """
    # Initialize secret scanner
    secret_scanner = SecretScanner(rules_file)

    # Create a session with the specified profile
    session = boto3.Session(profile_name=profile_name)
    # Create S3 client
    s3_client = session.client('s3')

    # Results dictionary
    findings = {
        'buckets_scanned': 0,
        'objects_with_secrets': [],
        'skipped_objects': [],
        'statistics': {
            'total_objects_scanned': 0,
            'total_secret_objects': 0,
            'secrets_by_severity': {}
        }
    }

    try:
        # Determine buckets to scan
        if specific_bucket:
            # Strip the 'arn:aws:s3:::' prefix if an ARN was provided
            bucket_name = specific_bucket.split(':::')[-1]
            buckets_to_scan = [{'Name': bucket_name}]
            print(f"[STATUS] Scanning specific bucket: {bucket_name}")
        else:
            # List all buckets
            response = s3_client.list_buckets()
            buckets_to_scan = response['Buckets']
            print(f"[STATUS] Total buckets found: {len(buckets_to_scan)}")

        # Print configuration
        print("[CONFIG] Secret Detection Rules:")
        for rule in secret_scanner.rules:
            print(f" - {rule['id']}: {rule.get('description', 'No description')}")

        # Iterate through buckets
        for bucket in buckets_to_scan:
            bucket_name = bucket['Name']
            findings['buckets_scanned'] += 1
            print(f"[STATUS] Scanning bucket: {bucket_name}")
            try:
                # List objects in the bucket
                paginator = s3_client.get_paginator('list_objects_v2')
                object_count = 0
                secret_object_count = 0
                skipped_count = 0
                for page in paginator.paginate(Bucket=bucket_name):
                    if 'Contents' not in page:
                        continue
                    # Check each object
                    for obj in page['Contents']:
                        object_count += 1
                        findings['statistics']['total_objects_scanned'] += 1
                        print(f"[PROGRESS] Checking object: {obj['Key']}", end='\r')

                        # Check if file should be skipped
                        if should_skip_file(obj['Key'], skip_extensions, skip_mimetypes):
                            skipped_count += 1
                            findings['skipped_objects'].append({
                                'bucket': bucket_name,
                                'key': obj['Key']
                            })
                            continue

                        try:
                            # Download object content
                            obj_response = s3_client.get_object(
                                Bucket=bucket_name,
                                Key=obj['Key']
                            )
                            # Read object content
                            content = obj_response['Body'].read().decode('utf-8', errors='ignore')
                            # Scan for secrets
                            secrets = secret_scanner.scan_content(content)
                            if secrets:
                                secret_object_count += 1
                                findings['statistics']['total_secret_objects'] += 1
                                # Aggregate secrets by severity
                                for secret in secrets:
                                    severity = secret['severity']
                                    findings['statistics']['secrets_by_severity'][severity] = \
                                        findings['statistics']['secrets_by_severity'].get(severity, 0) + 1
                                findings['objects_with_secrets'].append({
                                    'bucket': bucket_name,
                                    'key': obj['Key'],
                                    'secrets': secrets
                                })
                                # Print secrets to console
                                s3_url = f"s3://{bucket_name}/{obj['Key']}"
                                print(f"\n[SECRETS FOUND] In {s3_url}:")
                                for secret in secrets:
                                    print(f" - Type: {secret['rule_id']} ({secret['severity']})")
                                    print(f"   Secret: {secret['match']}")
                                    print(f"   Context: ...{secret['context']['full_context']}...")
                                    print(f"   Full S3 URL: {s3_url}")
                        except Exception:
                            # Skip objects that can't be read (e.g., binary files)
                            continue

                print(f"\n[SUMMARY] Bucket {bucket_name}: {object_count} objects scanned, "
                      f"{secret_object_count} objects with secrets, "
                      f"{skipped_count} objects skipped")
            except Exception as bucket_error:
                print(f"[ERROR] Error scanning bucket {bucket_name}: {bucket_error}")

        # Final summary
        print("\n[FINAL SUMMARY]")
        print(f"Total buckets scanned: {findings['buckets_scanned']}")
        print(f"Total objects scanned: {findings['statistics']['total_objects_scanned']}")
        print(f"Total objects with secrets: {findings['statistics']['total_secret_objects']}")
        print("Secrets by Severity:")
        for severity, count in findings['statistics']['secrets_by_severity'].items():
            print(f" - {severity.upper()}: {count}")
        return findings
    except Exception as e:
        print(f"[CRITICAL ERROR] During S3 scan: {e}")
        return findings
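
# Shape of the returned findings dict (illustrative values):
#
#   {
#     "buckets_scanned": 1,
#     "objects_with_secrets": [
#       {"bucket": "my-bucket", "key": "config/.env", "secrets": [...]}
#     ],
#     "skipped_objects": [],
#     "statistics": {
#       "total_objects_scanned": 42,
#       "total_secret_objects": 1,
#       "secrets_by_severity": {"high": 1}
#     }
#   }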


def main():
    """
    Main function to run the S3 secret scanner from the command line
    """
    # Set up argument parser
    parser = argparse.ArgumentParser(description='Scan S3 buckets for potential secrets')
    parser.add_argument('--profile',
                        default='default',
                        help='AWS profile to use (default: default)')
    parser.add_argument('--bucket',
                        help='Specific S3 bucket name or ARN to scan')
    parser.add_argument('--skip-ext',
                        nargs='+',
                        help='File extensions to skip (e.g., jpg png pdf)')
    parser.add_argument('--skip-mime',
                        nargs='+',
                        help='MIME types to skip (e.g., image/jpeg application/pdf)')
    parser.add_argument('--rules',
                        help='Path to custom rules YAML file')

    # Parse arguments
    args = parser.parse_args()

    # Run the scan
    results = scan_s3_for_secrets(
        profile_name=args.profile,
        specific_bucket=args.bucket,
        skip_extensions=args.skip_ext,
        skip_mimetypes=args.skip_mime,
        rules_file=args.rules
    )

    # Print detailed results
    print("\n[DETAILED RESULTS]")
    print(json.dumps(results, indent=2))


if __name__ == '__main__':
    main()
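
# Example invocations (a sketch; the script filename is whatever the gist
# was saved as, and the profile/bucket/rules values are placeholders):
#
#   python s3_secret_scan.py --profile readonly --bucket my-bucket
#   python s3_secret_scan.py --skip-ext jpg png pdf \
#       --skip-mime application/pdf --rules custom_rules.yaml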