Skip to content

Instantly share code, notes, and snippets.

@santrancisco
Last active May 26, 2025 07:38
Show Gist options
  • Save santrancisco/12d61915b42a752d2d3c9ae9b1267540 to your computer and use it in GitHub Desktop.
Save santrancisco/12d61915b42a752d2d3c9ae9b1267540 to your computer and use it in GitHub Desktop.
burp_jwt_replacer
from burp import IBurpExtender, IContextMenuFactory, IProxyListener
from javax.swing import JMenuItem
import re
import base64
import json
class BurpExtender(IBurpExtender, IContextMenuFactory, IProxyListener):
def __init__(self):
self.authorization_dict={}
def registerExtenderCallbacks(self, callbacks):
self._callbacks = callbacks
self._helpers = callbacks.getHelpers()
callbacks.setExtensionName("JWT Token Replacer (Context Menu)")
callbacks.registerContextMenuFactory(self)
callbacks.registerProxyListener(self)
self.log("JWT Token Replacer loaded.")
def log(self, msg):
self._callbacks.printOutput(msg)
#
# Create menu item and trigger here
#
def createMenuItems(self, invocation):
messages = invocation.getSelectedMessages()
def replaceJWT(e):
request_byte_array=messages[0].getRequest()
request_string= self._helpers.bytesToString(request_byte_array)
jwtmatch = re.search(r"Authorization: Bearer (.+)",request_string)
current_token = jwtmatch.group(1).strip() if jwtmatch else None
host_match = re.search(r"Host: (.+)", request_string)
host = host_match.group(1).strip() if host_match else None
if current_token:
newjwt = self.checkForNewJWT(current_token, host)
if (newjwt):
print(newjwt)
request_string= re.sub("Authorization: Bearer [^\r\n]*","Authorization: Bearer "+newjwt,request_string)
print(request_string)
new_request_bytes = self._helpers.stringToBytes(request_string)
messages[0].setRequest(new_request_bytes)
menuList=[]
menu = JMenuItem("Update BearerTokens")
menu.addActionListener(replaceJWT)
menuList.append(menu)
return menuList
#
# Search for jwt in all proxied message here!
#
def processProxyMessage(self, messageIsRequest, message):
requestInfo = self._helpers.analyzeRequest(message.getMessageInfo().getRequest())
host = message.getMessageInfo().getHttpService().getHost()
if messageIsRequest:
self.captureAuthorizationHeader(requestInfo,host)
##### HELPER FUNCTIONS ######
def captureAuthorizationHeader(self, request_info, host):
"""
Capture the Authorization header and store the JWT's 'sub' claim in the dictionary.
"""
headers = request_info.getHeaders()
for header in headers:
if header.lower().startswith("authorization:"):
# Extract the JWT from the Authorization header
auth_value = header.split(":", 1)[1].strip()
if auth_value.lower().startswith("bearer "):
jwt_token = auth_value[7:] # Remove 'Bearer ' prefix
sub_id = self.decode_jwt_and_extract_sub(jwt_token)
if sub_id:
if sub_id not in self.authorization_dict:
self.authorization_dict[sub_id] = {}
self.authorization_dict[sub_id][host] = jwt_token # Store the JWT for later replacement
# print("Captured JWT Authorization header for sub"+sub_id+" at "+host+": "+jwt_token)
break
def decode_jwt_and_extract_sub(self, jwt_token):
try:
header,payload,signature = [unicode(s).encode('utf-8') for s in jwt_token.split(".",3)]
decoded_payload = self._helpers.bytesToString(
self._helpers.base64Decode(payload+"=" * (-len(payload) % 4))
)
payload_dict = json.loads(decoded_payload)
return payload_dict.get("sub") # Extract the 'sub' claim
except Exception as e:
print(e)
return None
def checkForNewJWT(self, currentjwt,host):
sub_id = self.decode_jwt_and_extract_sub(currentjwt)
if (unicode(sub_id) in self.authorization_dict):
if (unicode(host) in self.authorization_dict[unicode(sub_id)]):
return self.authorization_dict[unicode(sub_id)][unicode(host)]
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment