Skip to content

Instantly share code, notes, and snippets.

@sebdelsol
Last active October 8, 2024 16:43
Show Gist options
  • Save sebdelsol/2776842e655bd02b45a5be20c9aa7c09 to your computer and use it in GitHub Desktop.
Save sebdelsol/2776842e655bd02b45a5be20c9aa7c09 to your computer and use it in GitHub Desktop.
start mitmproxy in background programmatically
import asyncio
import threading
from typing import Any, Callable, Self
from mitmproxy import http
from mitmproxy.addons import default_addons, script
from mitmproxy.master import Master
from mitmproxy.options import Options
class Addon:
    """Example addon that counts the responses it is handed and prints the tally."""

    def __init__(self) -> None:
        # Number of flows seen so far that carried a response.
        self.n_reponse = 0

    def response(self, flow: "http.HTTPFlow") -> None:
        """Count *flow* and print the running total when it has a response.

        Flows whose ``response`` attribute is falsy are ignored.
        """
        if not flow.response:
            return
        self.n_reponse = self.n_reponse + 1
        print(f"reponse {self.n_reponse}")
class ThreadedMitmProxy(threading.Thread):
def __init__(self, user_addon: Callable, **options: Any) -> None:
self.loop = asyncio.get_event_loop()
self.master = Master(Options(), event_loop=self.loop)
# replace the ScriptLoader with the user addon
self.master.addons.add(
*(
user_addon() if isinstance(addon, script.ScriptLoader) else addon
for addon in default_addons()
)
)
# set the options after the addons since some options depend on addons
self.master.options.update(**options)
super().__init__()
def run(self) -> None:
self.loop.run_until_complete(self.master.run())
def __enter__(self) -> Self:
self.start()
return self
def __exit__(self, *_) -> None:
self.master.shutdown()
self.join()
if __name__ == "__main__":
    # Demo: run the proxy in the background until the user presses <Enter>.
    proxy_options = {"listen_host": "127.0.0.1", "listen_port": 8080}
    with ThreadedMitmProxy(Addon, **proxy_options):
        # Other stuff might be started then
        input("hit <Enter> to quit")
        print("shutdown mitmproxy")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment