Skip to content

Instantly share code, notes, and snippets.

@jan-swiecki
Last active January 2, 2023 08:29
Show Gist options
  • Save jan-swiecki/4e7d9cb6e1d8d2bc4ac7dab399118854 to your computer and use it in GitHub Desktop.
Save jan-swiecki/4e7d9cb6e1d8d2bc4ac7dab399118854 to your computer and use it in GitHub Desktop.
python-raw-packet-tools
from __future__ import annotations
from argparse import ArgumentParser
from dataclasses import dataclass
from math import ceil, floor
from pathlib import Path
import re
from typing import Callable
# Width, in characters, of the left-hand gutter that precedes every rendered
# diagram line.  Rows without a label are padded with RLEN spaces; labelled
# rows split the gutter into a field-name half and a value half.
RLEN = 50
class AbstractPacket:
    """Base class for one protocol layer parsed from a hex-digit string.

    ``payload`` is a string of hexadecimal digits (two digits per byte) that
    starts at this layer's header.  Subclasses call :meth:`extr_field` to
    slice header fields out of it; every extracted field becomes an instance
    attribute and is recorded in ``_fields`` so that :meth:`__repr__` can
    draw an annotated dump of the header.

    Attributes:
        payload: the hex-digit string this packet was built from.
        significant_bits: header length in bits (may be ``None`` until a
            subclass has worked it out).
        word_size: when equal to ``32``, ``__repr__`` prints a 32-bit ruler.
        packet_size: length of ``payload`` in bytes.
        header_size: header length in bytes, set by :meth:`calc`.
        higher_layer_size: bytes following the header, set by :meth:`calc`.
    """

    def __init__(self, payload, significant_bits, word_size):
        self._fields = {}
        self.significant_bits = significant_bits
        self.word_size = word_size
        self.payload = payload
        # Two hex digits encode one byte.
        self.packet_size = len(payload) // 2

    def drop_fields(self):
        """Remove every attribute created by ``extr_field`` and forget them."""
        for field_name in self._fields:
            delattr(self, field_name)
        self._fields = {}

    def calc(self):
        """Derive ``header_size`` and ``higher_layer_size`` (both in bytes).

        Requires ``significant_bits`` to be a number.
        """
        header_bytes = int(self.significant_bits / 8)
        self.header_size = header_bytes
        self.higher_layer_size = self.packet_size - header_bytes

    def extr_field(self, field_name, start_bit, end_bit, word=None, offset=0, fmt='int'):
        """Extract the bit range ``[start_bit, end_bit)`` as a header field.

        Positions are resolved at hex-digit (4-bit) granularity: both bounds
        are converted to indexes into ``payload`` by dividing by four and
        truncating.

        Args:
            field_name: attribute name under which the integer value is stored.
            start_bit: first bit of the field, relative to the base offset.
            end_bit: bit just past the field, relative to the base offset.
            word: 1-based 32-bit word number; when given it overrides
                ``offset`` with ``32 * (word - 1)``.
            offset: base offset in bits, used when ``word`` is ``None``.
            fmt: display format forwarded to ``fmt_value``.

        Raises:
            AssertionError: if the range selects no data from ``payload``.
        """
        if word is not None:
            offset = 32 * (word - 1)
        # One hex digit carries four bits.
        x1 = int((offset + start_bit) / 4)
        x2 = int((offset + end_bit) / 4)
        hex_word = self.payload[x1:x2]
        if not hex_word:
            # Raised explicitly (rather than via ``assert``) so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                f'{self.__class__.__name__}: Cannot extract value from field {field_name} - empty data, x1:x2 = {x1}:{x2}'
            )
        value = int(hex_word, 16)
        setattr(self, field_name, value)
        self._fields[field_name] = {
            'x1': x1,
            'x2': x2,
            'value': value,
            'fmt_value': fmt_value(hex_word, fmt=fmt)
        }

    def __repr__(self):
        """Return the annotated header dump followed by the computed attributes."""
        # Stay printable even if the header length is unknown or calc() has
        # not been run yet: a repr that raises is useless for debugging.
        bits = self.significant_bits or 0
        higher_layer_size = getattr(
            self, 'higher_layer_size', self.packet_size - int(bits / 8)
        )

        parts = []
        if self.word_size == 32:
            parts.append(fmt_32bit_ranges(ceil(bits / 32)) + '\n')
        parts.append(f"{' '*RLEN}{pretty_hex_stream(self.payload[:int(bits / 4)])}")
        parts.append(f' [... {higher_layer_size} bytes payload ...]\n')
        parts.append(self.print_fields())
        diagram = ''.join(parts)

        # Everything that is neither an extracted field, private, nor raw
        # constructor input is listed as a "computed" attribute.
        ignore_fields = {'payload', 'significant_bits', 'word_size'}
        computed_fields = [' Computed fields:\n'] + [
            " %s = %r\n" % (k, v)
            for k, v in self.__dict__.items()
            if k not in self._fields and not k.startswith('_') and k not in ignore_fields
        ]
        return "%s[\n%s%s]" % (
            self.__class__.__name__,
            ''.join([f' {line}\n' for line in diagram.split('\n')]),
            ''.join(computed_fields)
        )

    @staticmethod
    def _fmt_arrow(row, col, size):
        """Return the ``size``-character diagram cell for field ``col`` on line ``row``.

        Columns left of the row's own field carry the horizontal lead-in,
        the row's own column holds the ``/`` corner, and columns to the right
        show the ``|`` stem of a field labelled further down.
        """
        if col < row:
            return '-' * size
        on_row = row == col
        if size == 1:
            return '/' if on_row else '|'
        if size == 2:
            return '-/' if on_row else ' |'
        if size == 3:
            return '-/ ' if on_row else ' | '
        half = size // 2
        lead = ('-' if on_row else ' ') * half
        # Pad to exactly ``size`` characters so odd-width fields do not shift
        # every column to their right.
        return lead + ('/' if on_row else '|') + ' ' * (size - half - 1)

    def print_fields(self):
        """Render the bracket line, the stem line and one labelled line per field.

        Returns:
            The multi-line diagram as a string; every line ends with a newline.
        """
        sizes = [field['x2'] - field['x1'] for field in self._fields.values()]
        gutter = ' ' * RLEN

        # Line 1: a bracket spanning the hex digits of each field.
        brackets = []
        for size in sizes:
            if size == 1:
                brackets.append('^')
            elif size == 2:
                brackets.append('\\|')
            else:
                brackets.append('\\' + '_' * (size - 2) + '/')
        rows = [gutter + fill_blanks(pretty_hex_stream(''.join(brackets)))]

        # Line 2: only vertical stems (row 0 / column 1 selects the stem form).
        stems = ''.join(self._fmt_arrow(0, 1, size) for size in sizes)
        rows.append(gutter + fill_blanks(pretty_hex_stream(stems)))

        # Remaining lines: "<name> ..... <value> -----" plus the arrow row.
        half_rlen = int(RLEN / 2)
        for i, (field_name, field) in enumerate(self._fields.items()):
            label = (field_name + ' ').ljust(half_rlen - 1, '.') + ' '
            label += f"{field['fmt_value']} ".ljust(half_rlen, '-')
            arrows = ''.join(
                self._fmt_arrow(i, j, size) for j, size in enumerate(sizes)
            )
            rows.append(label + fill_blanks(pretty_hex_stream(arrows)))

        return ''.join(row + '\n' for row in rows)
# def print_64bit_ranges(num_32bit_words):
# prnt(f"{' '*RLEN}")
# line = ''
# for i in range(ceil(num_32bit_words/2)):
# line += '|------------64-bits-----------|'
# print(fill_blanks(pretty_hex_stream(line)))
def fmt_32bit_ranges(num_32bit_words):
    """Build the ruler line that marks each 32-bit word above a hex dump.

    Args:
        num_32bit_words: how many ``|--32b-|`` markers to draw.

    Returns:
        The ruler, prefixed with the RLEN-wide blank gutter and spaced the
        same way as the hex dump it sits above.
    """
    gutter = ' ' * RLEN
    ruler = '|--32b-|' * num_32bit_words
    return gutter + fill_blanks(pretty_hex_stream(ruler))
# def print_16bit_ranges(num_32bit_words):
# prnt(f"{' '*RLEN}")
# line = ''
# for i in range(num_32bit_words*4):
# line += '|16|'
# print(fill_blanks(pretty_hex_stream(line)))
def pretty_hex_stream(hex_stream):
    """Separate ``hex_stream`` into two-character groups joined by spaces.

    A trailing odd character forms a final one-character group; an empty
    input yields an empty string.
    """
    pairs = [hex_stream[pos:pos + 2] for pos in range(0, len(hex_stream), 2)]
    return ' '.join(pairs)
def fill_blanks(line):
    """Close the gaps that pair-spacing leaves inside drawn diagram ranges.

    Pass 1 walks the characters: a backslash starts a range whose spaces are
    replaced by ``_``, a ``-`` starts (or continues) a range whose spaces are
    replaced by ``-``, and ``/`` or ``|`` ends the current range.  Spaces
    outside a range are left alone.

    Pass 2 tidies alphanumeric label text that got split by dashes: a single
    dash between two alphanumeric runs is moved behind them, and a longer run
    of dashes between two alphanumeric runs is reduced to one dash with the
    remainder moved behind the second run.
    """
    range_fillers = {'\\': '_', '-': '-'}
    range_enders = ('/', '|')

    filled = []
    filler = ''
    inside = False
    for ch in line:
        if ch in range_fillers:
            filler = range_fillers[ch]
            inside = True
        elif ch in range_enders:
            inside = False
        elif ch == ' ' and inside:
            ch = filler
        filled.append(ch)
    text = ''.join(filled)

    def pull_together(match):
        head, dashes, tail = match.group(1), match.group(2), match.group(3)
        return head + '-' + tail + '-' * (len(dashes) - 1)

    text = re.sub(r'([a-zA-Z0-9]+)-([a-zA-Z0-9]+)', r'\1\2-', text)
    text = re.sub(r'([a-zA-Z0-9]+)([-]+)([a-zA-Z0-9]+)', pull_together, text)
    return text
def fmt_value(hex_word: str, fmt: str | Callable[[int], object]) -> object:
    """Format a field's hex digits for display.

    Args:
        hex_word: the field's hexadecimal digits (no ``0x`` prefix).
        fmt: one of ``'int'``, ``'hex'``, ``'bin'``, ``'ipv4'``, ``'mac'``,
            or a callable that receives the field's integer value.  Any
            other value returns ``hex_word`` unchanged.

    Returns:
        ``None`` for an empty ``hex_word``; otherwise an ``int`` for
        ``'int'``, a string for the other named formats, or whatever the
        callable returns.

    Raises:
        ValueError: if ``hex_word`` contains non-hex characters where an
            integer conversion is needed, or is shorter than 8 digits for
            ``'ipv4'``.
    """
    if not hex_word:
        return None
    if fmt == 'int':
        return int(hex_word, 16)
    if fmt == 'hex':
        return '0x' + hex_word
    if fmt == 'bin':
        return bin(int(hex_word, 16))
    if fmt == 'ipv4':
        # Four octets, two hex digits each, rendered as dotted decimal.
        return '.'.join(str(int(hex_word[i:i + 2], 16)) for i in range(0, 8, 2))
    if fmt == 'mac':
        # Six octets, kept as hex digit pairs, colon separated.
        return ':'.join(hex_word[i:i + 2] for i in range(0, 12, 2))
    if callable(fmt):
        return fmt(int(hex_word, 16))
    return hex_word
from argparse import ArgumentParser
from dataclasses import dataclass
from math import ceil, floor
from pathlib import Path
import re
from lib_packet_parse import AbstractPacket, pretty_hex_stream
class EthernetPacket(AbstractPacket):
    """Ethernet frame header: two 6-byte MAC addresses and a 2-byte type.

    Attributes set on construction:
        destination_mac_address, source_mac_address, layer3_type: the
            extracted header fields, stored as integers.
        is_ipv4: ``True`` when ``layer3_type`` equals ``0x0800``.
    """

    def __init__(self, payload, significant_bits=14 * 8, word_size=None):
        super().__init__(payload, significant_bits=significant_bits, word_size=word_size)
        self.drop_fields()
        self.extr_field('destination_mac_address', 0, 6 * 8, fmt='mac')
        self.extr_field('source_mac_address', 0, 6 * 8, offset=6 * 8, fmt='mac')
        self.extr_field('layer3_type', 0, 2 * 8, offset=2 * 6 * 8, fmt='hex')
        self.is_ipv4 = self.layer3_type == 0x0800
        # Sizes can only be derived when a (non-zero) header length is known.
        if self.significant_bits:
            self.calc()

    def upgrade(self):
        """Parse the bytes after the 14-byte header as the next layer.

        Returns:
            An ``IpPacket`` built from the remaining hex digits.

        Raises:
            AssertionError: if the frame does not carry IPv4.
        """
        if not self.is_ipv4:
            # Explicit raise: a bare ``assert False`` disappears under -O and
            # the method would silently return None.
            raise AssertionError(
                f'{self.__class__.__name__}: unsupported layer3_type '
                f'0x{self.layer3_type:04x}; only IPv4 (0x0800) can be upgraded'
            )
        # 14 header bytes == 28 hex digits.
        return IpPacket(self.payload[(14 * 2):])
@dataclass
class IpFlags:
    """The three flag bits of an IP header, decoded to booleans by IpPacket."""
    # Bit 2 (most significant) of the 3-bit flags value.
    reserved: bool
    # Bit 1 of the 3-bit flags value.
    dont_fragment: bool
    # Bit 0 (least significant) of the 3-bit flags value.
    more_fragments: bool
class IpPacket(AbstractPacket):
    """IPv4 header parser.

    Fields are addressed by 1-based 32-bit word and bit range within the
    word.  ``ihl`` is the header length in 32-bit words; words 6..ihl are
    exposed as ``options_<word>``.

    Attributes set on construction (besides the extracted fields):
        is_tcp: ``True`` when ``protocol`` equals 6.
        significant_bits: ``32 * ihl``.
        ip_flags: the 3-bit flags value.
        fragment_offset: the full 13-bit fragment offset.
        flags: the flag bits decoded into an ``IpFlags`` instance.
    """

    def __init__(self, payload, significant_bits=None, word_size=32):
        super().__init__(payload, significant_bits=significant_bits, word_size=word_size)
        self.drop_fields()
        self.extr_field('version', 0, 4, word=1)
        self.extr_field('ihl', 4, 8, word=1)
        self.extr_field('type_of_service', 8, 16, word=1)
        self.extr_field('total_length', 16, 32, word=1)
        self.extr_field('identification', 0, 16, word=2)
        # Fields are sliced on hex-digit boundaries, so the 3 flag bits are
        # read together with the top bit of the fragment offset (one nibble).
        self.extr_field('ip_flags', 16, 20, word=2, fmt=lambda v: bin(v >> 1))
        self.extr_field('fragment_offset', 20, 32, word=2)
        self.extr_field('ttl', 0, 8, word=3)
        self.extr_field('protocol', 8, 16, word=3)
        self.extr_field('header_checksum', 16, 32, word=3)
        self.extr_field('source_address', 0, 32, word=4, fmt='ipv4')
        self.extr_field('destination_address', 0, 32, word=5, fmt='ipv4')
        for word in range(6, self.ihl + 1):
            self.extr_field(f'options_{word}', 0, 32, word=word)

        self.is_tcp = self.protocol == 6
        self.significant_bits = 32 * self.ihl
        self.calc()

        flags_nibble = self.ip_flags
        self.ip_flags = flags_nibble >> 1
        # The nibble's lowest bit is the most significant bit of the 13-bit
        # fragment offset; fold it back in so large offsets are not truncated.
        # (The rendered diagram still shows the nibble-aligned raw digits.)
        self.fragment_offset |= (flags_nibble & 0b1) << 12
        self.flags = IpFlags(
            reserved=(self.ip_flags >> 2) & 0b1 == 1,
            dont_fragment=(self.ip_flags >> 1) & 0b1 == 1,
            more_fragments=(self.ip_flags >> 0) & 0b1 == 1,
        )

    def upgrade(self):
        """Parse the bytes after this header as the next layer.

        Returns:
            A ``TcpPacket`` built from the hex digits following the header.

        Raises:
            AssertionError: if the packet does not carry TCP.
        """
        if not self.is_tcp:
            # Explicit raise: a bare ``assert False`` disappears under -O and
            # the method would silently return None.
            raise AssertionError(
                f'{self.__class__.__name__}: unsupported protocol '
                f'{self.protocol}; only TCP (6) can be upgraded'
            )
        # significant_bits / 4 == number of header hex digits.
        return TcpPacket(self.payload[int(self.significant_bits / 4):])
@dataclass
class TcpFlags:
    """The nine TCP flag bits, decoded to booleans by TcpPacket.

    Fields are listed from bit 8 (``ns``) down to bit 0 (``fin``) of the
    packet's ``tcp_flags`` value.
    """
    ns: bool   # bit 8
    cwr: bool  # bit 7
    ece: bool  # bit 6
    urg: bool  # bit 5
    ack: bool  # bit 4
    psh: bool  # bit 3
    rst: bool  # bit 2
    syn: bool  # bit 1
    fin: bool  # bit 0
class TcpPacket(AbstractPacket):
    """TCP header parser.

    Fields are addressed by 1-based 32-bit word and bit range within the
    word.  ``header_num_words`` is the header length in 32-bit words; words
    6..header_num_words are exposed as ``options_<word>``.

    NOTE(review): the attributes ``syn`` and ``ack`` hold the full 32-bit
    values of words 2 and 3 (presumably the sequence and acknowledgment
    numbers); they are distinct from the booleans ``flags.syn``/``flags.ack``.
    """

    def __init__(self, payload):
        super().__init__(payload, significant_bits=None, word_size=32)
        self.drop_fields()

        # (field name, first bit, end bit, 32-bit word, extra extr_field kwargs)
        fixed_layout = (
            ('source_port', 0, 16, 1, {}),
            ('destination_port', 16, 32, 1, {}),
            ('syn', 0, 32, 2, {}),
            ('ack', 0, 32, 3, {}),
            ('header_num_words', 0, 4, 4, {}),
            ('tcp_flags', 4, 16, 4, {'fmt': 'bin'}),
            ('window_size', 16, 32, 4, {}),
            ('checksum', 0, 16, 5, {}),
            ('urgent_pointer', 16, 32, 5, {}),
        )
        for name, first_bit, end_bit, word_no, extra in fixed_layout:
            self.extr_field(name, first_bit, end_bit, word=word_no, **extra)

        # Any words beyond the fifth are option words.
        for option_word in range(6, 1 + self.header_num_words):
            self.extr_field(f'options_{option_word}', 0, 32, word=option_word)

        self.significant_bits = self.header_num_words * 32
        self.calc()

        def is_set(position):
            return (self.tcp_flags >> position) & 0b1 == 1

        # Listed from bit 0 upwards.
        names_from_lsb = ('fin', 'syn', 'rst', 'psh', 'ack', 'urg', 'ece', 'cwr', 'ns')
        self.flags = TcpFlags(
            **{name: is_set(position) for position, name in enumerate(names_from_lsb)}
        )
# def upgrade(self):
# if self.is_tcp:
# return TcpPacket(self.payload[int(self.significant_bits*4/8):])
# else:
# assert False
def main():
    """Parse the hex dump named on the command line and print each layer.

    The file is expected to contain the hex digits of one Ethernet frame
    carrying IPv4 and TCP; whitespace between digits is ignored.
    """
    parser = ArgumentParser()
    parser.add_argument(
        'file',
        type=str,
    )
    args = parser.parse_args()

    # str.split() with no argument drops every kind of whitespace (spaces,
    # tabs, LF, CRLF), so dumps saved with Windows line endings or tab
    # separators no longer leak stray characters into the hex stream.
    hex_stream = ''.join(Path(args.file).read_text().split())

    eth_packet = EthernetPacket(hex_stream)
    print(eth_packet)
    ip_packet = eth_packet.upgrade()
    print(ip_packet)
    tcp_packet = ip_packet.upgrade()
    print(tcp_packet)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment