# Source: GitHub gist alonsoir/a04183fcab22a79518c9b39882b20632
# Author: @alonsoir
# Created: January 28, 2025 12:44
import multiprocessing
import ipaddress
import subprocess
from scapy.all import sniff, IP
# CIDR ranges this script treats as assigned to China.
# NOTE(review): these are whole /7 and /8 blocks as listed by the author —
# verify against a current allocation list before relying on them.
china_ip_ranges = [
    "36.0.0.0/8",
    "42.0.0.0/8",
    "58.0.0.0/7",
    "101.0.0.0/8",
    "110.0.0.0/7",
    "120.0.0.0/8",
    "202.0.0.0/7",
]

# Parse every CIDR string once at import time so that lookups reuse the
# resulting network objects.
china_networks = list(map(ipaddress.IPv4Network, china_ip_ranges))
def is_china_ip(ip):
    """Return True when *ip* lies inside any network in ``china_networks``.

    Args:
        ip: An IPv4 address in any form accepted by
            ``ipaddress.IPv4Address`` (e.g. a dotted-quad string).

    Returns:
        bool: True if at least one listed network contains the address,
        otherwise False.

    Raises:
        ipaddress.AddressValueError: Propagated from ``IPv4Address`` when
            *ip* is not a valid IPv4 address.
    """
    candidate = ipaddress.IPv4Address(ip)
    return any(candidate in net for net in china_networks)
def packet_handler(pkt):
    """Report a captured packet whose IP destination is a China-listed address.

    Packets without an IP layer are ignored. Only the destination address
    is checked; the source address is included in the printed message.

    Args:
        pkt: The packet object handed to this callback by the sniffer.
    """
    if IP not in pkt:
        return

    ip_layer = pkt[IP]
    ip_src, ip_dst = ip_layer.src, ip_layer.dst
    if is_china_ip(ip_dst):
        print(f"Tráfico detectado hacia China: {ip_src} -> {ip_dst}")
def run_sniffer():
    """Announce the capture and sniff with the "ip" filter for 60 seconds.

    Each captured packet is passed to ``packet_handler``; packets are not
    retained by the sniffer (``store=0``).
    """
    print("Iniciando captura de paquetes...")
    capture_options = {
        "prn": packet_handler,
        "store": 0,
        "filter": "ip",
        "timeout": 60,  # capture window in seconds
    }
    sniff(**capture_options)
def parallel_sniff(num_processes=4):
    """Run ``run_sniffer`` in separate processes and wait for all of them.

    Args:
        num_processes: Number of processes to launch (default 4).
    """
    # NOTE(review): every worker runs the same capture, so presumably each
    # one reports the same packets — confirm the duplication is intended.
    workers = [
        multiprocessing.Process(target=run_sniffer)
        for _ in range(num_processes)
    ]

    for worker in workers:
        worker.start()

    # Block until every worker has finished its capture.
    for worker in workers:
        worker.join()
# Script entry point: start the capture in four parallel processes.
if __name__ == "__main__":
    parallel_sniff(4)
# End of gist (GitHub page footer: sign-up / sign-in prompt for commenting).