Created
November 29, 2020 08:51
-
-
Save TerryGeng/741400eacbc79d2e22ee88da97c2c4f6 to your computer and use it in GitHub Desktop.
Utility to read Huawei's binary firewall NAT session log. For auditing use.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Utility to read Huawei's binary firewall NAT session log. For auditing use. | |
PLEASE DO RESPECT THE PRIVACY OF OTHER PEOPLE. | |
Log formats are explained in | |
https://support.huawei.com/enterprise/en/doc/EDOC1000107643?section=j00c | |
Author: Terry Geng | |
Date: 2020-11-26 | |
""" | |
import os | |
import argparse | |
import struct | |
from typing import Union | |
from enum import Enum | |
from io import BufferedIOBase, RawIOBase | |
from datetime import datetime | |
from functools import partial | |
import json | |
class ProtocolType(Enum):
    """Protocols accepted in the "Protocol type" field of a session record.

    Each member's value is the number stored in that one-byte field.
    """

    ICMP = 1
    TCP = 6
    UDP = 17
class Packet:
    """Base class for decoded records; fields live as instance attributes."""

    def jsonify(self):
        """Return the instance attributes serialised as a JSON object string.

        Enum-valued attributes are written as their member name, so records
        that store an ``Enum`` member can be serialised.

        Raises:
            TypeError: if an attribute holds any other value ``json`` cannot
                encode.
        """
        def encode_fallback(value):
            # json.dumps calls this only for values it can't encode itself.
            if isinstance(value, Enum):
                return value.name
            raise TypeError(
                f"Object of type {type(value).__name__} "
                "is not JSON serializable"
            )

        return json.dumps(vars(self), default=encode_fallback)
class PacketUnpack:
    """Table-driven decoder for fixed-layout binary records.

    Subclasses set ``packet_format`` to a list of tuples
    ``(struct_format, [description], [expected_val_list],
    [attribute_to_write], [handler])`` and ``packet_class`` to the class
    that is instantiated for every decoded record.
    """

    packet_format = [
        # (struct_format, [description], [expected_val_list],
        #  [attribute_to_write], [handler])
    ]
    packet_class = Packet

    class PacketUnpackInstruction:
        """Compiled form of one ``packet_format`` entry.

        A field whose struct format yields several values is an "array"
        field: checks, handlers and attribute writes receive the whole
        tuple. A single-value field passes the bare value instead.
        """

        def __init__(self, struct_format, description=None,
                     expected_val_list=None, attribute_to_write=None,
                     handler=None):
            self.unpacker = struct.Struct(struct_format)
            self.size = self.unpacker.size
            self.parts = self.count_parts(struct_format)
            self.is_array = (self.parts > 1)
            self.description = description
            self.expected_val_list = expected_val_list

            # Validation against the list of accepted values.
            if not expected_val_list:
                self.check_func = None
            elif self.is_array:
                self.check_func = \
                    (lambda val: val in self.expected_val_list)
            else:
                self.check_func = \
                    (lambda val: val[0] in self.expected_val_list)

            # Direct copy of the decoded value into a record attribute.
            if not attribute_to_write:
                self.write_val_func = None
            elif self.is_array:
                self.write_val_func = (
                    lambda obj, val: setattr(obj, attribute_to_write, val)
                )
            else:
                self.write_val_func = (
                    lambda obj, val: setattr(obj, attribute_to_write, val[0])
                )

            # Custom callback invoked as handler(record, value).
            if not handler:
                self.handler = None
            elif self.is_array:
                self.handler = handler
            else:
                self.handler = lambda obj, val: handler(obj, val[0])

            # A field with nothing to check, store or call is only skipped.
            self.placeholder = (not expected_val_list and
                                not attribute_to_write and not handler)

        @staticmethod
        def count_parts(struct_format):
            """Return how many values ``struct_format`` unpacks to.

            The count is taken from an actual unpack of a zero-filled
            buffer, so repeat counts such as ``"!4B"`` are handled.
            """
            blank = bytes(struct.calcsize(struct_format))
            return len(struct.unpack(struct_format, blank))

    def __init__(self):
        """Compile ``packet_format`` and compute the record size in bytes."""
        self.unpack_insts = [
            PacketUnpack.PacketUnpackInstruction(*inst)
            for inst in self.packet_format
        ]
        self.packet_size = sum(i.size for i in self.unpack_insts)

    def unpack(self, _io: Union[BufferedIOBase, RawIOBase]):
        """Decode one record from ``_io``.

        Returns:
            A populated ``packet_class`` instance, or ``None`` when the
            stream ends before a checked/stored field is fully read.

        Raises:
            ValueError: if a field's value is not in its expected list.
        """
        obj = self.packet_class()
        for inst in self.unpack_insts:
            raw = _io.read(inst.size)
            if inst.placeholder:
                # Placeholder bytes are consumed and ignored.
                continue
            # A short read means EOF fell inside the record. Report it as
            # end-of-data rather than letting Struct.unpack raise
            # struct.error on the undersized buffer.
            if not raw or len(raw) < inst.size:
                return None
            val = inst.unpacker.unpack(raw)
            if inst.check_func and not inst.check_func(val):
                raise ValueError(
                    f"Expected {inst.expected_val_list} for field "
                    f"{inst.description}, encounter {val} instead."
                )
            if inst.handler:
                inst.handler(obj, val)
            if inst.write_val_func:
                inst.write_val_func(obj, val)
        return obj
class NATIPv4SessionPacket(Packet):
    """One decoded IPv4 NAT session log record.

    IP addresses are stored as dotted-quad strings; ports, counters and
    timestamps are stored as the integers read from the log.
    """

    def __init__(self):
        """Initialise every field to an empty default."""
        self.version = 0
        self.gen_time = 0
        self.sequence_num = 0
        self.protocol_type = None
        self.src_ip = ""
        self.src_port = 0
        self.src_nat_ip = ""
        self.src_nat_port = 0
        self.dest_ip = ""
        self.dest_nat_ip = ""
        self.dest_port = 0
        self.dest_nat_port = 0
        self.session_creation = 0
        self.session_end = 0
        self.received_packets = 0
        self.received_bytes = 0
        self.sent_packets = 0
        self.sent_bytes = 0

    @staticmethod
    def _format_ipv4(ints):
        """Render four octet values as a dotted-quad string."""
        return "{}.{}.{}.{}".format(*ints)

    def _set_protocol(self, protocol_int):
        """Store the protocol number as a ``ProtocolType`` member."""
        self.protocol_type = ProtocolType(protocol_int)

    def _set_src_ip(self, ints):
        """Store the source address."""
        self.src_ip = self._format_ipv4(ints)

    def _set_src_nat_ip(self, ints):
        """Store the source address after NAT."""
        self.src_nat_ip = self._format_ipv4(ints)

    def _set_dest_ip(self, ints):
        """Store the destination address."""
        self.dest_ip = self._format_ipv4(ints)

    def _set_dest_nat_ip(self, ints):
        """Store the destination address after NAT."""
        # This used to write to src_nat_ip, which left dest_nat_ip empty
        # and overwrote the source NAT address.
        self.dest_nat_ip = self._format_ipv4(ints)
class NATIPv4SessionPacketUnpack(PacketUnpack):
    """Field layout of an IPv4 NAT session log record.

    Each record is a header followed by the content of a single session.
    Entries with neither expected values, target attribute nor handler
    are skipped while decoding.
    """

    packet_format = [
        # (struct_format, [description], [expected_val_list],
        #  [attribute_to_write], [handler])

        # ---- Header ----
        ("!B", "Version", [2, 3], "version", None),
        ("!B", "Log packet type", [4], None, None),
        ("!H", "Num. of sessions in this packet", [1], None, None),
        ("!I", "Time of generation", None, "gen_time", None),
        ("!I", "Sequence number", None, "sequence_num", None),
        ("!H", "Unused", [0], None, None),
        ("!B", "Slot", None, None, None),
        ("!B", "Unused", [0], None, None),

        # ---- Log content ----
        ("!B", "Protocol type", [1, 6, 17], None,
         NATIPv4SessionPacket._set_protocol),
        ("!B", "Unused", None, None, None),
        ("!B", "IP version", [4], None, None),
        ("!B", "ToS", None, None, None),
        ("!BBBB", "Src IP", None, None,
         NATIPv4SessionPacket._set_src_ip),
        ("!BBBB", "Src IP after NAT", None, None,
         NATIPv4SessionPacket._set_src_nat_ip),
        ("!BBBB", "Dest IP", None, None,
         NATIPv4SessionPacket._set_dest_ip),
        ("!BBBB", "Dest IP after NAT", None, None,
         NATIPv4SessionPacket._set_dest_nat_ip),
        ("!H", "Src port", None, "src_port", None),
        ("!H", "Src port after NAT", None, "src_nat_port", None),
        ("!H", "Dest port", None, "dest_port", None),
        ("!H", "Dest port after NAT", None, "dest_nat_port", None),
        ("!I", "Session creation time", None, "session_creation", None),
        ("!I", "Session end time", None, "session_end", None),
        ("!I", "Packets received by src", None, "received_packets", None),
        ("!I", "Bytes received by src", None, "received_bytes", None),
        ("!I", "Packets sent by src", None, "sent_packets", None),
        ("!I", "Bytes sent by src", None, "sent_bytes", None),
        ("!HHBBHI", "Reserved fields", None, None, None),
    ]
    packet_class = NATIPv4SessionPacket
class NATSessionFilter:
    """Conjunction of simple field tests parsed from a filter string.

    The filter string is a comma-separated list of ``cond=val`` terms.
    A ``!`` before the condition name (``!cond=val``) or directly before
    the equals sign (``cond!=val``) negates that term. A packet matches
    only when every term holds.
    """

    # Each checker is called as checker(expected, pkt).
    checkers = {
        'srcip': (lambda expected, pkt: pkt.src_ip == expected),
        'srcport': (lambda expected, pkt: pkt.src_port == expected),
        'destip': (lambda expected, pkt: pkt.dest_ip == expected),
        'destport': (lambda expected, pkt: pkt.dest_port == expected)
    }

    # Alternative spellings accepted for the condition names.
    _aliases = {'dstip': 'destip', 'dstport': 'destport'}

    # Conditions compared against integer packet fields. The text value
    # must be converted first, otherwise "53" == 53 is never true.
    _int_params = frozenset(('srcport', 'destport'))

    def __init__(self, filter_str):
        """Parse ``filter_str``; raises ``ValueError`` on malformed input."""
        self.filters = []
        self.init_filters([term.strip() for term in filter_str.split(",")])

    def init_filters(self, filters):
        """Append one predicate to ``self.filters`` per term in ``filters``.

        Empty terms (e.g. from a trailing comma) are skipped.

        Raises:
            ValueError: if a term has no ``=``, names an unknown
                condition, or gives a non-integer value for a port.
        """
        for fstr in filters:
            if not fstr:
                continue
            arg, sep, expected = fstr.partition("=")
            if not sep:
                raise ValueError(
                    f"Malformed filter term (expected cond=val): {fstr}.")
            arg = arg.strip()
            expected = expected.strip()

            negate = False
            if arg.startswith("!"):
                negate = True
                arg = arg[1:].strip()
            elif arg.endswith("!"):
                negate = True
                arg = arg[:-1].strip()

            arg = self._aliases.get(arg, arg)
            if arg not in self.checkers:
                raise ValueError(f"Unknown filter parameter: {arg}.")

            if arg in self._int_params:
                try:
                    expected = int(expected)
                except ValueError:
                    raise ValueError(
                        f"Filter parameter {arg} expects an integer, "
                        f"got {expected!r}.") from None

            check = partial(self.checkers[arg], expected)
            self.filters.append(self._negated(check) if negate else check)

    @staticmethod
    def _negated(check):
        """Return a predicate that inverts ``check``.

        Built in its own scope so each predicate keeps its own ``check``
        rather than sharing the loop variable of the caller.
        """
        return lambda pkt: not check(pkt)

    def match(self, pkt):
        """Return True when ``pkt`` satisfies every parsed term."""
        return all(flt(pkt) for flt in self.filters)
def format_file_size(in_bytes):
    """Return a short human-readable rendering of a byte count.

    Values below 1024 are printed verbatim with a ``B`` suffix; larger
    values are scaled by powers of 1024 to KB, MB or GB with one decimal.
    Anything of one tebibyte or more is reported as ``">1TB"``.
    """
    if in_bytes < 1024:
        return f"{in_bytes}B"
    for power, suffix in enumerate(("KB", "MB", "GB"), start=1):
        # The upper bound of this unit is the next power of 1024.
        if in_bytes < 1024 ** (power + 1):
            scaled = in_bytes / 1024 ** power
            return f"{scaled:.1f}{suffix}"
    return ">1TB"
def print_nat_packet_header():
    """Print the column titles followed by an ``=`` rule of equal width."""
    titles = "Id Timestamp Src_ip:port Dest_ip:port Bytes/pkt_sent Bytes/pkt_received"
    rule = "=" * len(titles)
    print(titles, rule, sep="\n")
def format_ip_port(ip, port):
    """Join an address and a port into ``ip:port``."""
    return "{}:{}".format(ip, port)
def print_nat_packet(p):
    """Print one session record as a single table row.

    Columns, in order: sequence number, session creation time (ISO
    format, converted with ``datetime.fromtimestamp``), source ip:port,
    destination ip:port, sent size/packets, received size/packets.
    """
    # Id Timestamp Src_ip:port Dest_ip:port Bytes/pkt_sent Bytes/pkt_received
    # 00000 0000-00-00T00:00:00 000.000.000.000:00000 000.000.000.000:00000 000000.0(000000) 000000.0(000000)
    created = datetime.fromtimestamp(p.session_creation).isoformat()
    source = format_ip_port(p.src_ip, p.src_port)
    destination = format_ip_port(p.dest_ip, p.dest_port)
    sent = format_file_size(p.sent_bytes)
    received = format_file_size(p.received_bytes)
    columns = [
        f"{p.sequence_num:>7d}",
        created,
        f"{source:21s}",
        f"{destination:21s}",
        f"{sent:>7s}/{p.sent_packets:<6d}",
        f"{received:>7s}/{p.received_packets:<6d}",
    ]
    print(" ".join(columns))
def read_session_log_file(fd, start_from=0, stop_at=0, length=0, filter_=None):
    """Print the NAT session records stored in ``fd`` as a table.

    Records are fixed-size, so record indices are converted to byte
    offsets by multiplying with the decoder's record size.

    Args:
        fd: seekable binary file object positioned at the first record.
        start_from: index of the first record to read.
        stop_at: index of the record to stop before; 0 reads to the end.
        length: number of records to read, used only when ``stop_at`` is
            0; 0 means no limit.
        filter_: optional object with a ``match(packet)`` method; only
            matching records are printed and counted.

    Raises:
        SystemExit: with status 1 when more than twice the record size in
            bytes has been skipped without decoding a valid record.
    """
    unpacker = NATIPv4SessionPacketUnpack()
    packet_size = unpacker.packet_size

    if start_from > 0:
        fd.seek(start_from * packet_size)

    # The loop below compares against fd.tell(), so both limits have to
    # be expressed in bytes. stop_at arrives as a record index.
    if stop_at:
        stop_offset = stop_at * packet_size
    elif length > 0:
        stop_offset = packet_size * (start_from + length)
    else:
        stop_offset = 0

    error_bytes_before_success = 0
    count = 0
    print_nat_packet_header()

    while not stop_offset or fd.tell() < stop_offset:
        last_pos = fd.tell()
        try:
            packet = unpacker.unpack(fd)
            if not packet:
                break
            if not filter_ or filter_.match(packet):
                print_nat_packet(packet)
                count += 1
            error_bytes_before_success = 0
        except struct.error:
            # A field was cut short by the end of the file: treat the
            # truncated tail as end of data.
            break
        except ValueError:
            if error_bytes_before_success > 2 * packet_size:
                print("ERROR: wrong file format or corrupted file. exit.")
                raise SystemExit(1)
            # Resynchronise: retry decoding one byte further on.
            error_bytes_before_success += 1
            fd.seek(last_pos + 1)
            continue

    print("============================")
    print(f"{count} records in total")
if __name__ == "__main__":
    # Command-line entry point: parse options, then dump the log file.
    import textwrap

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent(
            """\
            Utility to read Huawei's binary firewall NAT session log. For
            auditing use.
            PLEASE DO RESPECT THE PRIVACY OF OTHER PEOPLE.
            """),
        # The condition names listed here must be ones NATSessionFilter
        # accepts (`destip`/`destport`).
        epilog=textwrap.dedent(
            """\
            Filter string are in the format of
            [!]cond=val,[cond=val...]
            where cond can be one of the `srcip`, `destip`, `srcport`, `destport`.
            `!` mark means not.
            """)
    )
    parser.add_argument("--start-from", "-s", dest="start_from", type=int,
                        default=0,
                        help="read from the given index of records")
    parser.add_argument("--stop-at", "-t", dest="stop_at", type=int,
                        default=0,
                        help="read until the given index of records. 0 means "
                             "read till the end")
    parser.add_argument("--length", "-l", dest="length", type=int,
                        default=0,
                        help="the number of records to read. 0 means all.")
    parser.add_argument("--filter", "-f", dest="filter", type=str, default="",
                        help="output record that match the filter criteria")
    parser.add_argument("path", type=str, help="path to the log file")
    args = parser.parse_args()

    if not os.path.exists(args.path):
        print("ERROR: file not found.")
        raise SystemExit(1)

    # Parse the filter before opening the file so a bad filter string
    # fails without a file handle left open.
    filter_ = NATSessionFilter(args.filter) if args.filter else None

    # The context manager closes the file even if reading raises.
    with open(args.path, "rb") as f:
        read_session_log_file(f, args.start_from, args.stop_at, args.length,
                              filter_)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment