Skip to content

Instantly share code, notes, and snippets.

@random-robbie
Created June 21, 2025 08:49
Show Gist options
  • Save random-robbie/11906b6ae9153c28e22e2bb80876f0f8 to your computer and use it in GitHub Desktop.
Save random-robbie/11906b6ae9153c28e22e2bb80876f0f8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Ruby Deserialization Security Scanner
=====================================
A safe security testing tool for identifying Ruby deserialization vulnerabilities.
DISCLAIMER: This tool is for authorized security testing only.
- Only use on systems you own or have explicit permission to test
- Do not use for malicious purposes
- Follow responsible disclosure practices
Author: Security Research Tool
License: Educational Use Only
"""
import requests
import base64
import json
import argparse
import sys
import time
import re
from urllib.parse import urlparse, urljoin
from typing import List, Dict, Tuple, Optional
class RubyDeserializationScanner:
def __init__(self, target_url: str, verbose: bool = False):
self.target_url = target_url
self.verbose = verbose
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Ruby-Deserialization-Scanner/1.0 (Security Testing)'
})
# Safe test payloads - these are benign and only test for vulnerability patterns
self.test_payloads = self._generate_safe_payloads()
def _generate_safe_payloads(self) -> Dict[str, Dict]:
"""Generate safe test payloads that won't cause harm but can detect vulnerabilities"""
# Ruby Marshal binary header signature
marshal_header = b'\x04\x08'
# Safe Ruby object serialization patterns for testing
safe_payloads = {
'marshal_string_test': {
'description': 'Safe Ruby Marshal string test',
'payload': marshal_header + b'I"\x0bhello\x06:\x06ET', # Just serializes "hello"
'content_type': 'application/octet-stream',
'encoding': 'binary'
},
'marshal_array_test': {
'description': 'Safe Ruby Marshal array test',
'payload': marshal_header + b'[\x06I"\x09test\x06:\x06ET', # Just serializes ["test"]
'content_type': 'application/octet-stream',
'encoding': 'binary'
},
'yaml_object_test': {
'description': 'Safe YAML Ruby object test',
'payload': '--- !ruby/object:TestClass\nname: safe_test\n',
'content_type': 'application/x-yaml',
'encoding': 'text'
},
'base64_marshal_test': {
'description': 'Base64 encoded safe Marshal test',
'payload': base64.b64encode(marshal_header + b'I"\x0bsafe\x06:\x06ET').decode(),
'content_type': 'text/plain',
'encoding': 'base64'
}
}
return safe_payloads
def _log(self, message: str, level: str = "INFO"):
"""Safe logging function"""
if self.verbose or level in ["ERROR", "WARNING"]:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] [{level}] {message}")
def check_target_accessibility(self) -> bool:
"""Check if target is accessible and responding"""
try:
response = self.session.get(self.target_url, timeout=10)
self._log(f"Target accessible - Status: {response.status_code}")
return True
except requests.exceptions.RequestException as e:
self._log(f"Target not accessible: {e}", "ERROR")
return False
def analyze_response_headers(self, response: requests.Response) -> List[str]:
"""Analyze response headers for Ruby serialization indicators"""
indicators = []
# Patterns that might indicate Ruby serialization
suspicious_patterns = {
'marshal_binary': rb'\x04\x08',
'ruby_yaml': rb'!ruby/',
'marshal_class': rb'C:',
'marshal_object': rb'o:',
'ruby_struct': rb'S:'
}
# Check all headers for suspicious content
for header_name, header_value in response.headers.items():
header_bytes = header_value.encode('utf-8', errors='ignore')
for pattern_name, pattern in suspicious_patterns.items():
if pattern in header_bytes:
indicators.append(f"Header '{header_name}' contains {pattern_name} pattern")
# Check response body for patterns (first 1KB only for safety)
body_sample = response.content[:1024]
for pattern_name, pattern in suspicious_patterns.items():
if pattern in body_sample:
indicators.append(f"Response body contains {pattern_name} pattern")
return indicators
def test_parameter_injection(self, endpoint: str) -> Dict:
"""Test various parameters for deserialization vulnerabilities"""
results = {
'endpoint': endpoint,
'vulnerable_parameters': [],
'response_indicators': [],
'error_patterns': []
}
# Common parameter names that might accept serialized data
test_params = ['data', 'object', 'payload', 'session', 'user', 'config']
for param_name in test_params:
for payload_name, payload_info in self.test_payloads.items():
try:
# Test GET parameters
self._test_get_parameter(endpoint, param_name, payload_info, results)
# Test POST parameters
self._test_post_parameter(endpoint, param_name, payload_info, results)
# Small delay to be respectful
time.sleep(0.5)
except Exception as e:
self._log(f"Error testing {param_name} with {payload_name}: {e}", "ERROR")
return results
def _test_get_parameter(self, endpoint: str, param_name: str, payload_info: Dict, results: Dict):
"""Test GET parameter injection"""
payload = payload_info['payload']
if payload_info['encoding'] == 'binary':
# URL encode binary data
payload = base64.b64encode(payload).decode()
params = {param_name: payload}
try:
response = self.session.get(endpoint, params=params, timeout=10)
self._analyze_response(response, f"GET {param_name}", payload_info, results)
except requests.exceptions.RequestException as e:
self._log(f"GET request failed for {param_name}: {e}", "WARNING")
def _test_post_parameter(self, endpoint: str, param_name: str, payload_info: Dict, results: Dict):
"""Test POST parameter injection"""
payload = payload_info['payload']
# Test form data
try:
if payload_info['encoding'] == 'binary':
data = {param_name: base64.b64encode(payload).decode()}
else:
data = {param_name: payload}
response = self.session.post(endpoint, data=data, timeout=10)
self._analyze_response(response, f"POST form {param_name}", payload_info, results)
except requests.exceptions.RequestException:
pass
# Test JSON payload
try:
if payload_info['encoding'] == 'binary':
json_data = {param_name: base64.b64encode(payload).decode()}
else:
json_data = {param_name: payload}
response = self.session.post(
endpoint,
json=json_data,
headers={'Content-Type': 'application/json'},
timeout=10
)
self._analyze_response(response, f"POST JSON {param_name}", payload_info, results)
except requests.exceptions.RequestException:
pass
def _analyze_response(self, response: requests.Response, test_type: str, payload_info: Dict, results: Dict):
"""Analyze response for vulnerability indicators"""
# Check for serialization indicators in headers/body
indicators = self.analyze_response_headers(response)
if indicators:
results['response_indicators'].extend([
f"{test_type}: {indicator}" for indicator in indicators
])
# Check for error patterns that might indicate deserialization attempts
error_patterns = [
'Marshal', 'unmarshal', 'deserialization',
'YAML', 'ruby/object', 'undefined class',
'ArgumentError', 'NoMethodError'
]
response_text = response.text.lower()
for pattern in error_patterns:
if pattern.lower() in response_text:
results['error_patterns'].append(f"{test_type}: Found '{pattern}' in response")
# Log unusual status codes
if response.status_code >= 500:
self._log(f"{test_type}: Server error {response.status_code} - possible processing issue", "WARNING")
def test_common_endpoints(self) -> List[Dict]:
"""Test common endpoints that might accept serialized data"""
base_url = self.target_url.rstrip('/')
# Common endpoints that might process serialized data
test_endpoints = [
'/',
'/api/v1/data',
'/api/data',
'/session',
'/login',
'/user',
'/config',
'/upload',
'/import',
'/export',
'/webhook'
]
results = []
for endpoint_path in test_endpoints:
endpoint = urljoin(base_url, endpoint_path)
self._log(f"Testing endpoint: {endpoint}")
result = self.test_parameter_injection(endpoint)
if (result['vulnerable_parameters'] or
result['response_indicators'] or
result['error_patterns']):
results.append(result)
return results
def generate_curl_commands(self, findings: List[Dict]) -> List[str]:
"""Generate safe curl commands for manual testing"""
curl_commands = []
for finding in findings:
endpoint = finding['endpoint']
# Generate basic reconnaissance curl command
curl_commands.append(f"# Test endpoint: {endpoint}")
curl_commands.append(f"curl -v '{endpoint}'")
curl_commands.append("")
# Generate parameter testing commands
for payload_name, payload_info in self.test_payloads.items():
payload = payload_info['payload']
if payload_info['encoding'] == 'binary':
payload_b64 = base64.b64encode(payload).decode()
curl_commands.append(f"# Test {payload_name} (base64 encoded)")
curl_commands.append(f"curl -X POST '{endpoint}' -d 'data={payload_b64}'")
else:
curl_commands.append(f"# Test {payload_name}")
curl_commands.append(f"curl -X POST '{endpoint}' -d 'data={payload}'")
curl_commands.append("")
return curl_commands
def run_comprehensive_scan(self) -> Dict:
"""Run comprehensive Ruby deserialization vulnerability scan"""
self._log("Starting Ruby deserialization security scan")
self._log(f"Target: {self.target_url}")
if not self.check_target_accessibility():
return {'error': 'Target not accessible'}
# Test common endpoints
findings = self.test_common_endpoints()
# Generate curl commands for manual testing
curl_commands = self.generate_curl_commands(findings)
report = {
'target': self.target_url,
'scan_timestamp': time.strftime("%Y-%m-%d %H:%M:%S"),
'findings': findings,
'curl_commands': curl_commands,
'recommendations': self._generate_recommendations(findings)
}
self._log(f"Scan completed. Found {len(findings)} potentially interesting endpoints.")
return report
def _generate_recommendations(self, findings: List[Dict]) -> List[str]:
"""Generate security recommendations based on findings"""
recommendations = [
"IMMEDIATE ACTIONS:",
"1. Review all identified endpoints for deserialization vulnerabilities",
"2. Audit Ruby code for unsafe Marshal.load() and YAML.load() usage",
"3. Implement input validation and sanitization",
"4. Update Ruby and gems to latest secure versions",
"",
"SECURE CODING PRACTICES:",
"1. Use JSON instead of Ruby serialization when possible",
"2. Implement allowlists for deserialization classes",
"3. Add monitoring for suspicious deserialization attempts",
"4. Regular security code reviews and testing"
]
if findings:
recommendations.insert(1, f"⚠️ FOUND {len(findings)} endpoints with potential issues")
return recommendations
def main():
parser = argparse.ArgumentParser(
description='Ruby Deserialization Security Scanner - For authorized testing only'
)
parser.add_argument('url', help='Target URL to scan')
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
parser.add_argument('-o', '--output', help='Output file for results (JSON)')
parser.add_argument('--curl-only', action='store_true', help='Generate curl commands only')
args = parser.parse_args()
# Validate URL
parsed_url = urlparse(args.url)
if not parsed_url.scheme or not parsed_url.netloc:
print("Error: Invalid URL provided", file=sys.stderr)
sys.exit(1)
print("=" * 60)
print("Ruby Deserialization Security Scanner")
print("=" * 60)
print("⚠️ WARNING: For authorized security testing only!")
print("⚠️ Only use on systems you own or have permission to test!")
print("=" * 60)
scanner = RubyDeserializationScanner(args.url, verbose=args.verbose)
if args.curl_only:
# Generate curl commands for manual testing
curl_commands = []
for payload_name, payload_info in scanner.test_payloads.items():
payload = payload_info['payload']
if payload_info['encoding'] == 'binary':
payload = base64.b64encode(payload).decode()
curl_commands.append(f"# {payload_info['description']}")
curl_commands.append(f"curl -X POST '{args.url}' \\")
curl_commands.append(f" -H 'Content-Type: {payload_info['content_type']}' \\")
curl_commands.append(f" -d 'data={payload}'")
curl_commands.append("")
print("\n".join(curl_commands))
else:
# Run full scan
report = scanner.run_comprehensive_scan()
if args.output:
with open(args.output, 'w') as f:
json.dump(report, f, indent=2)
print(f"\nResults saved to: {args.output}")
else:
# Print results to console
print("\n" + "=" * 60)
print("SCAN RESULTS")
print("=" * 60)
if 'error' in report:
print(f"Error: {report['error']}")
else:
print(f"Target: {report['target']}")
print(f"Scan Time: {report['scan_timestamp']}")
print(f"Findings: {len(report['findings'])}")
if report['findings']:
print("\nPOTENTIAL ISSUES FOUND:")
for i, finding in enumerate(report['findings'], 1):
print(f"\n{i}. Endpoint: {finding['endpoint']}")
if finding['response_indicators']:
print(" Response indicators:")
for indicator in finding['response_indicators']:
print(f" - {indicator}")
if finding['error_patterns']:
print(" Error patterns:")
for error in finding['error_patterns']:
print(f" - {error}")
print(f"\nRECOMMENDATIONS:")
for rec in report['recommendations']:
print(rec)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment