Last active
June 14, 2017 16:57
-
-
Save miou-gh/7b90fcea356d901d190ee0457fea31bc to your computer and use it in GitHub Desktop.
WPE PAC to SPT Converter with Filtering
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import struct | |
| import sys | |
| import re | |
| import argparse | |
class SPT_Packet:
    """A named payload entry, as written to / read from an SPT packet list."""

    def __init__(self, name, data):
        # name: label stored next to the payload; data: the payload itself.
        self.name, self.data = name, data
class SPT:
    """Reader/writer for the SPT packet-list binary layout used by this script.

    Layout, as implemented below: a 4-byte signed packet count, followed for
    each packet by a 4-byte name length, the name bytes, a 4-byte data length
    and the data bytes.

    All integers use the explicit little-endian, standard-size ``'<i'`` struct
    format. The previous ``'i'`` format used the host's native byte order and
    alignment, which made the file layout depend on the machine running the
    script.
    """

    def __init__(self):
        pass

    def read_int(self, bytes, index):
        """Read a 4-byte little-endian signed int at ``index``.

        Returns ``(next_index, value)``. Raises ``struct.error`` when fewer
        than four bytes are available.
        """
        return index + 4, struct.unpack('<i', bytes[index:index + 4])[0]

    def read_str(self, bytes, index, count):
        """Read ``count`` raw bytes at ``index`` as a byte string.

        Returns ``(next_index, value)``. Raises ``struct.error`` when the
        buffer is too short or ``count`` is negative.
        """
        chunk = bytes[index:index + count]
        # Going through struct keeps the strict length check: a truncated
        # slice raises instead of silently returning a short value.
        return index + count, struct.unpack(str(count) + 's', chunk)[0]

    def read_bytes(self, bytes, index, count):
        """Read ``count`` raw bytes at ``index`` as a mutable ``bytearray``.

        Returns ``(next_index, value)``. Raises ``struct.error`` when the
        buffer is too short or ``count`` is negative.
        """
        chunk = bytes[index:index + count]
        return index + count, bytearray(struct.unpack(str(count) + 's', chunk)[0])

    def deserialize(self, input_bytes):
        """Parse a serialized packet list into a list of ``SPT_Packet``.

        Raises ``struct.error`` if the input ends before the declared number
        of packets has been read.
        """
        buf = bytearray(input_bytes)
        packets = []
        index, capacity = self.read_int(buf, 0)
        for _ in range(capacity):
            index, name_length = self.read_int(buf, index)
            index, name = self.read_str(buf, index, name_length)
            index, data_length = self.read_int(buf, index)
            index, data = self.read_bytes(buf, index, data_length)
            packets.append(SPT_Packet(name, data))
        return packets

    def serialize(self, packets):
        """Encode ``packets`` (objects with bytes-like ``name`` and ``data``).

        Returns a ``bytearray`` in the layout described on the class.
        """
        out = bytearray()
        out += struct.pack('<i', len(packets))
        for packet in packets:
            # Each field is length-prefixed so deserialize() can walk the data.
            out += struct.pack('<i', len(packet.name))
            out += packet.name
            out += struct.pack('<i', len(packet.data))
            out += packet.data
        return out
class PAC_Packet:
    """One packet record read from a PAC file: payload plus its metadata."""

    def __init__(self, data, socket_id, source_addr, dest_addr, packet_size, function):
        # Attributes are assigned in constructor-argument order; they live in
        # the instance __dict__, where the filter code looks them up by name.
        self.data, self.socket_id = data, socket_id
        self.source_addr, self.dest_addr = source_addr, dest_addr
        self.packet_size, self.function = packet_size, function
class PAC:
    """Parser for the PAC capture layout consumed by this converter.

    Layout, as implemented below: a 4-byte signed record count, followed for
    each record by a length-prefixed data blob, a 4-byte socket id, and four
    length-prefixed strings (source address, destination address, packet
    size, function name).

    Integers are read with the explicit little-endian ``'<i'`` struct format
    rather than the host-dependent native ``'i'``.
    """

    def __init__(self):
        pass

    @staticmethod
    def _text(value):
        """Return ``value`` as text suitable for regex filtering.

        Byte strings are decoded instead of being passed to ``str()``: on
        Python 3, ``str(b'abc')`` yields the repr ``"b'abc'"``, which breaks
        anchored patterns. Latin-1 maps every byte to one character, so the
        decode can never fail. Non-bytes values (the integer socket id) go
        through ``str()``.
        """
        if isinstance(value, (bytes, bytearray)):
            return value.decode('latin-1')
        return str(value)

    def read_int(self, bytes, index):
        """Read a 4-byte little-endian signed int at ``index``.

        Returns ``(next_index, value)``. Raises ``struct.error`` when fewer
        than four bytes are available.
        """
        return index + 4, struct.unpack('<i', bytes[index:index + 4])[0]

    def read_str(self, bytes, index, count):
        """Read ``count`` raw bytes at ``index`` as a byte string.

        Returns ``(next_index, value)``. Raises ``struct.error`` when the
        buffer is too short or ``count`` is negative.
        """
        chunk = bytes[index:index + count]
        # struct enforces the exact length, so truncated input raises.
        return index + count, struct.unpack(str(count) + 's', chunk)[0]

    def read_bytes(self, bytes, index, count):
        """Read ``count`` raw bytes at ``index`` as a mutable ``bytearray``.

        Returns ``(next_index, value)``. Raises ``struct.error`` when the
        buffer is too short or ``count`` is negative.
        """
        chunk = bytes[index:index + count]
        return index + count, bytearray(struct.unpack(str(count) + 's', chunk)[0])

    def deserialize(self, input_bytes):
        """Parse PAC file contents into a list of ``PAC_Packet``.

        ``data`` is kept as a ``bytearray``; every other field is stored as
        text. Raises ``struct.error`` if the input ends before the declared
        number of records has been read.
        """
        buf = bytearray(input_bytes)
        packets = []
        index, capacity = self.read_int(buf, 0)
        for _ in range(capacity):
            index, data_length = self.read_int(buf, index)
            index, data = self.read_bytes(buf, index, data_length)
            index, socket_id = self.read_int(buf, index)
            index, source_addr_length = self.read_int(buf, index)
            index, source_addr = self.read_str(buf, index, source_addr_length)
            index, dest_addr_length = self.read_int(buf, index)
            index, dest_addr = self.read_str(buf, index, dest_addr_length)
            index, packet_size_length = self.read_int(buf, index)
            index, packet_size = self.read_str(buf, index, packet_size_length)
            index, function_length = self.read_int(buf, index)
            index, function = self.read_str(buf, index, function_length)
            packets.append(PAC_Packet(
                data,
                self._text(socket_id),
                self._text(source_addr),
                self._text(dest_addr),
                self._text(packet_size),
                self._text(function),
            ))
        return packets
def pairwise(iterable):
    """Yield non-overlapping pairs: (s0, s1), (s2, s3), (s4, s5), ...

    A trailing item without a partner is dropped, because zip stops as soon
    as one of its inputs is exhausted.
    """
    # Both zip arguments are the *same* iterator, so each output tuple
    # consumes two consecutive items from the input.
    return zip(*[iter(iterable)] * 2)
# Command-line driver: read a PAC file, keep the packets that satisfy every
# -f filter, and write them out as an SPT packet list.
reader = PAC()
writer = SPT()

parser = argparse.ArgumentParser(description='PAC to SPT converter')
# -i and -o are mandatory; without required=True a missing option only
# surfaced later as a TypeError when indexing None.
parser.add_argument('-i', type=str, nargs='+', required=True, help='the path of the PAC file to input')
parser.add_argument('-o', type=str, nargs='+', required=True, help='the path of the SPT file to output')
parser.add_argument('-f', type=str, nargs='+', help='the regular expression filters to apply which when matched, will be added to the SPT output.')
parser.usage = '''WPESelect.py -i filename.pac -o filename.spt -f source_addr 192.168.0.100 dest_addr 192.168.0.255 function WSASend
This would effectively filter packets that originate from 192.168.0.100 and are outgoing to 192.168.0.255
Filters:
source_addr The originating address which sent the packet
dest_addr The destination address which received the packet
socket_id The Socket ID the packet was received/sent on
packet_size The size of the packet which was sent/received
function The function of the packet (i.e: Send SendTo Recv RecvFrom WSASend WSASendTo WSARecv WSARecvFrom)
'''

args = parser.parse_args()

# Only the attributes advertised in the usage text can be filtered on. The
# packet payload ("data") is binary and cannot be searched with a text regex.
filterable = ('source_addr', 'dest_addr', 'socket_id', 'packet_size', 'function')

# Validate and compile the filters once, before touching any file, so bad
# input is reported as a usage error instead of a traceback mid-conversion.
filters = []
if args.f is not None:
    if len(args.f) % 2 != 0:
        # pairwise() would silently drop the unpaired trailing token.
        parser.error('-f expects name/pattern pairs, got an odd number of values')
    for name, pattern in pairwise(args.f):
        if name not in filterable:
            parser.error('unknown filter %r (choose from: %s)' % (name, ', '.join(filterable)))
        try:
            filters.append((name, re.compile(pattern)))
        except re.error as exc:
            parser.error('invalid regular expression %r for filter %s: %s' % (pattern, name, exc))

with open(args.i[0], 'rb') as pac:
    pac_packets = reader.deserialize(pac.read())

spt_packets = []
for pac_packet in pac_packets:
    # A packet is kept only if every filter matches; with no filters at all,
    # all() is vacuously true and every packet is kept.
    if all(regex.search(getattr(pac_packet, name)) is not None for name, regex in filters):
        spt_packets.append(SPT_Packet(b'New Send', pac_packet.data))

output = writer.serialize(spt_packets)

with open(args.o[0], 'wb') as spt:
    spt.write(output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment