Leak detection code used in https://freedom-to-tinker.com/tag/noboundaries/
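For context: a "leak" here is a known identifier (for example, an email address) that shows up in a URL, POST body, or cookie after being hashed and/or encoded, possibly several layers deep. A minimal sketch of what such a leak looks like, using a hypothetical email address, tracker hostname, and parameter name (Python 2, to match the code below):

import hashlib

email = "jane.doe@example.com"  # hypothetical identifier to search for
# Instead of the plaintext address, a tracker might transmit a hash of it:
leaky_url = "https://tracker.example/pixel?uid=" + hashlib.md5(email).hexdigest()

The LeakDetector class below precomputes hashed and encoded variants of each search string so that requests like this can still be attributed to the original identifier.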
from urlparse import urlparse
from Crypto.Hash import MD2
import pandas as pd
import cookies as ck
import hackercodecs  # noqa
import hashlib
import pyblake2
import urllib
import sha3
import mmh3
import mmhash
import base64
import base58
import zlib
import json
import re
from urllib import quote_plus

# DELIMITERS = re.compile('[&|\,]')
DELIMITERS = re.compile('[&|\,]|%s|%s' % (quote_plus("="), quote_plus("&")))
EXTENSION_RE = re.compile('\.[A-Za-z]{2,4}$')
ENCODING_LAYERS = 3
ENCODINGS_NO_ROT = ['base16', 'base32', 'base58', 'base64',
                    'urlencode', 'yenc', 'entity',
                    'deflate', 'zlib', 'gzip']
LIKELY_ENCODINGS = ['base16', 'base32', 'base58', 'base64',
                    'urlencode', 'yenc', 'entity']
HASHES = ['md2', 'md4', 'md5', 'sha1', 'sha256', 'sha224', 'sha384',
          'sha512', 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', 'mmh2',
          'mmh2_unsigned', 'mmh3_32', 'mmh3_64_1', 'mmh3_64_2', 'mmh3_128',
          'ripemd160', 'whirlpool', 'blake2b', 'blake2s']

class Hasher():

    def __init__(self):
        # Define supported hashes
        hashes = dict()
        hashes['md2'] = lambda x: self._get_md2_hash(x)
        hashes['md4'] = lambda x: self._get_hashlib_hash('md4', x)
        hashes['md5'] = lambda x: hashlib.md5(x).hexdigest()
        hashes['sha'] = lambda x: self._get_hashlib_hash('sha', x)
        hashes['sha1'] = lambda x: hashlib.sha1(x).hexdigest()
        hashes['sha256'] = lambda x: hashlib.sha256(x).hexdigest()
        hashes['sha224'] = lambda x: hashlib.sha224(x).hexdigest()
        hashes['sha384'] = lambda x: hashlib.sha384(x).hexdigest()
        hashes['sha512'] = lambda x: hashlib.sha512(x).hexdigest()
        hashes['sha3_224'] = lambda x: sha3.sha3_224(x).hexdigest()
        hashes['sha3_256'] = lambda x: sha3.sha3_256(x).hexdigest()
        hashes['sha3_384'] = lambda x: sha3.sha3_384(x).hexdigest()
        hashes['sha3_512'] = lambda x: sha3.sha3_512(x).hexdigest()
        hashes['mmh2'] = lambda x: str(mmhash.get_hash(x))
        hashes['mmh2_unsigned'] = lambda x: str(mmhash.get_unsigned_hash(x))
        hashes['mmh3_32'] = lambda x: str(mmh3.hash(x))
        hashes['mmh3_64_1'] = lambda x: str(mmh3.hash64(x)[0])
        hashes['mmh3_64_2'] = lambda x: str(mmh3.hash64(x)[1])
        hashes['mmh3_128'] = lambda x: str(mmh3.hash128(x))
        hashes['ripemd160'] = lambda x: self._get_hashlib_hash('ripemd160', x)
        hashes['whirlpool'] = lambda x: self._get_hashlib_hash('whirlpool', x)
        hashes['blake2b'] = lambda x: pyblake2.blake2b(x).hexdigest()
        hashes['blake2s'] = lambda x: pyblake2.blake2s(x).hexdigest()
        hashes['crc32'] = lambda x: str(zlib.crc32(x))
        hashes['adler32'] = lambda x: str(zlib.adler32(x))
        self._hashes = hashes
        self.hashes_and_checksums = self._hashes.keys()
        self.supported_hashes = HASHES

    def _get_hashlib_hash(self, name, string):
        """Use for hashlib hashes that don't have a shortcut"""
        hasher = hashlib.new(name)
        hasher.update(string)
        return hasher.hexdigest()

    def _get_md2_hash(self, string):
        """Compute md2 hash"""
        md2 = MD2.new()
        md2.update(string)
        return md2.hexdigest()

    def get_hash(self, hash_name, string):
        """Compute the desired hash"""
        return self._hashes[hash_name](string)

class Encoder():

    def __init__(self):
        # Define supported encodings
        encodings = dict()
        encodings['base16'] = lambda x: base64.b16encode(x)
        encodings['base32'] = lambda x: base64.b32encode(x)
        encodings['base58'] = lambda x: base58.b58encode(x)
        encodings['base64'] = lambda x: base64.b64encode(x)
        encodings['urlencode'] = lambda x: urllib.quote_plus(x)
        encodings['deflate'] = lambda x: self._compress_with_zlib('deflate', x)
        encodings['zlib'] = lambda x: self._compress_with_zlib('zlib', x)
        encodings['gzip'] = lambda x: self._compress_with_zlib('gzip', x)
        encodings['json'] = lambda x: json.dumps(x)
        encodings['binary'] = lambda x: x.encode('bin')
        encodings['entity'] = lambda x: x.encode('entity')
        encodings['rot1'] = lambda x: x.encode('rot1')
        encodings['rot10'] = lambda x: x.encode('rot10')
        encodings['rot11'] = lambda x: x.encode('rot11')
        encodings['rot12'] = lambda x: x.encode('rot12')
        encodings['rot13'] = lambda x: x.encode('rot13')
        encodings['rot14'] = lambda x: x.encode('rot14')
        encodings['rot15'] = lambda x: x.encode('rot15')
        encodings['rot16'] = lambda x: x.encode('rot16')
        encodings['rot17'] = lambda x: x.encode('rot17')
        encodings['rot18'] = lambda x: x.encode('rot18')
        encodings['rot19'] = lambda x: x.encode('rot19')
        encodings['rot2'] = lambda x: x.encode('rot2')
        encodings['rot20'] = lambda x: x.encode('rot20')
        encodings['rot21'] = lambda x: x.encode('rot21')
        encodings['rot22'] = lambda x: x.encode('rot22')
        encodings['rot23'] = lambda x: x.encode('rot23')
        encodings['rot24'] = lambda x: x.encode('rot24')
        encodings['rot25'] = lambda x: x.encode('rot25')
        encodings['rot3'] = lambda x: x.encode('rot3')
        encodings['rot4'] = lambda x: x.encode('rot4')
        encodings['rot5'] = lambda x: x.encode('rot5')
        encodings['rot6'] = lambda x: x.encode('rot6')
        encodings['rot7'] = lambda x: x.encode('rot7')
        encodings['rot8'] = lambda x: x.encode('rot8')
        encodings['rot9'] = lambda x: x.encode('rot9')
        encodings['yenc'] = lambda x: x.encode('yenc')
        self._encodings = encodings
        self.supported_encodings = self._encodings.keys()

    def _compress_with_zlib(self, compression_type, string, level=6):
        """Compress in one of the zlib supported formats: zlib, gzip, or deflate.
        For a description see: http://stackoverflow.com/a/22311297/6073564
        """
        if compression_type == 'deflate':
            compressor = zlib.compressobj(level, zlib.DEFLATED,
                                          -zlib.MAX_WBITS)
        elif compression_type == 'zlib':
            compressor = zlib.compressobj(level, zlib.DEFLATED,
                                          zlib.MAX_WBITS)
        elif compression_type == 'gzip':
            compressor = zlib.compressobj(level, zlib.DEFLATED,
                                          zlib.MAX_WBITS | 16)
        else:
            raise ValueError("Unsupported zlib compression format %s." %
                             compression_type)
        return compressor.compress(string) + compressor.flush()

    def encode(self, encoding, string):
        """Encode `string` in desired `encoding`"""
        return self._encodings[encoding](string)

class DecodeException(Exception):
    def __init__(self, message, error):
        super(DecodeException, self).__init__(message)
        self.error = error


class Decoder():

    def __init__(self):
        # Define supported decodings
        decodings = dict()
        decodings['base16'] = lambda x: base64.b16decode(x)
        decodings['base32'] = lambda x: base64.b32decode(x)
        decodings['base58'] = lambda x: base58.b58decode(x)
        decodings['base64'] = lambda x: base64.b64decode(x)
        decodings['urlencode'] = lambda x: urllib.unquote_plus(x)
        decodings['deflate'] = lambda x: self._decompress_with_zlib('deflate',
                                                                    x)
        decodings['zlib'] = lambda x: self._decompress_with_zlib('zlib', x)
        decodings['gzip'] = lambda x: self._decompress_with_zlib('gzip', x)
        decodings['json'] = lambda x: json.loads(x)
        decodings['binary'] = lambda x: x.decode('bin')
        decodings['entity'] = lambda x: x.decode('entity')
        decodings['rot1'] = lambda x: x.decode('rot1')
        decodings['rot10'] = lambda x: x.decode('rot10')
        decodings['rot11'] = lambda x: x.decode('rot11')
        decodings['rot12'] = lambda x: x.decode('rot12')
        decodings['rot13'] = lambda x: x.decode('rot13')
        decodings['rot14'] = lambda x: x.decode('rot14')
        decodings['rot15'] = lambda x: x.decode('rot15')
        decodings['rot16'] = lambda x: x.decode('rot16')
        decodings['rot17'] = lambda x: x.decode('rot17')
        decodings['rot18'] = lambda x: x.decode('rot18')
        decodings['rot19'] = lambda x: x.decode('rot19')
        decodings['rot2'] = lambda x: x.decode('rot2')
        decodings['rot20'] = lambda x: x.decode('rot20')
        decodings['rot21'] = lambda x: x.decode('rot21')
        decodings['rot22'] = lambda x: x.decode('rot22')
        decodings['rot23'] = lambda x: x.decode('rot23')
        decodings['rot24'] = lambda x: x.decode('rot24')
        decodings['rot25'] = lambda x: x.decode('rot25')
        decodings['rot3'] = lambda x: x.decode('rot3')
        decodings['rot4'] = lambda x: x.decode('rot4')
        decodings['rot5'] = lambda x: x.decode('rot5')
        decodings['rot6'] = lambda x: x.decode('rot6')
        decodings['rot7'] = lambda x: x.decode('rot7')
        decodings['rot8'] = lambda x: x.decode('rot8')
        decodings['rot9'] = lambda x: x.decode('rot9')
        decodings['yenc'] = lambda x: x.decode('yenc')
        self._decodings = decodings
        self.supported_encodings = self._decodings.keys()

    def _decompress_with_zlib(self, compression_type, string, level=9):
        """Decompress from one of the zlib supported formats: zlib, gzip, or deflate.
        For a description see: http://stackoverflow.com/a/22311297/6073564
        """
        if compression_type == 'deflate':
            return zlib.decompress(string, -zlib.MAX_WBITS)
        elif compression_type == 'zlib':
            return zlib.decompress(string, zlib.MAX_WBITS)
        elif compression_type == 'gzip':
            return zlib.decompress(string, zlib.MAX_WBITS | 16)
        else:
            raise ValueError("Unsupported zlib compression format %s." %
                             compression_type)

    def decode_error(self):
        """Catch-all error for all supported decoders"""

    def decode(self, encoding, string):
        """Decode `string` encoded by `encoding`"""
        try:
            return self._decodings[encoding](string)
        except Exception as e:
            raise DecodeException(
                'Error while trying to decode %s' % encoding,
                e
            )

class LeakDetector():

    def __init__(self, search_strings, precompute_hashes=True, hash_set=None,
                 hash_layers=2, precompute_encodings=True, encoding_set=None,
                 encoding_layers=2, debugging=False):
        """LeakDetector searches URLs, POST bodies, and cookies for leaks.

        The detector is constructed with a set of search strings (given by
        the `search_strings` parameter). It has several methods to check for
        leaks containing these strings in URLs, POST bodies, and cookie header
        strings.

        Parameters
        ==========
        search_strings : list
            LeakDetector will search for leaks containing any item in this
            list.
        precompute_hashes : bool
            Set to `True` to include precomputed hashes in the candidate set.
        hash_set : list
            List of hash functions to use when building the set of candidate
            strings.
        hash_layers : int
            The detector will find instances of `search_string` iteratively
            hashed up to `hash_layers` times by any combination of supported
            hashes.
        precompute_encodings : bool
            Set to `True` to include precomputed encodings in the candidate
            set.
        encoding_set : list
            List of encodings to use when building the set of candidate
            strings.
        encoding_layers : int
            The detector will find instances of `search_string` iteratively
            encoded up to `encoding_layers` times by any combination of
            supported encodings.
        debugging : bool
            Set to `True` to enable verbose output.
        """
        self.search_strings = search_strings
        self._min_length = min([len(x) for x in search_strings])
        self._hasher = Hasher()
        self._hash_set = hash_set
        self._hash_layers = hash_layers
        self._encoder = Encoder()
        self._encoding_set = encoding_set
        self._encoding_layers = encoding_layers
        self._decoder = Decoder()
        self._precompute_pool = dict()
        # If hash/encoding sets aren't specified, use all available.
        if self._hash_set is None:
            self._hash_set = self._hasher.supported_hashes
        if self._encoding_set is None:
            self._encoding_set = self._encoder.supported_encodings
        self._build_precompute_pool(precompute_hashes, precompute_encodings)
        self._debugging = debugging

    def _compute_hashes(self, string, layers, prev_hashes=tuple()):
        """Record all iterative hashes of `string` up to the
        specified number of `layers` in the precompute pool"""
        for h in self._hasher.supported_hashes:
            hashed_string = self._hasher.get_hash(h, string)
            if hashed_string == string:  # skip no-ops
                continue
            hash_stack = (h,) + prev_hashes
            self._precompute_pool[hashed_string] = hash_stack
            if layers > 1:
                self._compute_hashes(hashed_string, layers-1, hash_stack)

    def _compute_encodings(self, string, layers, prev_encodings=tuple()):
        """Record all iterative encodings of `string` up to the
        specified number of `layers` in the precompute pool"""
        for enc in self._encoding_set:
            encoded_string = str(self._encoder.encode(enc, string))
            if encoded_string == string:  # skip no-ops
                continue
            encoding_stack = (enc,) + prev_encodings
            self._precompute_pool[encoded_string] = encoding_stack
            if layers > 1:
                self._compute_encodings(encoded_string, layers-1,
                                        encoding_stack)

    def _build_precompute_pool(self, precompute_hashes, precompute_encodings):
        """Build a pool of hashes and encodings for the given search strings"""
        seed_strings = list()
        for string in self.search_strings:
            seed_strings.append(string)
            if string.startswith('http'):
                continue
            all_lower = string.lower()
            if all_lower != string:
                seed_strings.append(string.lower())
            all_upper = string.upper()
            if all_upper != string:
                seed_strings.append(string.upper())
        strings = list()
        for string in seed_strings:
            strings.append(string)
            # If the search string appears to be an email address, we also
            # want to include just the username portion of the address, and
            # the address and username with any '.'s removed from the
            # username (since these are optional in Gmail).
            if '@' in string:
                parts = string.rsplit('@')
                if len(parts) == 2:
                    uname, domain = parts
                    strings.append(uname)
                    strings.append(re.sub('\.', '', uname))
                    strings.append(re.sub('\.', '', uname) + '@' + domain)
                    # Domain searches have too many false positives
                    # strings.append(parts[1])
                    # strings.append(parts[1].rsplit('.', 1)[0])
            # The URL tokenizer strips file extensions, so if our search
            # string ends with a file extension we should also search for a
            # stripped version (search rather than match: the extension sits
            # at the end of the string).
            if re.search(EXTENSION_RE, string):
                strings.append(re.sub(EXTENSION_RE, '', string))
        for string in strings:
            self._precompute_pool[string] = (string,)
        self._min_length = min([len(x) for x in self._precompute_pool.keys()])
        initial_items = self._precompute_pool.items()
        if precompute_hashes:
            for string, name in initial_items:
                self._compute_hashes(string, self._hash_layers, name)
        if precompute_encodings:
            for string, name in initial_items:
                self._compute_encodings(string, self._encoding_layers, name)

    def _split_on_delims(self, string, rv_parts, rv_named):
        """Split a string on several delimiters"""
        if string == '':
            return
        parts = set(re.split(DELIMITERS, string))
        if '' in parts:
            parts.remove('')
        for part in parts:
            if part == '':
                continue
            count = part.count('=')
            if count != 1:
                rv_parts.add(part)
            if count == 0:
                continue
            n, k = part.split('=', 1)
            if len(n) > 0 and len(k) > 0:
                rv_named.add((n, k))
            else:
                rv_parts.add(part)

    def check_if_in_precompute_pool(self, string):
        """Return a tuple that lists the (possibly layered) hashes or
        encodings that result in the input string
        """
        try:
            return self._precompute_pool[str(string)]
        except KeyError:
            return
        except (UnicodeDecodeError, UnicodeEncodeError):
            return

    def check_for_leak(self, string, layers=1, prev_encodings=tuple(),
                       prev=''):
        """Check if the given string contains a leak"""
        # Short tokens can't contain any of the search strings
        if len(string) < self._min_length:
            return
        # Check if direct hash or plaintext
        rv = self.check_if_in_precompute_pool(string)
        if rv is not None:
            return prev_encodings + rv
        # Try encodings
        for encoding in self._encoding_set:
            # multiple rots are unnecessary
            if encoding.startswith('rot') and prev.startswith('rot'):
                continue
            try:
                decoded = self._decoder.decode(encoding, string)
                if type(decoded) == int or type(decoded) == long:
                    decoded = str(decoded)
            except DecodeException:  # means this isn't the correct decoding
                continue
            if decoded == string:  # don't add no-ops
                continue
            if decoded is None:  # Empty decodings aren't useful
                continue
            encoding_stack = prev_encodings + (encoding,)
            if layers > 1:
                rv = self.check_for_leak(decoded, layers-1,
                                         encoding_stack, encoding)
                if rv is not None:
                    return rv
            else:
                rv = self.check_if_in_precompute_pool(decoded)
                if rv is not None:
                    return encoding_stack + rv
        return

    def _check_parts_for_leaks(self, tokens, parameters, nlayers):
        """Check token and parameter string parts for leaks"""
        leaks = list()
        for token in tokens:
            leak = self.check_for_leak(token, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
        for name, value in parameters:
            leak = self.check_for_leak(value, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
            leak = self.check_for_leak(name, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
        return leaks

    def _split_url(self, url):
        """Split url path and query string on delimiters"""
        tokens = set()
        parameters = set()
        try:
            purl = urlparse(url)
        except ValueError:
            print "Can't parse url:", url
            return [], []
        path_parts = purl.path.split('/')
        for part in path_parts:
            if not part.endswith('.com'):
                part = re.sub(EXTENSION_RE, '', part)
            self._split_on_delims(part, tokens, parameters)
        self._split_on_delims(purl.query, tokens, parameters)
        self._split_on_delims(purl.fragment, tokens, parameters)
        return tokens, parameters

    def check_url(self, url, encoding_layers=3, substring_search=True):
        """Check if a given url contains a leak"""
        tokens, parameters = self._split_url(url)
        if self._debugging:
            print "URL tokens:"
            for token in tokens:
                print token
            print "\nURL parameters:"
            for key, value in parameters:
                print "Key: %s | Value: %s" % (key, value)
        return self._check_whole_and_parts_for_leaks(
            url, tokens, parameters, encoding_layers, substring_search)

    def _get_header_str(self, header_str, header_name):
        """Return the value of `header_name` parsed from the JSON list of
        headers in `header_str`"""
        for item in json.loads(header_str):
            if item[0] == header_name:
                return item[1]
        return ""

    def _split_cookie(self, cookie_str, from_request=True):
        """Return all parsed parts of the cookie names and values"""
        tokens = set()
        parameters = set()
        try:
            if from_request:
                cookies = ck.Cookies.from_request(cookie_str)
            else:
                cookies = ck.Cookies.from_response(cookie_str,
                                                   ignore_bad_cookies=True)
        except (ck.InvalidCookieError, UnicodeDecodeError, KeyError):
            return tokens, parameters  # return empty sets
        for cookie in cookies.values():
            self._split_on_delims(cookie.name, tokens, parameters)
            self._split_on_delims(cookie.value, tokens, parameters)
        return tokens, parameters

    def get_location_str(self, header_str):
        return self._get_header_str(header_str, "Location")

    def get_referrer_str(self, header_str):
        return self._get_header_str(header_str, "Referer")

    def get_cookie_str(self, header_str, from_request=True):
        if not header_str:
            return ""
        if from_request:
            header_name = 'Cookie'
        else:
            header_name = 'Set-Cookie'
        return self._get_header_str(header_str, header_name)

    def check_cookies(self, header_str, encoding_layers=3,
                      from_request=True, substring_search=True):
        """Check the cookies portion of the header string for leaks"""
        cookie_str = self.get_cookie_str(header_str, from_request)
        if not cookie_str:
            return list()
        # Split the extracted cookie string (not the full header string)
        # into name/value parts.
        tokens, parameters = self._split_cookie(cookie_str,
                                                from_request=from_request)
        return self._check_whole_and_parts_for_leaks(
            cookie_str, tokens, parameters, encoding_layers, substring_search)

    def check_location_header(self, location_str, encoding_layers=3,
                              substring_search=True):
        """Check the Location HTTP response header for leaks."""
        if location_str == '':
            return list()
        tokens, parameters = self._split_url(location_str)
        return self._check_whole_and_parts_for_leaks(
            location_str, tokens, parameters, encoding_layers,
            substring_search)

    def check_referrer_header(self, header_str, encoding_layers=3,
                              substring_search=True):
        """Check the Referer HTTP request header for leaks."""
        if header_str == '':
            return list()
        referrer_str = self.get_referrer_str(header_str)
        # We use this check instead of == ''
        # since _get_header_str may return None
        if not referrer_str:
            return list()
        # print "referrer_str", referrer_str
        tokens, parameters = self._split_url(referrer_str)
        return self._check_whole_and_parts_for_leaks(
            referrer_str, tokens, parameters, encoding_layers,
            substring_search)

    def _check_whole_and_parts_for_leaks(self, input_string, tokens,
                                         parameters, encoding_layers,
                                         substring_search):
        """Search an input string and its parts for leaks."""
        results = self._check_parts_for_leaks(tokens, parameters,
                                              encoding_layers)
        if substring_search:
            substr_results = self.substring_search(input_string, max_layers=2)
            # filter repeating results
            return list(set(results + substr_results))
        else:
            return results

    def substring_search(self, input_string, max_layers=None):
        """Do a substring search for all precomputed hashes/encodings.

        `max_layers` limits the number of encoding/hashing layers used in the
        substring search (to limit time). The default is no limit (`None`).
        """
        if input_string is None or input_string == '':
            return list()
        try:
            input_string = input_string.encode('utf8')
        except (UnicodeDecodeError, UnicodeEncodeError):
            print "ERROR encoding %s" % input_string
            return list()
        leaks = list()
        for string, transform_stack in self._precompute_pool.items():
            if max_layers and len(transform_stack) > (max_layers + 1):
                continue
            if string in input_string:
                leaks.append(transform_stack)
        return leaks
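
A minimal usage sketch, assuming the third-party hashing and encoding libraries imported above (pycrypto, sha3, mmh3, mmhash, pyblake2, base58, cookies, hackercodecs) are installed; the email address, tracker hostname, and parameter name are hypothetical:

if __name__ == '__main__':
    # Build a detector for one (hypothetical) email address. With the default
    # settings it precomputes hashed and encoded variants of the address, its
    # username portion, and their case variants.
    detector = LeakDetector(["jane.doe@example.com"])

    # A request URL that carries the MD5 of the email address in a query
    # parameter.
    url = ("https://tracker.example/pixel?uid=" +
           hashlib.md5("jane.doe@example.com").hexdigest())

    # Prints the transform stack(s) that explain the leak,
    # e.g. [('md5', 'jane.doe@example.com')]
    print detector.check_url(url)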