Created
August 28, 2022 20:52
-
-
Save tothi/ad2575c29aa8880f8fb8ff48495e1afc to your computer and use it in GitHub Desktop.
mitmproxy addon for handling oauth access and refresh tokens automatically
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# run: mitmproxy -k -p 8090 -s mitmproxy-jwt-refresh-addon.py | |
# set burp upstream proxy to localhost:8090 | |
# | |
# use case: | |
# - application authorization is implemented by OAuth 2.0 | |
# - testing is performed using Burp as primary and mitmproxy as upstream proxy | |
# - mitmproxy takes care of the Authorization tokens using this addon | |
# - user gets an access_token and a refresh_token during the 1st login (e.g. password login) | |
# - mitmproxy addon caches access_token and refresh_token | |
# - mitmproxy addon adds Authorization: Bearer [access_token from cache] header for every request | |
# - mitmproxy addon gets a new access_token before expiration using the OAuth refresh_token grant_type | |
# | |
# without this upstream mitmproxy addon, Burp active scans (and other testing) would fail after access_token expires | |
# (and browser does not interact with the application) | |
from mitmproxy import ctx | |
import json | |
import requests | |
import threading | |
import urllib3 | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
TOKEN_URL = "https://example.domain.com/openid-connect/token" | |
CLIENT_ID = "example-client" | |
class JwtRefresh: | |
def __init__(self): | |
self._refresh_token = None | |
self._access_token = None | |
self._interval = None | |
self._timer = None | |
# extrat tokens from the appropriate response | |
def _update_tokens(self, resp): | |
data = json.loads(resp) | |
assert data["token_type"] == "Bearer" | |
self._interval = data["expires_in"] / 2 # make sure it is refreshed before expiration | |
self._access_token = data["access_token"] | |
self._refresh_token = data["refresh_token"] | |
ctx.log("[+] JwtRefresh: Updated in-cache access_token and refresh_token.") | |
# get a new access (and refresh) token using our stored refresh_token and start a background refresh task | |
def _refresh_request(self): | |
ctx.log("[*] JwtRefresh: Calling TOKEN_URL for refresh_token...") | |
r = requests.post(TOKEN_URL, data = {"grant_type": "refresh_token", "refresh_token": self._refresh_token, "client_id": CLIENT_ID}, verify=False) | |
self._update_tokens(r.content) | |
self._add_refresh_timer() | |
# init refresh loop (make sure there is only one instance) | |
def _add_refresh_timer(self): | |
if self._timer is not None: | |
self._timer.cancel() | |
self._timer = threading.Timer(self._interval, self._refresh_request) | |
self._timer.start() | |
ctx.log("[*] JwtRefresh: Added %ds timer for refresh_token request." % self._interval) | |
# get tokens from TOKEN_URL response and start a background refresh task | |
def response(self, flow): | |
if flow.request.pretty_url == TOKEN_URL: | |
ctx.log("[*] JwtRefresh: Triggered TOKEN_URL. Extracting tokens from response...") | |
self._update_tokens(flow.response.get_text()) | |
self._add_refresh_timer() | |
# replace Auth header is we have the access_token | |
def request(self, flow): | |
if self._access_token is not None: | |
flow.request.headers["Authorization"] = "Bearer %s" % self._access_token | |
ctx.log("[*] JwtRefresh: Updated HTTP request, access_token from cache injected in Authorization header") | |
addons = [JwtRefresh()] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment