Last active
September 11, 2024 16:14
-
-
Save SeanPesce/29ef6073f3e98dac80e75d7b2dabf94e to your computer and use it in GitHub Desktop.
HTTP Request Replay Script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: Sean Pesce | |
""" | |
This script reads a captured, well-formed HTTP request dump and replays the request.

Example input:

    GET /test HTTP/1.1
    Accept:application/json
    Accept-Charset:UTF-8
    Host:test.com
""" | |
import requests | |
import sys | |
# Dispatch table: upper-case HTTP verb -> the `requests` helper that sends it.
# Every key is the upper-cased name of the helper it maps to.
REQUEST_METHOD = {
    verb: getattr(requests, verb.lower())
    for verb in ('GET', 'HEAD', 'POST', 'PUT')
}
def do_print(data, out_file=None, encoding='utf8'):
    """Emit ``data`` followed by a newline to stdout or to a binary file.

    Args:
        data: Text (``str``) or raw ``bytes`` to emit.
        out_file: Writable binary file object, or ``None`` to use ``print``.
        encoding: Codec used to encode ``str`` data before it is written to
            ``out_file``. Ignored when printing to stdout.

    Returns:
        None.
    """
    if out_file is None:
        print(data)
        return
    # isinstance (not an exact type comparison) so that str subclasses are
    # encoded as well instead of failing on the bytes concatenation below.
    if isinstance(data, str):
        data = data.encode(encoding)
    out_file.write(data + b'\n')
def parse_header_from_line(line, encoding='utf8'):
    """Split one ``Name: value`` header line into a ``(name, value)`` pair.

    Args:
        line: The header line as ``str`` or ``bytes``.
        encoding: Codec used to decode ``line`` when it is ``bytes``.

    Returns:
        Tuple ``(key, val)`` with surrounding whitespace stripped from both.
        Only the first colon separates the name, so values may contain colons.

    Raises:
        AssertionError: If ``line`` contains no colon.
    """
    if isinstance(line, bytes):
        line = line.decode(encoding)
    key, colon, val = line.partition(':')
    if not colon:
        # Raised explicitly rather than via an ``assert`` statement so the
        # check survives ``python -O``. The type stays AssertionError because
        # do_http_request catches it to detect the end of the header section.
        raise AssertionError(f'Invalid header line:\n{line}')
    return key.strip(), val.strip()
def do_http_request(http_req_data, use_ssl=True, out_file=None, encoding='utf8'):
    """Parse a raw HTTP request dump and send it with ``requests``.

    Args:
        http_req_data: The captured request as ``bytes``: request line,
            headers, then optionally a blank line and a body. Line endings
            may be CRLF or bare LF.
        use_ssl: Build an ``https://`` URL when true, ``http://`` otherwise.
        out_file: Passed to ``do_print`` when echoing the request line.
        encoding: Codec used to decode header lines.

    Returns:
        Whatever the matching ``REQUEST_METHOD`` callable returns.

    Raises:
        KeyError: If the request has no ``Host`` header.
        ValueError: If the method is not GET, HEAD, POST or PUT.
    """
    if b'\r\n' not in http_req_data:
        # LF-only dump: convert the header section to CRLF and leave the body
        # untouched. A dump with no blank line (e.g. a body-less GET that ends
        # in a single newline) is treated as headers only instead of failing.
        head, _, payload = http_req_data.partition(b'\n\n')
        head = head.rstrip(b'\n')
        http_req_data = head.replace(b'\n', b'\r\n') + b'\r\n\r\n' + payload

    lines = http_req_data.split(b'\r\n')
    method_line = lines[0].split(b' ')
    lines = lines[1:]
    method = method_line[0].decode('ascii')
    path = method_line[1].decode('ascii')
    protocol = method_line[2].decode('ascii')

    headers = {}
    i = 0
    for line in lines:
        # The first line without a colon ends the header section. Testing it
        # here keeps the loop correct even when assertions are disabled (-O).
        if b':' not in line:
            break
        key, val = parse_header_from_line(line, encoding)
        headers[key] = val
        i += 1

    # Skip the blank separator line(s) between the headers and the body.
    while i < len(lines) and lines[i].strip() == b'':
        i += 1
    body = b'\r\n'.join(lines[i:]).strip()

    # Header names are case-insensitive, so accept "host", "HOST", etc.
    host = next((val for key, val in headers.items() if key.lower() == 'host'), None)
    if host is None:
        raise KeyError('Host')
    scheme = 'https://' if use_ssl else 'http://'
    url = scheme + host + path

    do_print(f'{method} {path} {protocol}\n\n', out_file)

    if method in ('GET', 'HEAD'):
        resp = REQUEST_METHOD[method](url=url, headers=headers)
    elif method in ('POST', 'PUT'):
        resp = REQUEST_METHOD[method](url=url, headers=headers, data=body)
    else:
        raise ValueError(f'Unsupported HTTP method: {method}')
    return resp
def print_http_response(resp, out_file=None):
    """Print the status line, headers and body of an HTTP response.

    Args:
        resp: Response object exposing ``raw.version``, ``status_code``,
            ``reason``, ``headers``, ``content`` and ``text`` (as returned by
            ``do_http_request``).
        out_file: Forwarded to ``do_print``; ``None`` prints to stdout.

    The body is emitted in the first form that works: ``resp.text``, then
    ``resp.content`` decoded as UTF-8, then the raw bytes.
    """
    # 10 and 11 are the documented version codes; 9 and 20 are assumed by
    # analogy and are not confirmed by the documentation.
    version_names = {9: '0.9', 10: '1.0', 11: '1.1', 20: '2.0'}
    http_ver = 'HTTP/' + version_names.get(resp.raw.version, '?')

    do_print(f'{http_ver} {resp.status_code} {resp.reason}', out_file)
    for hdr in resp.headers:
        do_print(f'{hdr}: {resp.headers[hdr]}', out_file)
    do_print('', out_file)

    if len(resp.content) == 0:
        return

    # Attempt the response's own text decoding first.
    try:
        do_print(resp.text, out_file)
        return
    except UnicodeError:
        pass

    # Attempt an explicit UTF-8 decode. bytes.decode raises UnicodeDecodeError,
    # so catch UnicodeError (the common base of the encode/decode errors);
    # catching only UnicodeEncodeError made the raw fallback unreachable.
    try:
        do_print(resp.content.decode('utf8'), out_file)
        return
    except UnicodeError:
        pass

    # Last resort: emit the raw Python byte string.
    do_print(resp.content, out_file)
if __name__ == '__main__':
    # Usage: <script> <HTTP request file> [--no-ssl] [output file]
    # The flag and the output path may appear in either order.
    NO_SSL_FLAG = '--no-ssl'

    if len(sys.argv) < 2:
        print(f'Usage:\n {sys.argv[0]} <HTTP request file> [{NO_SSL_FLAG}] [output file]')
        # sys.exit rather than the site-provided exit(), which is not
        # guaranteed to exist (e.g. under ``python -S``).
        sys.exit()

    OPTIONAL_ARGS = sys.argv[2:4]
    USE_SSL = NO_SSL_FLAG not in OPTIONAL_ARGS
    # First optional argument that is not the flag is the output path.
    OUT_FPATH = next((arg for arg in OPTIONAL_ARGS if arg != NO_SSL_FLAG), None)

    # Read the request before touching the output file, so a missing or
    # unreadable request file does not create or truncate the output.
    FPATH = sys.argv[1]
    with open(FPATH, 'rb') as f:
        HTTP_DATA = f.read()
    HTTP_DATA = HTTP_DATA.lstrip()

    OUT_FILE = None
    if OUT_FPATH is not None:
        OUT_FILE = open(OUT_FPATH, 'wb')
    try:
        RESPONSE = do_http_request(HTTP_DATA, USE_SSL, OUT_FILE)
        print_http_response(RESPONSE, OUT_FILE)
    finally:
        # Close the output file even when the request or printing fails.
        if OUT_FILE is not None:
            OUT_FILE.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment