Skip to content

Instantly share code, notes, and snippets.

@mhils
Created November 18, 2015 12:46
Show Gist options
  • Select an option

  • Save mhils/c69c599a40c61b596c7a to your computer and use it in GitHub Desktop.

Select an option

Save mhils/c69c599a40c61b596c7a to your computer and use it in GitHub Desktop.
mitmproxy dump file converter
import uuid
from pprint import pprint
import click
from libmproxy import tnetstring
@click.group()
def cli():
"""mitmproxy dumpfile utilities"""
def read_tnetstring(input):
# tnetstring throw a ValueError on EOF, which is hard to catch
# because they raise ValueErrors for a couple of other reasons.
# Check for EOF to avoid this.
if not input.read(1):
return None
else:
input.seek(-1, 1)
return tnetstring.load(input)
@cli.command("convert")
@click.argument("input", type=click.File('rb'))
@click.argument("output", type=click.File('wb'))
def convert(input, output):
"""Convert a 0.9 dumpfile into a 0.13 dumpfile"""
while True:
data = read_tnetstring(input)
if not data:
break
try:
client_conn = dict(
address=dict(
address=data["request"]["client_conn"]["address"],
use_ipv6=False,
),
clientcert=None,
ssl_established=(data["request"]["scheme"] == "https"),
timestamp_end=data["request"]["timestamp_end"],
timestamp_ssl_setup=data["request"]["ssl_setup_timestamp"],
timestamp_start=data["request"]["timestamp_start"],
)
if data["response"]:
server_conn = dict(
address=dict(
address=(data["request"]["host"], data["request"]["port"]),
use_ipv6=False,
),
cert=data["response"]["cert"],
sni=None,
source_address=dict(
address=("unknown", 0),
use_ipv6=False,
),
ssl_established=(data["request"]["scheme"] == "https"),
timestamp_end=data["response"]["timestamp_end"],
timestamp_ssl_setup=data["request"]["ssl_setup_timestamp"],
timestamp_start=data["response"]["timestamp_start"],
timestamp_tcp_setup=data["response"]["timestamp_start"],
)
else:
server_conn = None
request = dict(
body=data["request"]["content"],
form_in="relative",
form_out="relative",
headers=data["request"]["headers"],
host=data["request"]["host"],
http_version=data["request"]["httpversion"],
is_replay=False,
method=data["request"]["method"],
path=data["request"]["path"],
port=data["request"]["port"],
scheme=data["request"]["scheme"],
timestamp_end=data["request"]["timestamp_end"],
timestamp_start=data["request"]["timestamp_start"],
)
if data["response"]:
response = dict(
body=data["response"]["content"],
headers=data["response"]["headers"],
http_version=data["response"]["httpversion"],
msg=data["response"]["msg"],
status_code=data["response"]["code"],
timestamp_end=data["request"]["timestamp_end"],
timestamp_start=data["request"]["timestamp_start"],
)
else:
response = None
if data["error"]:
error = data["error"]
else:
error = None
v13 = dict(
id=str(uuid.uuid4()),
type="http",
version=(0, 13),
client_conn=client_conn,
server_conn=server_conn,
request=request,
response=response,
error=error
)
tnetstring.dump(v13, output)
except:
print("Failed for the following input:")
pprint(data)
raise
@cli.command("inspect")
@click.argument("input", type=click.File('rb'))
def inspect(input):
"""
pretty-print a dumpfile
"""
while True:
data = read_tnetstring(input)
if not data:
break
pprint(data)
if __name__ == "__main__":
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment