Created
January 4, 2017 11:35
-
-
Save fredrikhl/8bc35a11ddf8bb02250d1b2a06a0f8fd to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
# encoding: utf-8 | |
u""" Decode and pretty-print a SAML HTTP-Redirect message """ | |
import zlib | |
import base64 | |
import urlparse | |
import hashlib | |
# Conditional imports: M2Crypto, xml.dom.minidom, argparse, sys | |
def deflate(strval):
    u""" Compress and base64 encode a document for use in a query string.

    The data is compressed with zlib. The leading two bytes (zlib header) and
    the trailing four bytes (checksum) are then cut off, and what remains is
    base64 encoded.

    :see: https://www.ietf.org/rfc/rfc1951.txt

    :param strval: The data to encode.
    :return: The base64 encoded, compressed data.
    """
    zlib_stream = zlib.compress(strval)
    # Drop the zlib wrapper: 2 header bytes in front, 4 checksum bytes behind.
    bare_stream = zlib_stream[2:-4]
    return base64.b64encode(bare_stream)
def inflate(strval):
    u""" Reverse `deflate': base64 decode, then decompress the result.

    :param strval: Base64 encoded, compressed data without zlib header.
    :return: The decompressed data.
    """
    compressed = base64.b64decode(strval)
    # Negative window bits: the stream has no zlib header or checksum.
    window_bits = -zlib.MAX_WBITS
    return zlib.decompress(compressed, window_bits)
class HTTPRedirect(object):
    u""" Parse a SAML HTTP-Redirect URL.

    The query string is kept in two forms: `_query` maps parameter names to
    their URL-decoded values, and `_raw` maps parameter names to the values
    exactly as they appear in the URL. `signed` is assembled from the raw
    values. When a parameter is repeated, both forms use its first
    occurrence, so the decoded message and the digest input always refer to
    the same value.
    """

    def __init__(self, url):
        u""" Split `url' and index its query parameters.

        :param str url: The complete redirect URL, including query string.
        """
        self.url = url
        self._parts = urlparse.urlparse(url)

        decoded = urlparse.parse_qs(
            self._parts.query, keep_blank_values=True, strict_parsing=True)
        self._query = dict((k, v[0]) for k, v in decoded.items())

        self._raw = {}
        for item in self._parts.query.split(u'&'):
            if not item:
                continue
            # Split on the first '=' only: a value may itself contain '='
            # (e.g. unescaped base64 padding).
            name, _, value = item.partition(u'=')
            # Keep the first occurrence, like `_query` does.
            self._raw.setdefault(name, value)

    @property
    def msgtype(self):
        u""" The message type.

        :return: u'SAMLRequest' or u'SAMLResponse', whichever parameter is
            present (checked in that order), or u'' if neither is.
        """
        for t in (u'SAMLRequest', u'SAMLResponse'):
            if t in self._query:
                return t
        return u''

    @property
    def message(self):
        u""" The SAML message, decoded with `inflate'. """
        return inflate(self._query.get(self.msgtype))

    @property
    def relay(self):
        u""" The included relay state, or None if there is none. """
        return self._query.get(u'RelayState')

    @property
    def algo(self):
        u""" The signature algorithm of this message, or u'' if unset. """
        return self._query.get(u'SigAlg', u'')

    @property
    def signature(self):
        u""" The signature of this message, or u'' if unset. """
        return self._query.get(u'Signature', u'')

    @property
    def endpoint(self):
        u""" The target endpoint of this message (the URL without query). """
        return urlparse.urlunparse(self._parts._replace(query=''))

    @property
    def signed(self):
        u""" Signed part (input for the digest) of this message.

        The present parameters out of SAMLRequest, SAMLResponse, RelayState
        and SigAlg, in that order, joined as `name=value' pairs with the
        values left exactly as they appear in the URL.
        """
        params = [u'SAMLRequest', u'SAMLResponse', u'RelayState', u'SigAlg']
        return u'&'.join(
            u'{}={}'.format(k, self._raw[k]) for k in params
            if k in self._raw)
class HTTPPost(HTTPRedirect):
    u""" Parse SAML HTTP-POST form data (experimental).

    The form fields are indexed the same way as in `HTTPRedirect'. The
    signature details are looked up in the XML message itself, along fixed
    paths below the `samlp:Response' element.
    """

    def __init__(self, body):
        u""" Index the fields of a form encoded body.

        :param str body: The form data. All whitespace in it is removed.
        """
        self.body = u''.join(body.split())
        # Split each field on the first '=' only, so that values containing
        # '=' do not break the parsing. Empty fields are skipped.
        self._raw = dict(
            item.partition(u'=')[::2]
            for item in self.body.split(u'&') if item)
        self._query = dict(
            (urlparse.unquote(param), urlparse.unquote(value))
            for param, value in self._raw.items())

    @property
    def message(self):
        u""" The SAML message, base64 decoded. """
        return base64.b64decode(self._query.get(self.msgtype))

    @property
    def endpoint(self):
        u""" The target endpoint of this message (always empty here). """
        return u''

    def _getNode(self, *path):
        u""" Find a node in the message by following a path of node names.

        :param path: Node names, from the document root and downwards. At
            each level the first child with a matching name is followed.
        :return: The node at the end of the path, or None if a step fails.
        """
        import xml.dom.minidom
        node = xml.dom.minidom.parseString(self.message)
        for name in path:
            for child in node.childNodes:
                if child.nodeName == name:
                    node = child
                    break
            else:
                return None
        return node

    def _getTextNode(self, node):
        u""" Value of the first text node directly below `node', or None. """
        for n in node.childNodes:
            if n.nodeType == n.TEXT_NODE:
                return n.nodeValue
        return None

    def _getNodeAttribute(self, node, attr):
        u""" Value of the attribute `attr' of `node', or None if unset. """
        n = node.attributes.get(attr)
        if n and n.nodeType == n.ATTRIBUTE_NODE:
            return n.nodeValue
        return None

    def _getMessageElem(self, elem):
        u""" Find a direct child element of the first `samlp:Response'.

        This is a best-effort lookup: if the message cannot be parsed, or
        has no `samlp:Response' element, None is returned.

        :param str elem: Tag name of the wanted child element.
        :return: The first matching child element, or None.
        """
        root_node = 'samlp:Response'
        import xml.dom.minidom
        try:
            dom = xml.dom.minidom.parseString(self.message)
            root = dom.getElementsByTagName(root_node)[0]
        except Exception:
            return None
        for node in root.childNodes:
            # Only elements have a tagName; skip text and comment nodes.
            if node.nodeType == node.ELEMENT_NODE and node.tagName == elem:
                return node
        return None

    @property
    def signed(self):
        u""" The message digest (text of ds:DigestValue), or u''. """
        node = self._getNode('samlp:Response', 'ds:Signature', 'ds:SignedInfo',
                             'ds:Reference', 'ds:DigestValue')
        if node is None:
            return u''
        return self._getTextNode(node) or u''

    @property
    def algo(self):
        u""" The signature algorithm of this message, or u''. """
        node = self._getNode('samlp:Response', 'ds:Signature', 'ds:SignedInfo',
                             'ds:SignatureMethod')
        if node is None:
            return u''
        return self._getNodeAttribute(node, 'Algorithm') or u''

    @property
    def signature(self):
        u""" The signature of this message (ds:SignatureValue), or u''. """
        node = self._getNode('samlp:Response', 'ds:Signature',
                             'ds:SignatureValue')
        if node is None:
            return u''
        return self._getTextNode(node) or u''
# Signature validation
def rsa_sha1(pemstr, msgstr, bsign):
    u""" Check an RSA signature against the SHA1 digest of a message.

    :param str pemstr: A certificate, in a format accepted by M2Crypto's
        `X509.load_cert_string'.
    :param str msgstr: The message that has been signed.
    :param str bsign: The binary signature for `msgstr'.

    :return bool: The result of the RSA key's `verify', as a boolean.
    """
    import M2Crypto.X509
    certificate = M2Crypto.X509.load_cert_string(pemstr)
    public_key = certificate.get_pubkey().get_rsa()
    digest = hashlib.sha1(msgstr).digest()
    return bool(public_key.verify(digest, bsign, 'sha1'))
def get_method(sigalg):
    u""" Look up the validation function for a signature algorithm.

    :param str sigalg: The signature algorithm identifier (SigAlg).

    :return: A function that takes a certificate, a message and a binary
        signature, like `rsa_sha1'.
    :raise NotImplementedError: If `sigalg' is not a known algorithm.
    """
    validators = {
        u'http://www.w3.org/2000/09/xmldsig#rsa-sha1': rsa_sha1,
    }
    method = validators.get(sigalg)
    if method is None:
        raise NotImplementedError(u"Unknown SigAlg: '{}'".format(sigalg))
    return method
def check_sign(keystr, msgstr, base64sig, sigalg):
    u""" Validate a base64 encoded signature for a message.

    :param str keystr: A PEM-formatted key to validate the message with.
    :param str msgstr: The message that has been signed.
    :param str base64sig: A base64 encoded binary signature for `msgstr'.
    :param str sigalg: The algorithm used to sign the message.

    :return bool: True if the signature validates.
    :raise NotImplementedError: If `sigalg' is not supported by `get_method'.
    """
    # Resolve the algorithm first, so an unknown one fails before decoding.
    validate = get_method(sigalg)
    binary_signature = base64.b64decode(base64sig)
    return validate(keystr, msgstr, binary_signature)
# Output formatting
def prettyxml(xmlstr, indent=4, attr_indent=8, attr_split_lim=64):
    u""" Re-indent an XML string, and break up its long lines.

    :param str xmlstr: The XML document to format.
    :param int indent: Number of spaces per nesting level.
    :param int attr_indent: Extra indent (in spaces) for the continuation
        lines of a line that has been broken up.
    :param int attr_split_lim: Lines longer than this are broken up.

    :return: The formatted document, without surrounding whitespace.
    """
    import xml.dom.minidom
    document = xml.dom.minidom.parseString(xmlstr)
    rendered = document.toprettyxml(indent * u' ')

    formatted = []
    for line in rendered.split(u"\n"):
        if len(line) <= attr_split_lim:
            formatted.append(line)
            continue
        # Too long: put each whitespace separated part (typically the tag
        # name and each attribute) on a line of its own. The first part keeps
        # the indent of the line, the rest are indented further.
        base = len(line) - len(line.lstrip(u' '))
        tokens = line.split()
        wrapped = [base * u' ' + tok for tok in tokens[:1]]
        wrapped.extend((base + attr_indent) * u' ' + tok for tok in tokens[1:])
        formatted.append(u"\n".join(wrapped))
    return u"\n".join(formatted).strip()
def fmt_item(key, value, vmod=None, vind=4):
    u""" Format a k,v pair.

    The key is followed by the value on separate, indented lines, and a
    trailing newline.

    :param str key: The name of the item.
    :param str value: The value of the item, possibly spanning many lines.
    :param int vmod: If set to a positive number, the value is first broken
        into chunks of at most this many characters, one chunk per line.
        Other values (None, 0, negative) leave the value as it is.
    :param int vind: Number of spaces to indent each line of the value.

    :return: The formatted item, as unicode text.
    """
    # A negative chunk size would give an empty range, and drop the value.
    if vmod and vmod > 0:
        value = u"\n".join(
            value[i:i + vmod] for i in range(0, len(value), vmod))
    indented = u"\n".join(vind * u' ' + line for line in value.split(u"\n"))
    # Unicode format string: a byte string here fails on non-ASCII values.
    return u"{}: \n{}\n".format(key, indented)
# Command line invocation
def test():
    u""" Run `main' with a built-in sample URL and certificate. """
    # The URL is folded over many lines for readability; all spaces and line
    # breaks are removed again to get the actual URL.
    url = """
    https://idp2.weblogin-ext-utv01.uio.no/simplesaml/saml2/idp/SSOService.php
    ?SAMLRequest=jZJBj9owEIX%2FSuR7EidLQLYAiZa2ILFdtEF0tZfKiWfBlWOn9niB%2FfVNQ
    g%2F0gnrx4fm9mfdJM%2FWi0S1fBDyaZ%2FgdwGN0brTxfPiYkeAMt8Irz41o
    wHOsebl43PA8obx1Fm1tNbmJ3E8I78GhsoZE6%2BWM%2FBRFJjNWgGBvGRuxj
    NYggYmCQc4KORGT0QQm43GRVyTag%2FNdcka6QV3c%2BwBr41EY7CSaFTEdxz
    TfUcYfHnjGXkm07GiUETikjoit52mqZJsnJ6i0PSgTwxnjgO80S4KyibGpV02
    roSdJ%2Byfv%2FWlZPpXg3lUNSXtsSbT9C%2F5JGanM4T5zdTV5vtrttvH2qd
    yR%2BbSfzQcGN79plv1fs8bKoIcuqQQNB4GQNoBCChSDKlbPtF49jjcX9lEvv
    wSZ71X1jRnxY4SvL0e6edlT%2BWtx3ph9W12KUJ2m6W2l6fUsvncw6%2BXWal
    Vfoq%2FWNQLvs%2FaKkvHbYOXohPEKDJJoobU9fXbQNZ0RdAFIOr%2Bu%2FPf
    45n8A
    &RelayState=https%3A%2F%2Fsp1.weblogin-ext-utv01.uio.no%2Flogin.php
    &SigAlg=http%3A%2F%2Fwww.w3.org%2F2000%2F09%2Fxmldsig%23rsa-sha1
    &Signature=glZu6iLc8QQKb%2Bo28iaCGfVUb87ZSn%2BEPRxoFmCUT8Vu7KXQYDrNeZjLjfF
    nS28OxIEpT07YbYWPeZHrNKEMZ5RWPv4APqqsEEsI0gIk2CG%2FbxyPuBvRcA79
    3zLcS7q4CcvaO6MfzCSi6Iv6%2FYIB5f%2BhzXEX%2FSOF1ouaaFMDm5smT0UXg
    i07geOvgXZ4Stz%2BfnT0ni1hFHdkfBnbSAZ3rU9AKVXpZSPrENN6lzZ0uAGnF2
    GkeCloFzmPYC6W1vT7F8LQj6RHlOONUFerj7c2WX6Ic%2FZFWJcFpEWsQvVllQt
    8O4SNzMjlPdhaWAmyCrRxL0AJLp4YY75evNSVFsTVnA%3D%3D
    """.replace(' ', '').replace("\n", '')
    certdata = """
    -----BEGIN CERTIFICATE-----
    MIIDXTCCAkWgAwIBAgIJAMk2OcvLA3pLMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
    BAYTAk5PMQ0wCwYDVQQKDARVQUlUMScwJQYDVQQDDB5pZHAxLndlYmxvZ2luLWV4
    dC11dHYwMS51aW8ubm8wHhcNMTUwNDIyMTI1NzI5WhcNMjUwNDE5MTI1NzI5WjBF
    MQswCQYDVQQGEwJOTzENMAsGA1UECgwEVUFJVDEnMCUGA1UEAwweaWRwMS53ZWJs
    b2dpbi1leHQtdXR2MDEudWlvLm5vMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
    CgKCAQEAxU6Ujxj9y0ZEYrvUzyd2jgrMJFbvZ2HiPLeZPa6JUcuizg/EyI8EFtV6
    YmNRmW5HXOnHN6nc0tQOD8ggP1sBgNSz6Nuu6trRxuIOug42DQuyugu1ZQb5I9lB
    y9+h2eynS6DG8qNYLMxYRlRRgbXJ6k5MxZGOAL7+TYI3CNaQBlVwPgGZLXEPP18J
    5LL6rKrqGOEphyxLTnIvYRNDMcU+Ow6NeWBV+Qq7YpW/wd3bhYmIQg8QZZ5KbWr0
    eFCIXwy2YSo986moaJM18Q11QaXQNurdC4zF7hiNIFjnLlN7/Rhf6JWTVzIxLWqD
    yZXk6VQhLpG/VtgUj1ub1sp12tc6XwIDAQABo1AwTjAdBgNVHQ4EFgQUE6JyWCTk
    9j6YphJzkT5ZefilnRIwHwYDVR0jBBgwFoAUE6JyWCTk9j6YphJzkT5ZefilnRIw
    DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQUFAAOCAQEAGJkkzMBmi5qRKPlflOym
    1S26p/tpVe6V12vWvFVBZMuZnl7bVPAsS20gPb1FshmQPZwGvdH5ekKmfJEeBXZu
    WOjYsClu21WVSTnGYv7aHxizmQjccJZwt9RA+++/Rr9cylRTl6Dg/xPWe5VfKOVc
    qaq20tU6ihEaRyBD+DuHTxqNaJlPhkUuVc1OhaO9VVTNFXgiXGIUJ4ajBvTb44FH
    ZsWGt+oQVeNxi9BgrNU4a5efijmUl2MUQLqNDn3pQ9IYuCtgzkwF6xgNv91xG+Dp
    /amEXb2WRwdA4lvpeRB5G+kgyJhGec2lZvn90Jet4VUxGHm/si7eSJghfctIyBTI
    4A==
    -----END CERTIFICATE-----
    """
    # Remove the indentation line by line. Removing every space would also
    # remove the space inside the BEGIN/END CERTIFICATE marker lines.
    certdata = "\n".join(
        line.strip() for line in certdata.splitlines() if line.strip()) + "\n"
    main([url], certdata)
def main(args=None, certdata=None):
    u""" Command line entry point: decode and print a SAML message.

    The message is taken from the `url' argument, or read from stdin if no
    argument is given. With --post the input is decoded as POST form data,
    otherwise as a HTTP-Redirect URL.

    :param list args: Command line arguments (default: `sys.argv[1:]').
    :param str certdata: A PEM certificate to validate the signature with.
        The --cert option, if given, replaces this value.

    :raise SystemExit: After running the test data (--test), or if the input
        contains neither a SAMLRequest nor a SAMLResponse.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        u'--cert',
        metavar='PEM',
        default=None,
        help=u"Validate signature using %(metavar)s")
    parser.add_argument(
        u'--test',
        default=False,
        action='store_true',
        help=u'Run with test data')
    parser.add_argument(
        u'-p', u'--post',
        default=False,
        action='store_true',
        help=u'(Experimental) Decode SAML2 POST form data')
    parser.add_argument(
        u'url',
        default=None,
        nargs='?',
        help=u'A SAML2 HTTP-Redirect URL/message')
    args = parser.parse_args(args)

    # Run with test args
    if args.test:
        test()
        raise SystemExit()

    # If not url, read from stdin
    if args.url:
        data = args.url
    else:
        data = u''.join(sys.stdin.readlines()).strip()

    # --post selects the decoder no matter where the input came from
    if args.post:
        url = HTTPPost(data)
    else:
        url = HTTPRedirect(data)

    # Read certificate option into certdata
    if args.cert:
        with open(args.cert) as f:
            certdata = f.read()

    # Without a message there is nothing to decode; say so rather than
    # failing inside the decoder.
    if not url.msgtype:
        raise SystemExit("No SAMLRequest or SAMLResponse found in input")

    # Output
    print(fmt_item(u'Endpoint', url.endpoint))
    print(fmt_item(url.msgtype, prettyxml(url.message)))
    if url.relay:
        print(fmt_item(u'RelayState', url.relay))
    print(fmt_item(u'Digest input', url.signed, 64))
    if any((url.signature, url.algo)):
        print(fmt_item(u'Signature (b64)', url.signature, 64))
        if certdata:
            print(fmt_item(
                u'Signature validity',
                repr(check_sign(certdata, url.signed, url.signature,
                                url.algo))))
# Run the command line interface only when executed as a script.
if __name__ == u'__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment