Last active
January 2, 2023 08:29
-
-
Save jan-swiecki/4e7d9cb6e1d8d2bc4ac7dab399118854 to your computer and use it in GitHub Desktop.
python-raw-packet-tools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
from argparse import ArgumentParser | |
from dataclasses import dataclass | |
from math import ceil, floor | |
from pathlib import Path | |
import re | |
from typing import Callable | |
RLEN = 50 | |
class AbstractPacket: | |
def __init__(self, payload, significant_bits, word_size): | |
self._fields = {} | |
self.significant_bits = significant_bits | |
self.word_size = word_size | |
self.payload = payload | |
self.packet_size = int(len(payload)*4/8) | |
def drop_fields(self): | |
# print(f'{self.__class__.__name__}: drop fields') | |
for field_name in self._fields.keys(): | |
delattr(self, field_name) | |
self._fields = {} | |
def calc(self): | |
self.header_size = int(self.significant_bits/8) | |
self.higher_layer_size = self.packet_size - int(self.significant_bits/8) | |
def extr_field(self, field_name, start_bit, end_bit, word=None, offset=0, fmt='int'): | |
offset = 32*(word - 1) if word is not None else offset | |
f = 2/8 | |
x1 = int((offset + start_bit) * f) | |
x2 = int((offset + end_bit) * f) | |
hex_word = self.payload[x1:x2] | |
# value = int(hex_word, 16) if hex_word else None | |
value = int(hex_word, 16) if len(hex_word) > 0 else None | |
# print(f'{self.__class__.__name__}: parsing field {field_name:20s} {offset+start_bit:4d}-{offset+end_bit:4d} --> "{hex_word}" --> "{value}"') | |
assert value is not None, f'{self.__class__.__name__}: Cannot extract value from field {field_name} - empty data, x1:x2 = {x1}:{x2}' | |
setattr(self, field_name, value) | |
self._fields[field_name] = { | |
'x1': x1, | |
'x2': x2, | |
'value': value, | |
'fmt_value': fmt_value(hex_word, fmt=fmt) | |
} | |
# def __repr__(self): | |
# items = (" %s = %r,\n" % (k, v) for k, v in self.__dict__.items()) | |
# return "%s[\n%s]" % (self.__class__.__name__, ''.join(items)) | |
def __repr__(self): | |
ret = '' | |
if self.word_size == 32: | |
ret += fmt_32bit_ranges(ceil(self.significant_bits/32))+'\n' | |
ret += f"{' '*RLEN}{pretty_hex_stream(self.payload[:int(self.significant_bits/4)])}" | |
ret += f' [... {self.higher_layer_size} bytes payload ...]\n' | |
ret += self.print_fields() | |
ignore_fields = {'payload', 'significant_bits', 'word_size'} | |
computed_fields = [" %s = %r\n" % (k, v) for k, v in self.__dict__.items() if k not in self._fields and not k.startswith('_') and k not in ignore_fields] | |
computed_fields = [' Computed fields:\n'] + computed_fields | |
return "%s[\n%s%s]" % ( | |
self.__class__.__name__, | |
''.join([f' {line}\n' for line in ret.split('\n')]), | |
''.join(computed_fields) | |
) | |
def print_fields(self): | |
ret = '' | |
ret += ' '*RLEN | |
line = '' | |
for vertical_field in self._fields.values(): | |
size = vertical_field['x2'] - vertical_field['x1'] | |
if size == 1: | |
line += '^' | |
elif size == 2: | |
line += '\|' | |
else: | |
line += '\\' | |
line += '_' * (size - 2) | |
line += '/' | |
ret += fill_blanks(pretty_hex_stream(line))+'\n' | |
def fmt_arrow(i, j, vertical_field): | |
ret = '' | |
size = vertical_field['x2'] - vertical_field['x1'] | |
if j < i: | |
ret += '-' * size | |
elif size == 1: | |
ret += '/' if i == j else '|' | |
elif size == 2: | |
ret += '-/' if i == j else ' |' | |
elif size == 3: | |
ret += '-/ ' if i == j else ' | ' | |
else: | |
half = floor(size/2) | |
ret += ('-' if i == j else ' ') * half | |
ret += '/' if i == j else '|' | |
# line += ' ' * (half - (half % 2)) | |
ret += ' ' * (half-1) | |
return ret | |
line = '' | |
for j, vertical_field in enumerate(self._fields.values()): | |
line += fmt_arrow(0, 1, vertical_field) | |
ret += (' ' * RLEN) | |
ret += fill_blanks(pretty_hex_stream(line))+'\n' | |
for i, (field_name, horizontal_field) in enumerate(self._fields.items()): | |
half_RLEN = int(RLEN/2) | |
ret += (field_name+' ').ljust(half_RLEN-1, '.')+' ' | |
ret += f"{horizontal_field['fmt_value']} ".ljust(half_RLEN, '-') | |
line = '' | |
for j, vertical_field in enumerate(self._fields.values()): | |
line += fmt_arrow(i, j, vertical_field) | |
ret += fill_blanks(pretty_hex_stream(line))+'\n' | |
# print('') | |
return ret | |
# def print_64bit_ranges(num_32bit_words): | |
# prnt(f"{' '*RLEN}") | |
# line = '' | |
# for i in range(ceil(num_32bit_words/2)): | |
# line += '|------------64-bits-----------|' | |
# print(fill_blanks(pretty_hex_stream(line))) | |
def fmt_32bit_ranges(num_32bit_words): | |
ret = '' | |
ret += f"{' '*RLEN}" | |
line = '' | |
for i in range(num_32bit_words): | |
line += '|--32b-|' | |
return ret + fill_blanks(pretty_hex_stream(line)) | |
# def print_16bit_ranges(num_32bit_words): | |
# prnt(f"{' '*RLEN}") | |
# line = '' | |
# for i in range(num_32bit_words*4): | |
# line += '|16|' | |
# print(fill_blanks(pretty_hex_stream(line))) | |
def pretty_hex_stream(hex_stream): | |
def gen(): | |
for i in range(0, len(hex_stream), 2): | |
yield hex_stream[i:i+2] | |
return ' '.join(list(gen())) | |
def fill_blanks(line): | |
in_range = False | |
ret_line = '' | |
fill_char = '' | |
for h in line: | |
if h == '\\' or h == '-': | |
if h == '\\': | |
fill_char = '_' | |
elif h == '-': | |
fill_char = '-' | |
in_range = True | |
elif h == '/' or h == '|': | |
in_range = False | |
elif h == ' ' and in_range: | |
h = fill_char | |
ret_line += h | |
def fix(m): | |
return m.group(1) + '-' + m.group(3) + '-'*(len(m.group(2)) - 1) | |
ret_line = re.sub(r'([a-zA-Z0-9]+)-([a-zA-Z0-9]+)', r'\1\2-', ret_line) | |
ret_line = re.sub(r'([a-zA-Z0-9]+)([-]+)([a-zA-Z0-9]+)', lambda m: fix(m), ret_line) | |
return ret_line | |
def fmt_value(hex_word: str | Callable[[str], str], fmt: str): | |
if not hex_word: | |
return None | |
if fmt == 'int': | |
return int(hex_word, 16) if hex_word else None | |
elif fmt == 'hex': | |
return '0x'+hex_word | |
elif fmt == 'bin': | |
return bin(int(hex_word, 16)) | |
elif fmt == 'ipv4': | |
f = lambda i: int(hex_word[i:i+2], 16) | |
return f'{f(0)}.{f(2)}.{f(4)}.{f(6)}' | |
elif fmt == 'mac': | |
f = lambda i: hex_word[i:i+2] | |
return ':'.join([f(i) for i in range(0, 12, 2)]) | |
elif callable(fmt): | |
return fmt(int(hex_word, 16) if hex_word else None) | |
else: | |
return hex_word | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
from dataclasses import dataclass | |
from math import ceil, floor | |
from pathlib import Path | |
import re | |
from lib_packet_parse import AbstractPacket, pretty_hex_stream | |
class EthernetPacket(AbstractPacket): | |
def __init__(self, payload, significant_bits=14 * 8, word_size=None): | |
super().__init__(payload, significant_bits=significant_bits, word_size=word_size) | |
self.drop_fields() | |
self.extr_field('destination_mac_address', 0, 6 * 8, fmt='mac') | |
self.extr_field('source_mac_address', 0, 6 * 8, offset=6 * 8, fmt='mac') | |
self.extr_field('layer3_type', 0, 2 * 8, offset=2 * 6 * 8, fmt='hex') | |
self.is_ipv4 = self.layer3_type == int('0800', 16) | |
if self.significant_bits: | |
self.calc() | |
def upgrade(self): | |
if self.is_ipv4: | |
return IpPacket(self.payload[(14 * 2):]) | |
else: | |
assert False | |
@dataclass | |
class IpFlags: | |
reserved: bool | |
dont_fragment: bool | |
more_fragments: bool | |
class IpPacket(AbstractPacket): | |
def __init__(self, payload, significant_bits=None, word_size=32): | |
super().__init__(payload, significant_bits=significant_bits, word_size=word_size) | |
self.drop_fields() | |
self.extr_field('version', 0, 4, word=1) | |
self.extr_field('ihl', 4, 8, word=1) | |
self.extr_field('type_of_service', 8, 16, word=1) | |
self.extr_field('total_length', 16, 32, word=1) | |
self.extr_field('identification', 0, 16, word=2) | |
self.extr_field('ip_flags', 16, 20, word=2, fmt=lambda v: bin(v >> 1)) | |
self.extr_field('fragment_offset', 20, 32, word=2) | |
self.extr_field('ttl', 0, 8, word=3) | |
self.extr_field('protocol', 8, 16, word=3) | |
self.extr_field('header_checksum', 16, 32, word=3) | |
self.extr_field('source_address', 0, 32, word=4, fmt='ipv4') | |
self.extr_field('destination_address', 0, 32, word=5, fmt='ipv4') | |
for word in range(6, self.ihl + 1): | |
self.extr_field(f'options_{word}', 0, 32, word=word) | |
self.is_tcp = self.protocol == 6 | |
self.significant_bits = 32*self.ihl | |
self.calc() | |
self.ip_flags = self.ip_flags >> 1 | |
self.flags = IpFlags( | |
reserved = (self.ip_flags >> 2) & 0b1 == 1, | |
dont_fragment = (self.ip_flags >> 1) & 0b1 == 1, | |
more_fragments = (self.ip_flags >> 0) & 0b1 == 1, | |
) | |
def upgrade(self): | |
if self.is_tcp: | |
return TcpPacket(self.payload[int(self.significant_bits/4):]) | |
else: | |
assert False | |
@dataclass | |
class TcpFlags: | |
ns: bool | |
cwr: bool | |
ece: bool | |
urg: bool | |
ack: bool | |
psh: bool | |
rst: bool | |
syn: bool | |
fin: bool | |
class TcpPacket(AbstractPacket): | |
def __init__(self, payload): | |
super().__init__(payload, significant_bits=None, word_size=32) | |
self.drop_fields() | |
# print(pretty_hex_stream(self.payload)) | |
self.extr_field('source_port', 0, 16, word=1) | |
self.extr_field('destination_port', 16, 32, word=1) | |
self.extr_field('syn', 0, 32, word=2) | |
self.extr_field('ack', 0, 32, word=3) | |
self.extr_field('header_num_words', 0, 4, word=4) | |
self.extr_field('tcp_flags', 4, 16, word=4, fmt='bin') | |
self.extr_field('window_size', 16, 32, word=4) | |
self.extr_field('checksum', 0, 16, word=5) | |
self.extr_field('urgent_pointer', 16, 32, word=5) | |
for word in range(6, self.header_num_words + 1): | |
self.extr_field(f'options_{word}', 0, 32, word=word) | |
self.significant_bits = 32*self.header_num_words | |
self.calc() | |
self.flags = TcpFlags( | |
ns = (self.tcp_flags >> 8) & 0b1 == 1, | |
cwr = (self.tcp_flags >> 7) & 0b1 == 1, | |
ece = (self.tcp_flags >> 6) & 0b1 == 1, | |
urg = (self.tcp_flags >> 5) & 0b1 == 1, | |
ack = (self.tcp_flags >> 4) & 0b1 == 1, | |
psh = (self.tcp_flags >> 3) & 0b1 == 1, | |
rst = (self.tcp_flags >> 2) & 0b1 == 1, | |
syn = (self.tcp_flags >> 1) & 0b1 == 1, | |
fin = (self.tcp_flags >> 0) & 0b1 == 1, | |
) | |
# def upgrade(self): | |
# if self.is_tcp: | |
# return TcpPacket(self.payload[int(self.significant_bits*4/8):]) | |
# else: | |
# assert False | |
def main(): | |
parser = ArgumentParser() | |
parser.add_argument( | |
'file', | |
type=str, | |
) | |
args = parser.parse_args() | |
hex_stream = Path(args.file).read_text().replace(' ', '').replace("\n", "") | |
# packet = EthernetPacket(hex_stream).upgrade() | |
eth_packet = EthernetPacket(hex_stream) | |
print(eth_packet) | |
ip_packet = eth_packet.upgrade() | |
print(ip_packet) | |
tcp_packet = ip_packet.upgrade() | |
print(tcp_packet) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment