Last active
September 22, 2018 13:58
-
-
Save fahadysf/de7810a308115395d5373f1248b78b2c to your computer and use it in GitHub Desktop.
This script extracts TCP flows from a packet capture file and finds common patterns (hex) of a given length (default 7 bytes). Useful for custom App-ID creation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Copyright (C) 2018 | |
Author: Fahad Yousuf | |
This script extracts TCP Flows from a Packet Capture files and finds | |
common patterns (hex) of give length (default 7-bytes) | |
Usage: flowextractor.py [-h] [-d DPORT] inputfile | |
positional arguments: | |
inputfile Input file in pcap format | |
optional arguments: | |
-h, --help show this help message and exit | |
-d DPORT, --dport DPORT | |
""" | |
import dpkt | |
import hashlib | |
from dpkt.compat import compat_ord | |
import datetime | |
import socket | |
import binascii | |
import string | |
import argparse | |
# Build the command-line interface and parse arguments at import time.
parser = argparse.ArgumentParser()
parser.add_argument(
    "inputfile",
    help="Input file in pcap format",
    action='store',
    type=str,
)
parser.add_argument(
    "-d", "--dport",
    help="Destination Port of interest",
    action='store',
    type=int,
)
parser.add_argument(
    "-n", "--length",
    help="Minimum length of pattern to match",
    default=7,
    action='store',
    type=int,
)
args = parser.parse_args()
filename = args.inputfile
# Helper functions to convert MAC and IP into readable strings | |
def mac_addr(address):
    """Render a raw MAC address as a colon-separated hex string.

    Args:
        address (str/bytes): a MAC address in raw hex form
            (e.g. '\\x01\\x02\\x03\\x04\\x05\\x06')

    Returns:
        str: Printable/readable MAC address, e.g. '01:02:03:04:05:06'
    """
    octets = [compat_ord(byte) for byte in address]
    return ':'.join('%02x' % octet for octet in octets)
def inet_to_str(inet):
    """Convert a packed network address to its printable form.

    Args:
        inet (inet struct): packed IPv4 (4-byte) or IPv6 (16-byte) address

    Returns:
        str: Printable/readable IP address
    """
    # Assume IPv4 first; a packed IPv6 address makes inet_ntop raise
    # ValueError, in which case we retry with the IPv6 family.
    try:
        return socket.inet_ntop(socket.AF_INET, inet)
    except ValueError:
        pass
    return socket.inet_ntop(socket.AF_INET6, inet)
def hex_to_ascii(hexstr):
    """Render a hex-encoded payload as printable ASCII.

    Args:
        hexstr (str): Hex-encoded data; an odd-length string is
            left-padded with '0' so it decodes cleanly.

    Returns:
        str: Decoded text with every non-printable byte replaced by
        '.', stripped of leading/trailing whitespace.
    """
    if len(hexstr) % 2 == 1:
        hexstr = "0" + hexstr
    # str.decode("hex") is Python 2 only; bytes.fromhex() is the
    # Python 3 equivalent.
    data = bytes.fromhex(hexstr)
    output = "".join(chr(b) if chr(b) in string.printable else '.' for b in data)
    return output.strip()
def generate_flow_name(src_ip, dest_ip, sport, dport, seq):
    """Build a readable flow label, e.g. '1.2.3.4:80->5.6.7.8:443 (123)'."""
    endpoints = "{0}:{1}->{2}:{3}".format(src_ip, int(sport), dest_ip, int(dport))
    return endpoints + " ({0})".format(int(seq))
def generate_flow_id(flow_name):
    """Generate a unique ID for a specific TCP flow.

    Args:
        flow_name (str): Flow name as built by generate_flow_name()

    Returns:
        str: Unique ID (hex digest of the SHA256 hash of the flow name)
    """
    # hashlib requires bytes on Python 3; a plain str raises TypeError
    # unless encoded first.
    if isinstance(flow_name, str):
        flow_name = flow_name.encode('utf-8')
    return hashlib.sha256(flow_name).hexdigest()
def common_substr(a, b, k):
    """Find the first matching substring of two strings.

    Args:
        a (str): First string
        b (str): Second string
        k (int): Length of substring to find

    Returns:
        str or None: First length-k substring of ``b`` (scanning left to
        right) that also occurs in ``a``; None when there is none.
    """
    # Index every k-gram of `a` once for O(1) membership tests.
    a_grams = {a[start:start + k] for start in range(len(a) - k + 1)}
    for start in range(len(b) - k + 1):
        candidate = b[start:start + k]
        if candidate in a_grams:
            return candidate
    return None
def find_all_common_substrings(str_a, str_b, size):
    """Collect all length-`size` substrings common to both strings.

    Repeatedly finds one common substring and strips every occurrence
    of it from both inputs, so each distinct pattern is reported once.

    Args:
        str_a (str): First string
        str_b (str): Second string
        size (int): Length of substrings to find

    Returns:
        list(str): Common substrings in discovery order (possibly empty)
    """
    found_strings = []
    substr = common_substr(str_a, str_b, size)
    # PEP 8: compare against None with `is not`, not `!=`.
    while substr is not None:
        found_strings.append(substr)
        # Remove every occurrence so the same pattern is not re-found.
        str_a = str_a.replace(substr, '')
        str_b = str_b.replace(substr, '')
        substr = common_substr(str_a, str_b, size)
    return found_strings
def find_common_substrings_in_string_array(string_list, size):
    """Find substrings of `size` characters shared by strings in a list.

    Strings are compared pairwise until at least one common substring is
    found or every string has been consumed; each found substring is
    then scored against the full list.

    Args:
        string_list (list): List of (hex) strings to search; sorted in place.
        size (int): Length of substring to find

    Returns:
        dict: Maps each common substring to a dict with "hits", "count",
        "hit_percentage" and "printable-string" keys.
    """
    string_list.sort()
    listlen = len(string_list)
    common_strings = dict()
    unchecked_remaining = True
    # xrange() does not exist on Python 3; range() is the equivalent.
    unchecked_indexes = list(range(listlen))
    # Pre-bind so the odd-length carry-over branch below cannot hit an
    # UnboundLocalError on its first iteration (e.g. a 1-element list).
    str_a = str_b = ''
    # Keep checking for common strings until we have at least one or
    # have checked all source strings. NOTE: `dict.keys() == []` (the
    # original test) is never True on Python 3; use truthiness instead.
    while not common_strings and unchecked_remaining:
        if len(unchecked_indexes) >= 2:
            str_a = string_list[unchecked_indexes.pop()]
            str_b = string_list[unchecked_indexes.pop()]
        elif len(unchecked_indexes) == 1:
            # Odd list length: pair the leftover string with the
            # previously compared one.
            str_a = str_b
            str_b = string_list[unchecked_indexes.pop()]
        else:
            unchecked_remaining = False
        if unchecked_remaining:
            for substr in find_all_common_substrings(str_a, str_b, size):
                common_strings[substr] = dict()
    # Score every found substring against the whole input list.
    # (Avoid `string` as a loop variable: it shadows the stdlib module.)
    for cstr in common_strings:
        hits = sum(1 for candidate in string_list if cstr in candidate)
        common_strings[cstr]["hits"] = hits
        common_strings[cstr]['count'] = listlen
        common_strings[cstr]["hit_percentage"] = 100.0 * float(hits) / float(listlen)
        common_strings[cstr]["printable-string"] = hex_to_ascii(cstr)
    return common_strings
def flow_extractor(pcapreader):
    """Extract TCP flow payloads from a packet capture.

    Args:
        pcapreader: Iterable of (timestamp, raw_frame) tuples, e.g. a
            dpkt.pcap.Reader.

    Returns:
        dict: flow_id -> flow metadata with "name", "inverted", "src",
        "dst", "sport", "dport", "seq" and a "payload" list of
        ("forward"|"reverse", hex_payload) tuples.
    """
    flows = dict()
    tcpcounter = 0
    ipcounter = 0
    # NOTE(review): seq carries over between packets, so a data packet
    # seen before any SYN is tagged with the previous flow's ISN (or 0).
    seq = 0
    # PCAP parsing starts here
    for ts, pkt in pcapreader:
        try:
            eth = dpkt.ethernet.Ethernet(pkt)
        except Exception:
            print("Could not process frame at timestamp: %s" % str(ts))
            # Skip the bad frame instead of falling through and re-using
            # the previous (or an undefined) `eth` object.
            continue
        ip = eth.data
        if isinstance(ip, dpkt.ip.IP) and isinstance(ip.data, dpkt.tcp.TCP):
            tcp = ip.data
            # Optional destination-port filter; matches either direction
            # so response segments are kept too.
            if args.dport:
                process = (tcp.dport == args.dport) or (tcp.sport == args.dport)
            else:
                process = True
            if process:
                # A pure SYN (no ACK) opens a new flow; save the initial
                # sequence number so both directions hash to this flow.
                if (tcp.flags & dpkt.tcp.TH_SYN) != 0 and (tcp.flags & dpkt.tcp.TH_ACK) == 0:
                    seq = tcp.seq
                    flow_name = generate_flow_name(inet_to_str(ip.src), inet_to_str(ip.dst), tcp.sport, tcp.dport, seq)
                    inverted_flow_string = generate_flow_name(inet_to_str(ip.dst), inet_to_str(ip.src), tcp.dport, tcp.sport, seq)
                    flow_id = generate_flow_id(flow_name)
                    tcpcounter += 1
                    flows[flow_id] = {
                        "name": flow_name,
                        "inverted": inverted_flow_string,
                        "src": inet_to_str(ip.src),
                        "dst": inet_to_str(ip.dst),
                        "sport": tcp.sport,
                        "dport": tcp.dport,
                        "seq": seq,
                        "payload": list(),
                    }
                if len(tcp.data):
                    flow_name = generate_flow_name(inet_to_str(ip.src), inet_to_str(ip.dst), tcp.sport, tcp.dport, seq)
                    inverted_flow_string = generate_flow_name(inet_to_str(ip.dst), inet_to_str(ip.src), tcp.dport, tcp.sport, seq)
                    flow_id = generate_flow_id(flow_name)
                    reverse_id = generate_flow_id(inverted_flow_string)
                    payload = binascii.hexlify(tcp.data).decode()
                    if flow_id in flows:
                        # Only add new payload if it is not a duplicate
                        # (re-transmit already present in the list).
                        if ("forward", payload) not in flows[flow_id]["payload"]:
                            flows[flow_id]["payload"].append(("forward", payload))
                    elif reverse_id in flows:
                        if ("reverse", payload) not in flows[reverse_id]["payload"]:
                            flows[reverse_id]["payload"].append(("reverse", payload))
            ipcounter += 1
    return flows
if __name__ == "__main__":
    # Kept local to the CLI path, as in the original script.
    import json

    # Use a context manager so the capture file handle is always closed.
    with open(filename, 'rb') as capfile:
        flows = flow_extractor(dpkt.pcap.Reader(capfile))

    # Collect at most the first two payloads of each flow, split by
    # direction (request = forward, response = reverse).
    request_strings = list()
    response_strings = list()
    for key in flows:
        for direction, payload in flows[key]["payload"][:2]:
            if direction == "forward":
                request_strings.append(payload)
            elif direction == "reverse":
                response_strings.append(payload)

    print("Request Strings:")
    print(json.dumps(request_strings, indent=2))
    print("Response Strings:")
    print(json.dumps(response_strings, indent=2))
    print("")
    # Payloads are hex strings, so a k-byte pattern spans 2*k characters.
    print(("Common {0}-byte substrings in requests with hit-count").format(args.length))
    print(json.dumps(find_common_substrings_in_string_array(request_strings, 2 * args.length), indent=2))
    print(("Common {0}-byte substrings in responses with hit-count").format(args.length))
    print(json.dumps(find_common_substrings_in_string_array(response_strings, 2 * args.length), indent=2))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Revision #3 resolves an issue with "Duplicate" (re-transmit) payloads causing issues with detection of responses.