Last active
December 24, 2023 06:08
-
-
Save lotabout/82186f697b49718b5b33c62878431f03 to your computer and use it in GitHub Desktop.
Simple ping implementation in python3 for practicing TCP/IP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import struct | |
import socket | |
import time | |
def checksum(bytestr):
    """Return the 16-bit Internet checksum (RFC 1071) of *bytestr*.

    The input is treated as a sequence of big-endian (network order) 16-bit
    words; an odd-length input is padded with one trailing zero byte. The
    words are added with end-around carry and the one's complement of the
    result is returned, ready to be packed with ``struct`` format ``'!H'``.

    References:
    - https://en.wikipedia.org/wiki/IPv4_header_checksum
    - https://gist.github.com/pklaus/856268

    Args:
        bytestr: bytes-like object (or sequence of ints in 0..255).

    Returns:
        The checksum as an int in the range 0..0xFFFF.
    """
    # Pad to an even number of bytes so every word has two octets.
    if len(bytestr) % 2:
        bytestr = bytes(bytestr) + b'\x00'

    total = 0
    for offset in range(0, len(bytestr), 2):
        total += (bytestr[offset] << 8) | bytestr[offset + 1]

    # Fold every carry back into the low 16 bits (one's complement addition).
    # Python ints do not overflow, so no intermediate masking is needed.
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)

    return ~total & 0xFFFF
def echo_request(packet_id, sequence=1, data_size=56):
    """Build a complete ICMP echo request packet.

    Layout of the packet (header followed by the payload):

        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |     Type      |     Code      |          Checksum             |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |           Identifier          |        Sequence Number        |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |     Data ...
        +-+-+-+-+-

    Args:
        packet_id: 16-bit identifier placed in the header.
        sequence: 16-bit sequence number placed in the header.
        data_size: number of payload bytes appended after the header.

    Returns:
        The packet as ``bytes``: 8 header bytes plus ``data_size`` payload bytes.
    """
    icmp_echo = 8
    # type(8 bits), code(8), checksum(16), id(16), sequence(16), network order
    header_format = '!BBHHH'

    # The checksum is computed over the packet with its checksum field zeroed.
    zeroed_header = struct.pack(header_format, icmp_echo, 0, 0, packet_id, sequence)

    # Deterministic filler: consecutive byte values starting at 0x42,
    # wrapped into the 0-255 range (see https://stackoverflow.com/a/34616738).
    first_value = 0x42
    payload = bytes(
        value & 0xff for value in range(first_value, first_value + data_size)
    )

    packet_checksum = checksum(zeroed_header + payload)
    final_header = struct.pack(
        header_format, icmp_echo, 0, packet_checksum, packet_id, sequence
    )
    return final_header + payload
def receive_ping(sock, packet_id, time_sent, timeout):
    """Wait for the ICMP echo reply that answers our request.

    Packets are read from *sock* until an echo reply (ICMP type 0) carrying
    *packet_id* arrives or *timeout* seconds have passed since this function
    was called. Anything else (other hosts' traffic, our own looped-back
    echo request, truncated packets) is skipped.

    Args:
        sock: socket delivering raw IPv4 packets (IP header included).
        packet_id: identifier that was placed in the echo request.
        time_sent: ``time.time()`` value taken when the request was sent.
        timeout: maximum number of seconds to wait.

    Returns:
        ``(ttl, delay_seconds, icmp_bytes)`` for the matching reply, where
        ``icmp_bytes`` is the ICMP header plus payload length, or ``None``
        if no matching reply arrived in time.
    """
    import select

    icmp_echo_reply = 0
    icmp_header_len = 8
    min_ip_header_len = 20

    # A fixed deadline keeps the total wait bounded by ``timeout`` no matter
    # how many unrelated packets are received in between.
    deadline = time.monotonic() + timeout
    time_left = timeout
    while True:
        readable, _, _ = select.select([sock], [], [], time_left)
        if not readable:  # timeout
            return None

        time_received = time.time()
        rec_packet, _addr = sock.recvfrom(1024)

        # The low nibble of the first byte is the IP header length (IHL) in
        # 32-bit words; it exceeds 5 when IP options are present.
        ip_header_len = (rec_packet[0] & 0x0F) * 4 if rec_packet else 0
        icmp_header = rec_packet[ip_header_len:ip_header_len + icmp_header_len]
        if ip_header_len >= min_ip_header_len and len(icmp_header) == icmp_header_len:
            icmp_type, _code, _csum, p_id, _seq = struct.unpack('!BBHHH', icmp_header)
            if icmp_type == icmp_echo_reply and p_id == packet_id:
                ttl = int(rec_packet[8])  # TTL is byte 8 of the IPv4 header
                data_size = len(rec_packet) - ip_header_len
                return (ttl, time_received - time_sent, data_size)

        # Not our reply: keep waiting for whatever time remains.
        time_left = deadline - time.monotonic()
        if time_left <= 0:
            return None
def send_request(sock, ip_addr, packet):
    """Send *packet* to *ip_addr* through *sock*.

    ``sendto`` is repeated with the unsent remainder until every byte of
    *packet* has been handed to the socket.

    Args:
        sock: socket object providing ``sendto(data, address)``.
        ip_addr: destination IPv4 address as a string.
        packet: the bytes to transmit.
    """
    # The port in the address tuple is a placeholder value of 1.
    destination = (ip_addr, 1)
    while packet:
        # Use the socket passed in, not a module-level global.
        sent = sock.sendto(packet, destination)
        packet = packet[sent:]
if __name__ == '__main__':
    # Command-line entry point: send ten echo requests to the given host,
    # one per second, and print the outcome of each.
    import argparse

    parser = argparse.ArgumentParser(description='Ping')
    parser.add_argument('-s', '--packetsize', help='packetsize', type=int, default=56)
    parser.add_argument('-t', '--timeout', help='timeout seconds for a single request', type=int, default=1)
    parser.add_argument('host', help='host')
    args = parser.parse_args()
    if args.packetsize < 0:
        parser.error('packetsize must not be negative')
    timeout = args.timeout

    # Resolve the host first so a bad name fails before a socket is opened.
    ip_addr = socket.gethostbyname(args.host)
    # The ICMP identifier field is 16 bits wide.
    packet_id = os.getpid() % 65536

    print(f'PING {args.host} ({ip_addr}): {args.packetsize} data bytes')

    # The context manager closes the raw socket even if a request raises.
    with socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname('icmp')) as my_sock:
        for seq in range(1, 11):
            packet = echo_request(packet_id, seq, data_size=args.packetsize)
            # Timestamp before sending so the send itself counts toward the
            # measured round-trip time.
            time_sent = time.time()
            send_request(my_sock, ip_addr, packet)
            ping_response = receive_ping(my_sock, packet_id, time_sent, timeout)

            if ping_response is None:
                print(f'failed. Timeout within {timeout} seconds')
            else:
                ttl, delay, payload_bytes = ping_response
                delay = round(delay * 1000.0, 4)  # seconds -> milliseconds
                print(f'{payload_bytes} bytes from {ip_addr}: icmp_seq={seq} ttl={ttl} time={delay} ms')

            time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment