Created
April 5, 2025 11:37
-
-
Save ioxorg/4a83b17b013971e38cf00d95498378f3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import statistics
import threading
import time
from datetime import datetime

from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
from rich.table import Table
from scapy.all import ICMP, IP, sr1
# Shared rich console used for the terminal output of this script.
console = Console()

# Resolvers to probe. Each entry carries a display name and the provider's
# primary and secondary IPv4 addresses.
DNS_PROVIDERS = [
    {"name": "Google", "primary": "8.8.8.8", "secondary": "8.8.4.4"},
    {"name": "Cloudflare", "primary": "1.1.1.1", "secondary": "1.0.0.1"},
    {"name": "Quad9", "primary": "9.9.9.9", "secondary": "149.112.112.112"},
    {"name": "OpenDNS", "primary": "208.67.222.222", "secondary": "208.67.220.220"},
    {"name": "Shelter", "primary": "78.157.59.99", "secondary": "78.157.59.69"},
    {"name": "Radar", "primary": "10.202.10.10", "secondary": "10.202.10.11"},
    {"name": "Shecan", "primary": "185.51.200.2", "secondary": "178.22.122.100"},
]
def icmp_ping(ip_address, count=5, timeout=2, interval=0.2):
    """Send ICMP echo requests to ``ip_address`` and summarise the replies.

    Args:
        ip_address: Destination passed to scapy's ``IP(dst=...)``.
        count: Number of echo requests to send.
        timeout: Seconds to wait for each reply.
        interval: Seconds to pause between two consecutive probes.

    Returns:
        A dict with the keys ``packets_sent``, ``packets_received``, ``rtts``
        (list of round-trip times in ms), ``errors`` (one message per failed
        probe), ``min_ms``, ``max_ms``, ``avg_ms``, ``mdev_ms`` (sample
        standard deviation, 0 for a single reply; all four are ``None`` when
        nothing was received) and ``packet_loss`` (percentage as a float).

    Note:
        The round-trip time is measured around the ``sr1`` call, so it also
        includes scapy's own send/receive overhead.
    """
    rtts = []
    errors = []

    for seq in range(count):
        try:
            # A distinct sequence number per probe keeps a late reply to an
            # earlier probe from being mistaken for the answer to this one.
            request = IP(dst=ip_address) / ICMP(seq=seq & 0xFFFF)

            # perf_counter is monotonic; time.time() can jump when the system
            # clock is adjusted and would then yield bogus round-trip times.
            started = time.perf_counter()
            reply = sr1(request, timeout=timeout, verbose=0)
            elapsed_ms = (time.perf_counter() - started) * 1000

            if reply is None:
                errors.append("No response")
            elif reply.haslayer(ICMP) and reply[ICMP].type == 0:
                # Only a genuine echo reply (ICMP type 0) counts as success.
                rtts.append(elapsed_ms)
            else:
                # sr1 may also hand back ICMP error messages (for example
                # "destination unreachable"); those are failures, not pongs.
                errors.append(f"Unexpected reply: {reply.summary()}")
        except Exception as e:
            # Best effort: record the failure and keep probing.
            errors.append(str(e))

        # Pace the probes, but do not wait after the final one.
        if seq < count - 1:
            time.sleep(interval)

    results = {
        'packets_sent': count,
        'packets_received': len(rtts),
        'rtts': rtts,
        'errors': errors,
    }

    if rtts:
        results['min_ms'] = min(rtts)
        results['max_ms'] = max(rtts)
        results['avg_ms'] = statistics.mean(rtts)
        # stdev needs at least two samples.
        results['mdev_ms'] = statistics.stdev(rtts) if len(rtts) > 1 else 0
    else:
        results['min_ms'] = None
        results['max_ms'] = None
        results['avg_ms'] = None
        results['mdev_ms'] = None

    if count > 0:
        results['packet_loss'] = round(100 - (len(rtts) / count * 100), 1)
    else:
        # Nothing was sent, so report total loss instead of dividing by zero.
        results['packet_loss'] = 100.0

    return results
def ping_dns_server(dns_ip, count=5):
    """Ping one DNS server and return a flat, display-ready result dict.

    Args:
        dns_ip: IP address of the server to ping.
        count: Number of echo requests to send.

    Returns:
        A dict with ``dns_ip``, ``timestamp`` (local time,
        ``YYYY-MM-DD HH:MM:SS``), ``successful``, ``packets_sent``,
        ``packets_received``, ``packet_loss``, ``min_ms``, ``avg_ms``,
        ``max_ms`` and ``mdev_ms``. When no reply was received and a reason
        is known, an ``error`` string is included as well.
    """
    try:
        ping_results = icmp_ping(dns_ip, count=count)
    except Exception as e:
        # Whatever goes wrong, still report a well-formed failed result so
        # callers can treat every server uniformly.
        return {
            'dns_ip': dns_ip,
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'successful': False,
            'error': str(e),
            'packets_sent': count,
            'packets_received': 0,
            'packet_loss': 100.0,
            'min_ms': None,
            'avg_ms': None,
            'max_ms': None,
            'mdev_ms': None
        }

    stats = {
        'dns_ip': dns_ip,
        'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'successful': ping_results['packets_received'] > 0,
        'packets_sent': ping_results['packets_sent'],
        'packets_received': ping_results['packets_received'],
        'packet_loss': ping_results['packet_loss'],
        'min_ms': ping_results['min_ms'],
        'avg_ms': ping_results['avg_ms'],
        'max_ms': ping_results['max_ms'],
        'mdev_ms': ping_results['mdev_ms'],
    }

    # Surface why a server failed (e.g. missing privileges) instead of
    # dropping the per-probe messages; dict.fromkeys de-duplicates them
    # while keeping their order.
    probe_errors = ping_results.get('errors')
    if not stats['successful'] and probe_errors:
        stats['error'] = "; ".join(dict.fromkeys(probe_errors))

    return stats
def _format_ms(value):
    """Render a millisecond value with two decimals, or "N/A" when missing."""
    return "N/A" if value is None else f"{value:.2f}"


def _latency_markup(avg_ms):
    """Colour the average latency: green below 20 ms, yellow below 50 ms, else red."""
    text = _format_ms(avg_ms)
    if avg_ms is None:
        return text
    if avg_ms < 20:
        color = "green"
    elif avg_ms < 50:
        color = "yellow"
    else:
        color = "red"
    return f"[{color}]{text}[/{color}]"


def _loss_markup(packet_loss):
    """Colour the packet loss: green at 0 %, yellow below 20 %, else red."""
    if packet_loss is None:
        return "N/A"
    if packet_loss == 0:
        color = "green"
    elif packet_loss < 20:
        color = "yellow"
    else:
        color = "red"
    return f"[{color}]{packet_loss}%[/{color}]"


def _run_ping_tests():
    """Ping the primary and secondary address of every provider behind a progress bar."""
    all_results = []
    with Progress(
        TextColumn("[bold blue]{task.description}[/bold blue]"),
        BarColumn(),
        TextColumn("[bold green]{task.completed}/{task.total}[/bold green]"),
        TimeElapsedColumn(),
        console=console
    ) as progress:
        task = progress.add_task("Testing DNS Servers...", total=len(DNS_PROVIDERS) * 2)
        for provider in DNS_PROVIDERS:
            for key, label in (("primary", "Primary"), ("secondary", "Secondary")):
                result = ping_dns_server(provider[key])
                result['provider'] = provider['name']
                result['type'] = label
                all_results.append(result)
                progress.update(task, advance=1)
    return all_results


def _build_results_table(all_results):
    """Build the rich table with one row per tested address."""
    table = Table(title="DNS Server Ping Results")
    table.add_column("Provider", style="cyan")
    table.add_column("Type", style="magenta")
    table.add_column("IP Address", style="blue")
    table.add_column("Status", style="bold")
    table.add_column("Loss %", justify="right")
    table.add_column("Min (ms)", justify="right")
    table.add_column("Avg (ms)", justify="right")
    table.add_column("Max (ms)", justify="right")

    for result in all_results:
        if result.get('successful', False):
            status_text = "[bold green]✓[/bold green]"
        else:
            status_text = "[bold red]✗[/bold red]"
        table.add_row(
            result['provider'],
            result['type'],
            result['dns_ip'],
            status_text,
            _loss_markup(result.get('packet_loss')),
            _format_ms(result.get('min_ms')),
            _latency_markup(result.get('avg_ms')),
            _format_ms(result.get('max_ms')),
        )
    return table


def _print_errors(all_results):
    """List the failure reason of every result that carries an 'error' entry."""
    failures = [r for r in all_results if r.get('error')]
    if not failures:
        return
    console.print("\n[bold]Errors:[/bold]")
    for r in failures:
        # Error messages such as "[Errno 1] ..." contain square brackets that
        # rich would otherwise try to interpret as markup tags.
        reason = escape(str(r['error']))
        console.print(f" [red]{r['provider']} ({r['type']}) - {r['dns_ip']}:[/red] {reason}")


def _print_summary(all_results):
    """Print the fastest, slowest and overall average of the successful results."""
    console.print("\n[bold]Performance Summary:[/bold]")
    successful_results = [
        r for r in all_results
        if r.get('successful', False) and r.get('avg_ms') is not None
    ]
    if not successful_results:
        console.print(" [yellow]No successful pings to calculate statistics[/yellow]")
        return

    avg_times = [r['avg_ms'] for r in successful_results]
    fastest = min(successful_results, key=lambda x: x['avg_ms'])
    slowest = max(successful_results, key=lambda x: x['avg_ms'])
    console.print(f" [green]Fastest:[/green] {fastest['provider']} ({fastest['type']}) - {fastest['dns_ip']} - {fastest['avg_ms']:.2f}ms")
    console.print(f" [red]Slowest:[/red] {slowest['provider']} ({slowest['type']}) - {slowest['dns_ip']} - {slowest['avg_ms']:.2f}ms")
    console.print(f" [blue]Overall Average:[/blue] {statistics.mean(avg_times):.2f}ms")


def main():
    """Ping every configured DNS server, then print a results table and a summary."""
    console.print(Panel.fit("[bold blue]DNS Ping Test using Scapy[/bold blue]", border_style="blue"))
    console.print("[yellow]Note: This script requires root/sudo privileges to run with Scapy[/yellow]\n")

    all_results = _run_ping_tests()
    console.print(_build_results_table(all_results))
    # Show failure reasons; without this a missing-privileges error looks
    # exactly like 100 % packet loss.
    _print_errors(all_results)
    _print_summary(all_results)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment