Created
March 13, 2021 07:13
-
-
Save lotabout/6c8bdadc485a2445cc613849135cb898 to your computer and use it in GitHub Desktop.
Simple traceroute implementation in Python3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import struct | |
import socket | |
import time | |
# Must be run with root privileges because a RAW socket is used.
# References:
# - https://dnaeon.github.io/traceroute-in-python/
# - https://github.com/openbsd/src/blob/master/usr.sbin/traceroute/traceroute.c
def create_sender(ttl):
    """Build a UDP socket whose outgoing packets carry the given TTL.

    The TTL is applied through a socket option, so the IP header never has
    to be assembled by hand.
    """
    udp_sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    udp_sock.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl)
    return udp_sock
def create_receiver(timeout=1):
    """Open a raw ICMP socket with `timeout` applied as its socket timeout."""
    icmp_proto = socket.getprotobyname('icmp')
    raw_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_proto)
    raw_sock.settimeout(timeout)
    return raw_sock
def is_ttl(icmp_packet):
    """Return True if the packet is an ICMP "Time Exceeded" message (type 11).

    `icmp_packet` is the raw datagram starting with its IPv4 header. The
    header length is read from the IHL field instead of being assumed to be
    20 bytes, so packets whose IP header carries options are handled too.
    """
    # Low nibble of the first byte = IP header length in 32-bit words.
    ip_header_len = (icmp_packet[0] & 0x0F) * 4
    # The first byte after the IP header is the ICMP type.
    return icmp_packet[ip_header_len] == 11
def is_destination_unreacheable(icmp_packet):
    """Return True if the packet is an ICMP "Destination Unreachable" (type 3).

    `icmp_packet` is the raw datagram starting with its IPv4 header. The
    header length is read from the IHL field instead of being assumed to be
    20 bytes, so packets whose IP header carries options are handled too.
    """
    # Low nibble of the first byte = IP header length in 32-bit words.
    ip_header_len = (icmp_packet[0] & 0x0F) * 4
    # The first byte after the IP header is the ICMP type.
    return icmp_packet[ip_header_len] == 3
def extract_port(icmp_packet):
    """Return the UDP destination port of the probe quoted in an ICMP error.

    Layout of `icmp_packet`: outer IP header, 8-byte ICMP header, then the
    quoted original IP header followed by the start of the original UDP
    header (source port, then destination port). Both IP header lengths are
    taken from their IHL fields rather than assumed to be 20 bytes.

    Raises:
        struct.error: if the packet is too short to contain the port.
    """
    # unpack_from is used for every read so a truncated packet consistently
    # raises struct.error.
    outer_ip_len = (struct.unpack_from('!B', icmp_packet, 0)[0] & 0x0F) * 4
    quoted_ip_start = outer_ip_len + 8  # skip the 8-byte ICMP header
    quoted_ip_len = (struct.unpack_from('!B', icmp_packet, quoted_ip_start)[0] & 0x0F) * 4
    # Skip the 2-byte UDP source port to land on the destination port.
    dest_port_offset = quoted_ip_start + quoted_ip_len + 2
    return struct.unpack_from('!H', icmp_packet, dest_port_offset)[0]
def udp_ports():
    """Yield ports 33434 through 33534 in order, wrapping around forever."""
    first_port = 33434
    port_count = 101  # 33434..33534 inclusive
    offset = 0
    while True:
        yield first_port + offset
        offset = (offset + 1) % port_count
def wait_for_reply(icmp_sock, expected_port, timeout=1):
    """Wait for the ICMP reply to one probe and return the replier's IP address.

    Packets that are neither "Time Exceeded" nor "Destination Unreachable",
    or that quote a different UDP port than `expected_port`, are skipped.

    Args:
        icmp_sock: socket to read ICMP packets from.
        expected_port: UDP destination port the probe was sent to.
        timeout: overall time budget in seconds for the whole wait.

    Raises:
        socket.timeout: if no matching reply arrives within `timeout`.
    """
    # A monotonic clock keeps the deadline immune to system clock changes.
    deadline = time.monotonic() + timeout
    previous_timeout = icmp_sock.gettimeout()
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise socket.timeout
            # Never block past the deadline, but also never longer than the
            # timeout the caller had already configured on the socket.
            if previous_timeout is None:
                icmp_sock.settimeout(remaining)
            else:
                icmp_sock.settimeout(min(remaining, previous_timeout))
            rec_packet, addr = icmp_sock.recvfrom(1024)
            if not (is_ttl(rec_packet) or is_destination_unreacheable(rec_packet)):
                continue
            if extract_port(rec_packet) != expected_port:
                continue
            return addr[0]
    finally:
        # Leave the socket configured the way the caller handed it over.
        icmp_sock.settimeout(previous_timeout)
def trace_once(ip_addr, ttl, ports=udp_ports()):
    """Send one UDP probe with the given TTL and time the ICMP reply.

    Args:
        ip_addr: destination IPv4 address.
        ttl: TTL to set on the probe.
        ports: iterator supplying the destination port of each probe. The
            default generator is created once at definition time and is
            therefore shared by every call that does not pass its own.

    Returns:
        A tuple (replier address, round-trip time in seconds).

    Raises:
        socket.timeout: propagated from wait_for_reply when no matching
            reply arrives in time.
    """
    # `with` closes both sockets on every path, including the case where
    # creating the raw receiver fails after the sender was already opened.
    with create_sender(ttl) as sender, create_receiver() as receiver:
        next_port = next(ports)
        # perf_counter is monotonic, so the delay cannot be skewed by
        # system clock adjustments.
        start_time = time.perf_counter()
        sender.sendto(b'', (ip_addr, next_port))
        addr = wait_for_reply(receiver, next_port)
        return (addr, time.perf_counter() - start_time)
def format_delay(delay):
    """Format a delay given in seconds as milliseconds, or '*' for None."""
    # Identity check: `== None` would invoke a custom __eq__ if one exists.
    if delay is None:
        return '*'
    return f'{delay * 1000:.3f}ms'
def trace(ip_addr, ttl, num_req=3):
    """Probe one hop `num_req` times and collect (address, delay text) pairs.

    A probe that times out is recorded as ('*', '*').
    """
    results = []
    for _ in range(num_req):
        try:
            hop_addr, rtt = trace_once(ip_addr, ttl)
        except socket.timeout:
            entry = ('*', format_delay(None))
        else:
            entry = (hop_addr, format_delay(rtt))
        results.append(entry)
    return results
from collections import defaultdict | |
def print_traces(hop, traces):
    """Print the probe results of one hop, one line per distinct address.

    Args:
        hop: hop number, shown on the first line only.
        traces: iterable of (address, formatted delay) pairs.
    """
    # Group delays by responding address, keeping first-seen order.
    merged = defaultdict(list)
    for addr, delay in traces:
        merged[addr].append(delay)
    for idx, (addr, delays) in enumerate(merged.items()):
        # Continuation lines get a blank prefix as wide as the 2-column hop
        # number so the address column stays aligned.
        prefix = f'{hop:2}' if idx == 0 else ' ' * 2
        joined_delays = ' '.join(delays)
        print(f'{prefix} {addr:15} {joined_delays}')
if __name__ == '__main__':
    # Command-line entry point: trace the route to the given host, probing
    # hops 1 through 63 and stopping once the destination answers.
    import argparse

    parser = argparse.ArgumentParser(description='traceroute')
    parser.add_argument('host', help='host')
    args = parser.parse_args()
    ip_addr = socket.gethostbyname(args.host)
    print(f"traceroute to {args.host}({ip_addr})")
    for hop in range(1, 64):
        traces = trace(ip_addr, hop)
        print_traces(hop, traces)
        # Stop when ANY probe was answered by the destination; checking only
        # the first probe would keep going if that one probe timed out.
        if any(addr == ip_addr for addr, _delay in traces):
            break
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment