Skip to content

Instantly share code, notes, and snippets.

@TerryGeng
Created November 29, 2020 08:51
Show Gist options
  • Save TerryGeng/741400eacbc79d2e22ee88da97c2c4f6 to your computer and use it in GitHub Desktop.
Save TerryGeng/741400eacbc79d2e22ee88da97c2c4f6 to your computer and use it in GitHub Desktop.
Utility to read Huawei's binary firewall NAT session log. For auditing use.
"""
Utility to read Huawei's binary firewall NAT session log. For auditing use.
PLEASE DO RESPECT THE PRIVACY OF OTHER PEOPLE.
Log formats are explained in
https://support.huawei.com/enterprise/en/doc/EDOC1000107643?section=j00c
Author: Terry Geng
Date: 2020-11-26
"""
import os
import argparse
import struct
from typing import Union
from enum import Enum
from io import BufferedIOBase, RawIOBase
from datetime import datetime
from functools import partial
import json
class ProtocolType(Enum):
    """Protocol numbers accepted in the "Protocol type" field of a record."""

    # Internet Control Message Protocol
    ICMP = 1
    # Transmission Control Protocol
    TCP = 6
    # User Datagram Protocol
    UDP = 17
class Packet:
    """Base class for decoded records; fields are plain instance attributes."""

    def jsonify(self):
        """Return the instance attributes serialised as a JSON object.

        Enum-valued attributes are written as the member name, because the
        ``json`` module cannot serialise Enum members on its own.

        Raises:
            TypeError: if an attribute holds a value that is neither
                JSON-serialisable nor an Enum member.
        """
        return json.dumps(self.__dict__, default=self._json_default)

    @staticmethod
    def _json_default(value):
        """Fallback encoder used by ``jsonify`` for non-JSON-native values."""
        if isinstance(value, Enum):
            return value.name
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )
class PacketUnpack:
    """Table-driven decoder for fixed-layout binary records.

    Subclasses override ``packet_format`` with one tuple per field and
    ``packet_class`` with the type instantiated for every decoded record.
    """

    packet_format = [
        # (struct_format, [description], [expected_val_list],
        #  [attribute_to_write], [handler])
    ]
    packet_class = Packet

    class PacketUnpackInstruction:
        """Decoding rule for a single field of the record.

        Args:
            struct_format: ``struct`` format string of the field.
            description: Field name used in validation error messages.
            expected_val_list: Accepted values; ``check_func`` rejects
                anything else.
            attribute_to_write: Attribute of the packet object that
                receives the decoded value.
            handler: Callable ``handler(obj, value)`` invoked with the
                decoded value.

        A field that decodes to one value passes that bare value to the
        check, the handler and the attribute; a field that decodes to
        several values passes the whole tuple.  An instruction without
        expected values, attribute or handler is a placeholder whose bytes
        are only skipped.
        """

        def __init__(self, struct_format, description=None,
                     expected_val_list=None, attribute_to_write=None,
                     handler=None):
            self.unpacker = struct.Struct(struct_format)
            self.size = self.unpacker.size
            self.parts = self.count_parts(struct_format)
            self.is_array = self.parts > 1
            self.description = description
            self.expected_val_list = expected_val_list

            if not expected_val_list:
                self.check_func = None
            elif self.is_array:
                self.check_func = \
                    (lambda val: val in self.expected_val_list)
            else:
                self.check_func = \
                    (lambda val: val[0] in self.expected_val_list)

            if not attribute_to_write:
                self.write_val_func = None
            elif self.is_array:
                self.write_val_func = (
                    lambda obj, val: setattr(obj, attribute_to_write, val)
                )
            else:
                self.write_val_func = (
                    lambda obj, val: setattr(obj, attribute_to_write, val[0])
                )

            if not handler:
                self.handler = None
            elif self.is_array:
                self.handler = handler
            else:
                self.handler = lambda obj, val: handler(obj, val[0])

            self.placeholder = not (
                expected_val_list or attribute_to_write or handler
            )

        @staticmethod
        def count_parts(struct_format):
            """Return how many values ``struct_format`` unpacks to.

            Decoding a zero-filled buffer of the right size and counting
            the results honours repeat counts ("4B" yields 4 values) and
            pad bytes ("x" yields none), which counting the letters of
            the format string does not.
            """
            blank = bytes(struct.calcsize(struct_format))
            return len(struct.unpack(struct_format, blank))

    def __init__(self):
        self.unpack_insts = [
            PacketUnpack.PacketUnpackInstruction(*inst)
            for inst in self.packet_format
        ]
        self.packet_size = sum(i.size for i in self.unpack_insts)

    def unpack(self, _io: Union[BufferedIOBase, RawIOBase]):
        """Decode one record from the current position of ``_io``.

        Returns:
            A ``packet_class`` instance, or None when the stream ends
            before a non-placeholder field could be read completely.

        Raises:
            ValueError: if a field holds a value outside its expected list.
        """
        obj = self.packet_class()
        for inst in self.unpack_insts:
            if inst.placeholder:
                # Nothing to validate or store: just skip the bytes.
                _io.read(inst.size)
                continue

            b = _io.read(inst.size)
            # A short read means the record is truncated.  Handing it to
            # Struct.unpack would raise struct.error, which is not a
            # ValueError, so treat it like a clean end of stream instead.
            if not b or len(b) < inst.size:
                return None

            val = inst.unpacker.unpack(b)
            if inst.check_func and not inst.check_func(val):
                raise ValueError(
                    f"Expected {inst.expected_val_list} for field "
                    f"{inst.description}, encounter {val} instead."
                )
            if inst.handler:
                inst.handler(obj, val)
            if inst.write_val_func:
                inst.write_val_func(obj, val)
        return obj
class NATIPv4SessionPacket(Packet):
    """One decoded IPv4 NAT session log record."""

    def __init__(self):
        self.version = 0
        self.gen_time = 0
        self.sequence_num = 0
        self.protocol_type = None
        self.src_ip = ""
        self.src_port = 0
        self.src_nat_ip = ""
        self.src_nat_port = 0
        self.dest_ip = ""
        self.dest_nat_ip = ""
        self.dest_port = 0
        self.dest_nat_port = 0
        self.session_creation = 0
        self.session_end = 0
        self.received_packets = 0
        self.received_bytes = 0
        self.sent_packets = 0
        self.sent_bytes = 0

    @staticmethod
    def _format_ipv4(ints):
        """Render the first four items of ``ints`` as a dotted-quad string."""
        return f"{ints[0]}.{ints[1]}.{ints[2]}.{ints[3]}"

    def _set_protocol(self, protocol_int):
        """Store the protocol number as a ``ProtocolType`` member."""
        self.protocol_type = ProtocolType(protocol_int)

    def _set_src_ip(self, ints):
        """Store the source address from its four octets."""
        self.src_ip = self._format_ipv4(ints)

    def _set_src_nat_ip(self, ints):
        """Store the translated source address from its four octets."""
        self.src_nat_ip = self._format_ipv4(ints)

    def _set_dest_ip(self, ints):
        """Store the destination address from its four octets."""
        self.dest_ip = self._format_ipv4(ints)

    def _set_dest_nat_ip(self, ints):
        """Store the translated destination address from its four octets."""
        # This used to assign src_nat_ip, which overwrote the translated
        # source address and left dest_nat_ip permanently empty.
        self.dest_nat_ip = self._format_ipv4(ints)
class NATIPv4SessionPacketUnpack(PacketUnpack):
    """Field table that decodes a record into a ``NATIPv4SessionPacket``."""

    # Every entry is
    # (struct_format, description, expected_val_list,
    #  attribute_to_write, handler)

    # Header
    _header_fields = [
        ("!B", "Version", [2, 3], "version", None),
        ("!B", "Log packet type", [4], None, None),
        ("!H", "Num. of sessions in this packet", [1], None, None),
        ("!I", "Time of generation", None, "gen_time", None),
        ("!I", "Sequence number", None, "sequence_num", None),
        ("!H", "Unused", [0], None, None),
        ("!B", "Slot", None, None, None),
        ("!B", "Unused", [0], None, None),
    ]

    # Log Content
    _session_fields = [
        ("!B", "Protocol type", [1, 6, 17], None,
         NATIPv4SessionPacket._set_protocol),
        ("!B", "Unused", None, None, None),
        ("!B", "IP version", [4], None, None),
        ("!B", "ToS", None, None, None),
        ("!BBBB", "Src IP", None, None,
         NATIPv4SessionPacket._set_src_ip),
        ("!BBBB", "Src IP after NAT", None, None,
         NATIPv4SessionPacket._set_src_nat_ip),
        ("!BBBB", "Dest IP", None, None,
         NATIPv4SessionPacket._set_dest_ip),
        ("!BBBB", "Dest IP after NAT", None, None,
         NATIPv4SessionPacket._set_dest_nat_ip),
        ("!H", "Src port", None, "src_port", None),
        ("!H", "Src port after NAT", None, "src_nat_port", None),
        ("!H", "Dest port", None, "dest_port", None),
        ("!H", "Dest port after NAT", None, "dest_nat_port", None),
        ("!I", "Session creation time", None, "session_creation", None),
        ("!I", "Session end time", None, "session_end", None),
        ("!I", "Packets received by src", None, "received_packets", None),
        ("!I", "Bytes received by src", None, "received_bytes", None),
        ("!I", "Packets sent by src", None, "sent_packets", None),
        ("!I", "Bytes sent by src", None, "sent_bytes", None),
        ("!HHBBHI", "Reserved fields", None, None, None),
    ]

    packet_format = _header_fields + _session_fields
    packet_class = NATIPv4SessionPacket
class NATSessionFilter:
    """Conjunction of ``cond=val`` conditions matched against a packet.

    The filter string is a comma-separated list of conditions.  Putting
    ``!`` before the condition name (``!srcip=1.2.3.4``) or after it
    (``srcip!=1.2.3.4``) negates that condition.
    """

    checkers = {
        'srcip': (lambda expected, pkt: pkt.src_ip == expected),
        'srcport': (lambda expected, pkt: pkt.src_port == expected),
        'destip': (lambda expected, pkt: pkt.dest_ip == expected),
        'destport': (lambda expected, pkt: pkt.dest_port == expected),
        # Aliases matching the spelling used in the command-line help.
        'dstip': (lambda expected, pkt: pkt.dest_ip == expected),
        'dstport': (lambda expected, pkt: pkt.dest_port == expected),
    }

    # Ports are decoded as integers, so the expected value has to be
    # converted before comparing or the condition can never match.
    _int_params = frozenset(('srcport', 'destport', 'dstport'))

    def __init__(self, filter_str):
        filters = filter_str.split(",")
        filters = [s.strip() for s in filters]
        self.filters = []
        self.init_filters(filters)

    @staticmethod
    def _negated(check, pkt):
        """Return the inverse of ``check(pkt)``."""
        return not check(pkt)

    def init_filters(self, filters):
        """Compile ``cond=val`` strings into predicates in ``self.filters``.

        Raises:
            ValueError: for a condition without ``=``, an unknown
                condition name, or a non-integer port value.
        """
        for fstr in filters:
            if not fstr:
                # Tolerate stray commas such as a trailing one.
                continue

            arg, sep, expected = fstr.partition("=")
            if not sep:
                raise ValueError(
                    f"Malformed filter, expected cond=val: {fstr}.")
            arg = arg.strip()
            expected = expected.strip()

            not_ = False
            if arg.startswith("!"):
                not_ = True
                arg = arg[1:].strip()
            elif arg.endswith("!"):
                not_ = True
                arg = arg[:-1].strip()

            if arg not in self.checkers:
                raise ValueError(f"Unknown filter parameter: {arg}.")

            if arg in self._int_params:
                try:
                    expected = int(expected)
                except ValueError:
                    raise ValueError(
                        f"Filter parameter {arg} expects an integer, "
                        f"got {expected!r}.") from None

            # partial() freezes the current arg/expected pair, so each
            # predicate keeps its own values across loop iterations.
            check = partial(self.checkers[arg], expected)
            if not_:
                check = partial(self._negated, check)
            self.filters.append(check)

    def match(self, pkt):
        """Return True when ``pkt`` satisfies every condition."""
        return all(flt(pkt) for flt in self.filters)
def format_file_size(in_bytes):
    """Return ``in_bytes`` as a short size string in B, KB, MB or GB.

    Values below 1024 are printed as given; larger ones are scaled by
    powers of 1024 and shown with one decimal.  Anything of 1024**4 bytes
    or more is reported as ">1TB".
    """
    if in_bytes < 1024:
        return f"{in_bytes}B"
    for power, suffix in enumerate(("KB", "MB", "GB"), start=1):
        if in_bytes < 1024 ** (power + 1):
            scaled = in_bytes / 1024 ** power
            return f"{scaled:.1f}{suffix}"
    return ">1TB"
def print_nat_packet_header():
    """Print the column titles followed by a '=' rule of the same width."""
    titles = "Id Timestamp Src_ip:port Dest_ip:port Bytes/pkt_sent Bytes/pkt_received"
    rule = "=" * len(titles)
    print(titles, rule, sep="\n")
def format_ip_port(ip, port):
    """Return ``ip`` and ``port`` joined by a colon."""
    return "{}:{}".format(ip, port)
def print_nat_packet(p):
    """Print one session record as a single row of the listing.

    The timestamp column is the session creation time, converted with
    ``datetime.fromtimestamp`` and shown in ISO format.
    """
    # Columns, in order: sequence number, timestamp, source ip:port,
    # destination ip:port, sent size/packets, received size/packets.
    created = datetime.fromtimestamp(p.session_creation).isoformat()
    source = format_ip_port(p.src_ip, p.src_port)
    destination = format_ip_port(p.dest_ip, p.dest_port)
    sent = format_file_size(p.sent_bytes)
    received = format_file_size(p.received_bytes)
    columns = (
        f"{p.sequence_num:>7d}",
        f"{created}",
        f"{source:21s}",
        f"{destination:21s}",
        f"{sent:>7s}/{p.sent_packets:<6d}",
        f"{received:>7s}/{p.received_packets:<6d}",
    )
    print(" ".join(columns))
def read_session_log_file(fd, start_from=0, stop_at=0, length=0, filter_=None):
    """Print the NAT session records stored in the binary stream ``fd``.

    Args:
        fd: Seekable binary stream holding the log.
        start_from: Index of the first record to read.
        stop_at: Index of the record to stop before; 0 reads to the end.
        length: Number of records to read, used only when ``stop_at`` is
            0; 0 means no limit.
        filter_: Optional object whose ``match(packet)`` decides whether
            a record is printed and counted.

    Raises:
        SystemExit: when no valid record can be found within more than
            two record lengths of consecutive bad bytes.
    """
    unpacker = NATIPv4SessionPacketUnpack()
    packet_size = unpacker.packet_size

    if start_from > 0:
        fd.seek(start_from * packet_size)

    # Both limits are record indexes, while the loop below compares file
    # offsets: convert once so stop_at is not mistaken for a byte count.
    if stop_at == 0 and length > 0:
        stop_at = start_from + length
    stop_byte = stop_at * packet_size

    error_bytes_before_success = 0
    last_pos = 0
    count = 0

    print_nat_packet_header()
    while not stop_byte or fd.tell() < stop_byte:
        try:
            last_pos = fd.tell()
            packet = unpacker.unpack(fd)
            if not packet:
                break
            if not filter_ or filter_.match(packet):
                print_nat_packet(packet)
                count += 1
            error_bytes_before_success = 0
        except struct.error:
            # Fewer bytes left than a field needs: truncated last record.
            break
        except ValueError:
            if error_bytes_before_success > 2 * packet_size:
                print("ERROR: wrong file format or corrupted file. exit.")
                # The exit() helper only exists when the site module is
                # loaded; raising SystemExit works everywhere.
                raise SystemExit(1)
            # Slide forward one byte at a time to resynchronise on the
            # next position that decodes as a valid record.
            error_bytes_before_success += 1
            fd.seek(last_pos + 1)
            continue

    print("============================")
    print(f"{count} records in total")
# Command-line entry point: parse the options, build the optional filter
# and print the records of the given log file.
if __name__ == "__main__":
    import textwrap

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """\
            Utility to read Huawei's binary firewall NAT session log. For
            auditing use.
            PLEASE DO RESPECT THE PRIVACY OF OTHER PEOPLE.
            """),
        # The condition names listed here must be ones the filter accepts.
        epilog=textwrap.dedent(
            """\
            Filter string are in the format of
            [!]cond=val,[cond=val...]
            where cond can be one of the `srcip`, `destip`, `srcport`, `destport`.
            `!` mark means not.
            """)
    )
    parser.add_argument("--start-from", "-s", dest="start_from", type=int,
                        default=0,
                        help="read from the given index of records")
    parser.add_argument("--stop-at", "-t", dest="stop_at", type=int,
                        default=0,
                        help="read until the given index of records. 0 means "
                             "read till the end")
    parser.add_argument("--length", "-l", dest="length", type=int,
                        default=0,
                        help="the number of records to read. 0 means all.")
    parser.add_argument("--filter", "-f", dest="filter", type=str, default="",
                        help="output record that match the filter criteria")
    parser.add_argument("path", type=str, help="path to the log file")
    args = parser.parse_args()

    # isfile() also rejects directories, which exists() would let through
    # only for open() to fail afterwards.
    if not os.path.isfile(args.path):
        print("ERROR: file not found.")
        raise SystemExit(1)

    filter_ = None
    if args.filter:
        try:
            filter_ = NATSessionFilter(args.filter)
        except (ValueError, IndexError) as err:
            # Report a malformed filter as a usage error, not a traceback.
            parser.error(f"invalid filter: {err}")

    # The with-block closes the file even if reading raises or exits.
    with open(args.path, "rb") as f:
        read_session_log_file(f, args.start_from, args.stop_at, args.length,
                              filter_)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment