Created
June 21, 2025 08:49
-
-
Save random-robbie/11906b6ae9153c28e22e2bb80876f0f8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Ruby Deserialization Security Scanner | |
===================================== | |
A safe security testing tool for identifying Ruby deserialization vulnerabilities. | |
DISCLAIMER: This tool is for authorized security testing only. | |
- Only use on systems you own or have explicit permission to test | |
- Do not use for malicious purposes | |
- Follow responsible disclosure practices | |
Author: Security Research Tool | |
License: Educational Use Only | |
""" | |
import requests | |
import base64 | |
import json | |
import argparse | |
import sys | |
import time | |
import re | |
from urllib.parse import urlparse, urljoin | |
from typing import List, Dict, Tuple, Optional | |
class RubyDeserializationScanner: | |
def __init__(self, target_url: str, verbose: bool = False): | |
self.target_url = target_url | |
self.verbose = verbose | |
self.session = requests.Session() | |
self.session.headers.update({ | |
'User-Agent': 'Ruby-Deserialization-Scanner/1.0 (Security Testing)' | |
}) | |
# Safe test payloads - these are benign and only test for vulnerability patterns | |
self.test_payloads = self._generate_safe_payloads() | |
def _generate_safe_payloads(self) -> Dict[str, Dict]: | |
"""Generate safe test payloads that won't cause harm but can detect vulnerabilities""" | |
# Ruby Marshal binary header signature | |
marshal_header = b'\x04\x08' | |
# Safe Ruby object serialization patterns for testing | |
safe_payloads = { | |
'marshal_string_test': { | |
'description': 'Safe Ruby Marshal string test', | |
'payload': marshal_header + b'I"\x0bhello\x06:\x06ET', # Just serializes "hello" | |
'content_type': 'application/octet-stream', | |
'encoding': 'binary' | |
}, | |
'marshal_array_test': { | |
'description': 'Safe Ruby Marshal array test', | |
'payload': marshal_header + b'[\x06I"\x09test\x06:\x06ET', # Just serializes ["test"] | |
'content_type': 'application/octet-stream', | |
'encoding': 'binary' | |
}, | |
'yaml_object_test': { | |
'description': 'Safe YAML Ruby object test', | |
'payload': '--- !ruby/object:TestClass\nname: safe_test\n', | |
'content_type': 'application/x-yaml', | |
'encoding': 'text' | |
}, | |
'base64_marshal_test': { | |
'description': 'Base64 encoded safe Marshal test', | |
'payload': base64.b64encode(marshal_header + b'I"\x0bsafe\x06:\x06ET').decode(), | |
'content_type': 'text/plain', | |
'encoding': 'base64' | |
} | |
} | |
return safe_payloads | |
def _log(self, message: str, level: str = "INFO"): | |
"""Safe logging function""" | |
if self.verbose or level in ["ERROR", "WARNING"]: | |
timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
print(f"[{timestamp}] [{level}] {message}") | |
def check_target_accessibility(self) -> bool: | |
"""Check if target is accessible and responding""" | |
try: | |
response = self.session.get(self.target_url, timeout=10) | |
self._log(f"Target accessible - Status: {response.status_code}") | |
return True | |
except requests.exceptions.RequestException as e: | |
self._log(f"Target not accessible: {e}", "ERROR") | |
return False | |
def analyze_response_headers(self, response: requests.Response) -> List[str]: | |
"""Analyze response headers for Ruby serialization indicators""" | |
indicators = [] | |
# Patterns that might indicate Ruby serialization | |
suspicious_patterns = { | |
'marshal_binary': rb'\x04\x08', | |
'ruby_yaml': rb'!ruby/', | |
'marshal_class': rb'C:', | |
'marshal_object': rb'o:', | |
'ruby_struct': rb'S:' | |
} | |
# Check all headers for suspicious content | |
for header_name, header_value in response.headers.items(): | |
header_bytes = header_value.encode('utf-8', errors='ignore') | |
for pattern_name, pattern in suspicious_patterns.items(): | |
if pattern in header_bytes: | |
indicators.append(f"Header '{header_name}' contains {pattern_name} pattern") | |
# Check response body for patterns (first 1KB only for safety) | |
body_sample = response.content[:1024] | |
for pattern_name, pattern in suspicious_patterns.items(): | |
if pattern in body_sample: | |
indicators.append(f"Response body contains {pattern_name} pattern") | |
return indicators | |
def test_parameter_injection(self, endpoint: str) -> Dict: | |
"""Test various parameters for deserialization vulnerabilities""" | |
results = { | |
'endpoint': endpoint, | |
'vulnerable_parameters': [], | |
'response_indicators': [], | |
'error_patterns': [] | |
} | |
# Common parameter names that might accept serialized data | |
test_params = ['data', 'object', 'payload', 'session', 'user', 'config'] | |
for param_name in test_params: | |
for payload_name, payload_info in self.test_payloads.items(): | |
try: | |
# Test GET parameters | |
self._test_get_parameter(endpoint, param_name, payload_info, results) | |
# Test POST parameters | |
self._test_post_parameter(endpoint, param_name, payload_info, results) | |
# Small delay to be respectful | |
time.sleep(0.5) | |
except Exception as e: | |
self._log(f"Error testing {param_name} with {payload_name}: {e}", "ERROR") | |
return results | |
def _test_get_parameter(self, endpoint: str, param_name: str, payload_info: Dict, results: Dict): | |
"""Test GET parameter injection""" | |
payload = payload_info['payload'] | |
if payload_info['encoding'] == 'binary': | |
# URL encode binary data | |
payload = base64.b64encode(payload).decode() | |
params = {param_name: payload} | |
try: | |
response = self.session.get(endpoint, params=params, timeout=10) | |
self._analyze_response(response, f"GET {param_name}", payload_info, results) | |
except requests.exceptions.RequestException as e: | |
self._log(f"GET request failed for {param_name}: {e}", "WARNING") | |
def _test_post_parameter(self, endpoint: str, param_name: str, payload_info: Dict, results: Dict): | |
"""Test POST parameter injection""" | |
payload = payload_info['payload'] | |
# Test form data | |
try: | |
if payload_info['encoding'] == 'binary': | |
data = {param_name: base64.b64encode(payload).decode()} | |
else: | |
data = {param_name: payload} | |
response = self.session.post(endpoint, data=data, timeout=10) | |
self._analyze_response(response, f"POST form {param_name}", payload_info, results) | |
except requests.exceptions.RequestException: | |
pass | |
# Test JSON payload | |
try: | |
if payload_info['encoding'] == 'binary': | |
json_data = {param_name: base64.b64encode(payload).decode()} | |
else: | |
json_data = {param_name: payload} | |
response = self.session.post( | |
endpoint, | |
json=json_data, | |
headers={'Content-Type': 'application/json'}, | |
timeout=10 | |
) | |
self._analyze_response(response, f"POST JSON {param_name}", payload_info, results) | |
except requests.exceptions.RequestException: | |
pass | |
def _analyze_response(self, response: requests.Response, test_type: str, payload_info: Dict, results: Dict): | |
"""Analyze response for vulnerability indicators""" | |
# Check for serialization indicators in headers/body | |
indicators = self.analyze_response_headers(response) | |
if indicators: | |
results['response_indicators'].extend([ | |
f"{test_type}: {indicator}" for indicator in indicators | |
]) | |
# Check for error patterns that might indicate deserialization attempts | |
error_patterns = [ | |
'Marshal', 'unmarshal', 'deserialization', | |
'YAML', 'ruby/object', 'undefined class', | |
'ArgumentError', 'NoMethodError' | |
] | |
response_text = response.text.lower() | |
for pattern in error_patterns: | |
if pattern.lower() in response_text: | |
results['error_patterns'].append(f"{test_type}: Found '{pattern}' in response") | |
# Log unusual status codes | |
if response.status_code >= 500: | |
self._log(f"{test_type}: Server error {response.status_code} - possible processing issue", "WARNING") | |
def test_common_endpoints(self) -> List[Dict]: | |
"""Test common endpoints that might accept serialized data""" | |
base_url = self.target_url.rstrip('/') | |
# Common endpoints that might process serialized data | |
test_endpoints = [ | |
'/', | |
'/api/v1/data', | |
'/api/data', | |
'/session', | |
'/login', | |
'/user', | |
'/config', | |
'/upload', | |
'/import', | |
'/export', | |
'/webhook' | |
] | |
results = [] | |
for endpoint_path in test_endpoints: | |
endpoint = urljoin(base_url, endpoint_path) | |
self._log(f"Testing endpoint: {endpoint}") | |
result = self.test_parameter_injection(endpoint) | |
if (result['vulnerable_parameters'] or | |
result['response_indicators'] or | |
result['error_patterns']): | |
results.append(result) | |
return results | |
def generate_curl_commands(self, findings: List[Dict]) -> List[str]: | |
"""Generate safe curl commands for manual testing""" | |
curl_commands = [] | |
for finding in findings: | |
endpoint = finding['endpoint'] | |
# Generate basic reconnaissance curl command | |
curl_commands.append(f"# Test endpoint: {endpoint}") | |
curl_commands.append(f"curl -v '{endpoint}'") | |
curl_commands.append("") | |
# Generate parameter testing commands | |
for payload_name, payload_info in self.test_payloads.items(): | |
payload = payload_info['payload'] | |
if payload_info['encoding'] == 'binary': | |
payload_b64 = base64.b64encode(payload).decode() | |
curl_commands.append(f"# Test {payload_name} (base64 encoded)") | |
curl_commands.append(f"curl -X POST '{endpoint}' -d 'data={payload_b64}'") | |
else: | |
curl_commands.append(f"# Test {payload_name}") | |
curl_commands.append(f"curl -X POST '{endpoint}' -d 'data={payload}'") | |
curl_commands.append("") | |
return curl_commands | |
def run_comprehensive_scan(self) -> Dict: | |
"""Run comprehensive Ruby deserialization vulnerability scan""" | |
self._log("Starting Ruby deserialization security scan") | |
self._log(f"Target: {self.target_url}") | |
if not self.check_target_accessibility(): | |
return {'error': 'Target not accessible'} | |
# Test common endpoints | |
findings = self.test_common_endpoints() | |
# Generate curl commands for manual testing | |
curl_commands = self.generate_curl_commands(findings) | |
report = { | |
'target': self.target_url, | |
'scan_timestamp': time.strftime("%Y-%m-%d %H:%M:%S"), | |
'findings': findings, | |
'curl_commands': curl_commands, | |
'recommendations': self._generate_recommendations(findings) | |
} | |
self._log(f"Scan completed. Found {len(findings)} potentially interesting endpoints.") | |
return report | |
def _generate_recommendations(self, findings: List[Dict]) -> List[str]: | |
"""Generate security recommendations based on findings""" | |
recommendations = [ | |
"IMMEDIATE ACTIONS:", | |
"1. Review all identified endpoints for deserialization vulnerabilities", | |
"2. Audit Ruby code for unsafe Marshal.load() and YAML.load() usage", | |
"3. Implement input validation and sanitization", | |
"4. Update Ruby and gems to latest secure versions", | |
"", | |
"SECURE CODING PRACTICES:", | |
"1. Use JSON instead of Ruby serialization when possible", | |
"2. Implement allowlists for deserialization classes", | |
"3. Add monitoring for suspicious deserialization attempts", | |
"4. Regular security code reviews and testing" | |
] | |
if findings: | |
recommendations.insert(1, f"⚠️ FOUND {len(findings)} endpoints with potential issues") | |
return recommendations | |
def main(): | |
parser = argparse.ArgumentParser( | |
description='Ruby Deserialization Security Scanner - For authorized testing only' | |
) | |
parser.add_argument('url', help='Target URL to scan') | |
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') | |
parser.add_argument('-o', '--output', help='Output file for results (JSON)') | |
parser.add_argument('--curl-only', action='store_true', help='Generate curl commands only') | |
args = parser.parse_args() | |
# Validate URL | |
parsed_url = urlparse(args.url) | |
if not parsed_url.scheme or not parsed_url.netloc: | |
print("Error: Invalid URL provided", file=sys.stderr) | |
sys.exit(1) | |
print("=" * 60) | |
print("Ruby Deserialization Security Scanner") | |
print("=" * 60) | |
print("⚠️ WARNING: For authorized security testing only!") | |
print("⚠️ Only use on systems you own or have permission to test!") | |
print("=" * 60) | |
scanner = RubyDeserializationScanner(args.url, verbose=args.verbose) | |
if args.curl_only: | |
# Generate curl commands for manual testing | |
curl_commands = [] | |
for payload_name, payload_info in scanner.test_payloads.items(): | |
payload = payload_info['payload'] | |
if payload_info['encoding'] == 'binary': | |
payload = base64.b64encode(payload).decode() | |
curl_commands.append(f"# {payload_info['description']}") | |
curl_commands.append(f"curl -X POST '{args.url}' \\") | |
curl_commands.append(f" -H 'Content-Type: {payload_info['content_type']}' \\") | |
curl_commands.append(f" -d 'data={payload}'") | |
curl_commands.append("") | |
print("\n".join(curl_commands)) | |
else: | |
# Run full scan | |
report = scanner.run_comprehensive_scan() | |
if args.output: | |
with open(args.output, 'w') as f: | |
json.dump(report, f, indent=2) | |
print(f"\nResults saved to: {args.output}") | |
else: | |
# Print results to console | |
print("\n" + "=" * 60) | |
print("SCAN RESULTS") | |
print("=" * 60) | |
if 'error' in report: | |
print(f"Error: {report['error']}") | |
else: | |
print(f"Target: {report['target']}") | |
print(f"Scan Time: {report['scan_timestamp']}") | |
print(f"Findings: {len(report['findings'])}") | |
if report['findings']: | |
print("\nPOTENTIAL ISSUES FOUND:") | |
for i, finding in enumerate(report['findings'], 1): | |
print(f"\n{i}. Endpoint: {finding['endpoint']}") | |
if finding['response_indicators']: | |
print(" Response indicators:") | |
for indicator in finding['response_indicators']: | |
print(f" - {indicator}") | |
if finding['error_patterns']: | |
print(" Error patterns:") | |
for error in finding['error_patterns']: | |
print(f" - {error}") | |
print(f"\nRECOMMENDATIONS:") | |
for rec in report['recommendations']: | |
print(rec) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment