Created
March 24, 2014 08:16
-
-
Save zaccone/9736214 to your computer and use it in GitHub Desktop.
Python ECP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This simple client uses standard Python modules | |
along with the Python lxml toolkit from | |
http://lxml.de/ | |
to demonstrate how a SAML ECP client works. | |
Studying this client is not an acceptable replacement | |
for reading Draft 02 of the ECP profile [ECP] available at | |
http://wiki.oasis-open.org/security/SAML2EnhancedClientProfile | |
Please read the profile document and consult this script | |
as one example of a non-conformant client. | |
This script cannot be considered a conformant client as defined | |
in section 3.1.3 of [ECP] because it does not support the use of | |
channel bindings of type "tls-server-end-point" nor does it support | |
TLS Client Authentication. | |
This client has been tested on Debian Wheezy against | |
the Shibboleth IdP version 2.4.0 and the Shibboleth Native SP | |
version 2.5.3. | |
The script assumes the ECP extension is installed and properly | |
configured for the IdP and that the SP is configured | |
properly with ECP support. See the Shibboleth documentation for | |
details. | |
It also assumes that the server hosting the IdP has been properly | |
configured to require a type of Basic Auth (login and password) | |
for the ECP location. See the documentation for the IdP ECP extension | |
for details. | |
""" | |
import cookielib
import getpass
import os
import re
import stat
import sys
import time
import urllib2
from copy import deepcopy
from optparse import OptionParser

from lxml import etree
# Maps a user-friendly IdP name (the "tag" given on the command line)
# to the URL of that IdP's ECP endpoint.
IDP_ENDPOINTS = {
    "Campus01": "https://campus01.edu/idp/profile/SAML2/SOAP/ECP",
    "Campus02": "https://campus02.edu/idp/profile/SAML2/SOAP/ECP",
}
class MyCookieJar(cookielib.MozillaCookieJar):
    """
    Mozilla-format cookie jar whose saved file can be read by libcurl.

    The only difference from the parent class is in save(): a cookie
    without an expiration date is written with '0' in the expiry field
    instead of an empty string, because the curl libraries cannot read
    in a cookie whose expiry field is blank.
    """

    def save(self, filename=None, ignore_discard=False, ignore_expires=False):
        """
        Write the cookies held by this jar to a cookies.txt style file.

        filename       -- path of the file to write; when None the jar's
                          own ``filename`` attribute is used.
        ignore_discard -- when true, also save cookies flagged to be
                          discarded.
        ignore_expires -- when true, also save cookies that have already
                          expired.

        Raises ValueError when neither the argument nor the jar supplies
        a file name.
        """
        if filename is None:
            if self.filename is None:
                # The message text lives in cookielib, not in this module.
                raise ValueError(cookielib.MISSING_FILENAME_TEXT)
            filename = self.filename

        now = time.time()
        # The context manager closes the file even if a write fails.
        with open(filename, "w") as f:
            f.write(self.header)
            for cookie in self:
                if not ignore_discard and cookie.discard:
                    continue
                if not ignore_expires and cookie.is_expired(now):
                    continue

                secure = "TRUE" if cookie.secure else "FALSE"
                if cookie.domain.startswith("."):
                    initial_dot = "TRUE"
                else:
                    initial_dot = "FALSE"

                if cookie.expires is not None:
                    expires = str(cookie.expires)
                else:
                    # A cookie without an expiration date is saved with
                    # a '0' in that field instead of a blank so that the
                    # curl libraries can read in and use the cookie.
                    expires = "0"

                if cookie.value is None:
                    # cookies.txt regards 'Set-Cookie: foo' as a cookie
                    # with no name, whereas cookielib regards it as a
                    # cookie with no value.
                    name = ""
                    value = cookie.name
                else:
                    name = cookie.name
                    value = cookie.value

                fields = [cookie.domain, initial_dot, cookie.path,
                          secure, expires, name, value]
                f.write("\t".join(fields) + "\n")
def get(idp_endpoint, sp_target, login, debug=False):
    """
    Run an ECP exchange and print the requested target to stdout.

    Given an IdP endpoint for ECP, the desired target from the SP, and
    a login to use against the IdP, manage an ECP exchange with the SP
    and the IdP and print the contents of the target to stdout after
    establishing a session with the SP.

    idp_endpoint -- URL of the IdP ECP endpoint; the password prompted
                    for is registered for HTTP Basic Auth against it.
    sp_target    -- URL of the resource to fetch from the SP.
    login        -- user name for Basic Auth at the IdP.
    debug        -- when true, turn on HTTP debugging for the HTTPS
                    handler and print each intermediate document to
                    stdout.

    Returns None. If a request fails, an expected element or attribute
    is missing, or the IdP's AssertionConsumerServiceURL differs from
    the SP's responseConsumerURL, a message is written to stderr and
    the process exits with status 1.
    """
    # a single opener (and so a single cookie jar) is used for every
    # request so that the session cookies set by the SP are replayed
    # on the final request for the target
    cookie_jar = cookielib.LWPCookieJar()
    cookie_handler = urllib2.HTTPCookieProcessor(cookie_jar)

    # need an instance of HTTPS handler to do HTTPS
    https_handler = urllib2.HTTPSHandler(debuglevel=0)
    if debug:
        https_handler.set_http_debuglevel(1)

    opener = urllib2.build_opener(cookie_handler, https_handler)

    # headers needed to indicate to the SP an ECP request; the media
    # types in Accept are separated by a comma (a semicolon would turn
    # the PAOS type into a parameter of text/html)
    paos_headers = {
        'Accept': 'text/html, application/vnd.paos+xml',
        'PAOS': 'ver="urn:liberty:paos:2003-08";"urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp"',
    }

    # request target from SP
    request = urllib2.Request(url=sp_target, headers=paos_headers)
    response = _open_or_exit(opener, request, "First request to SP failed")

    # convert the SP response from string to etree Element object
    sp_response = etree.XML(response.read())
    if debug:
        _debug_dump("SP RESPONSE", etree.tostring(sp_response))

    namespaces = {
        'ecp': 'urn:oasis:names:tc:SAML:2.0:profiles:SSO:ecp',
        'S': 'http://schemas.xmlsoap.org/soap/envelope/',
        'paos': 'urn:liberty:paos:2003-08',
    }

    # pick out the relay state element from the SP so that it can
    # be included later in the response to the SP
    relay_state = _first_xpath_or_exit(
        sp_response, "//ecp:RelayState", namespaces,
        "Unable to parse relay state element from SP response")
    if debug:
        _debug_dump("RELAY STATE ELEMENT", etree.tostring(relay_state))

    # pick out the responseConsumerURL attribute so that it can
    # later be compared with the assertionConsumerURL sent by the IdP
    response_consumer_url = _first_xpath_or_exit(
        sp_response,
        "/S:Envelope/S:Header/paos:Request/@responseConsumerURL",
        namespaces,
        "Unable to parse responseConsumerURL attribute from SP response")
    if debug:
        _debug_dump("RESPONSE CONSUMER URL", response_consumer_url)

    # make a deep copy of the SP response and then remove the header
    # (taken to be the first child of the envelope) in order to create
    # the package for the IdP
    idp_request = deepcopy(sp_response)
    idp_request.remove(idp_request[0])
    if debug:
        _debug_dump("IDP REQUEST", etree.tostring(idp_request))

    # prompt the user for a password and then create a password manager
    # and basic auth handler to add to the existing opener
    password = getpass.getpass("Enter password for login '%s': " % login)
    password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, idp_endpoint, login, password)
    opener.add_handler(urllib2.HTTPBasicAuthHandler(password_mgr))

    # POST the request to the IdP; the body is a SOAP 1.1 envelope and
    # is therefore sent as text/xml
    request = urllib2.Request(idp_endpoint, data=etree.tostring(idp_request))
    request.get_method = lambda: 'POST'
    request.add_header('Content-Type', 'text/xml')
    response = _open_or_exit(opener, request, "Request to IdP failed")

    idp_response = etree.XML(response.read())
    if debug:
        _debug_dump("IDP RESPONSE", etree.tostring(idp_response))

    assertion_consumer_service = _first_xpath_or_exit(
        idp_response,
        "/S:Envelope/S:Header/ecp:Response/@AssertionConsumerServiceURL",
        namespaces,
        "Error parsing assertionConsumerService attribute from IdP response")
    if debug:
        _debug_dump("ASSERTION CONSUMER SERVICE URL",
                    assertion_consumer_service)

    # if the assertionConsumerService attribute from the IdP
    # does not match the responseConsumerURL from the SP
    # we cannot trust this exchange so send SOAP 1.1 fault
    # to the SP and exit
    if assertion_consumer_service != response_consumer_url:
        sys.stderr.write(
            "ERROR: assertionConsumerServiceURL %s does not\n"
            % assertion_consumer_service)
        sys.stderr.write(
            "match responseConsumerURL %s\n" % response_consumer_url)
        sys.stderr.write("\n")
        sys.stderr.write("sending SOAP fault to SP\n")

        fault_headers = {
            'Content-Type': 'application/vnd.paos+xml',
        }
        request = urllib2.Request(url=response_consumer_url,
                                  data=_SOAP_FAULT, headers=fault_headers)
        request.get_method = lambda: 'POST'

        # POST the SOAP 1.1 fault to the SP and ignore any return;
        # this is deliberately best effort because the process exits
        # with an error status whether or not the SP received it
        try:
            opener.open(request)
        except Exception:
            pass
        sys.exit(1)

    # make a deep copy of the IdP response and replace the first
    # element of its header with the relay state initially sent by
    # the SP
    sp_package = deepcopy(idp_response)
    sp_package[0][0] = relay_state
    if debug:
        _debug_dump("PACKAGE TO SEND TO SP", etree.tostring(sp_package))

    package_headers = {
        'Content-Type': 'application/vnd.paos+xml',
    }

    # POST the package to the SP
    request = urllib2.Request(url=assertion_consumer_service,
                              data=etree.tostring(sp_package),
                              headers=package_headers)
    request.get_method = lambda: 'POST'
    # we ignore the response from the SP here and rely on the
    # opener instance and the cookie jar to get the cookies
    # we need as they are sent from the SP in order to make the
    # final request
    _open_or_exit(opener, request, "Error POSTing package to SP")

    # use existing established session to request the original target
    # from the SP
    request = urllib2.Request(url=sp_target)
    response = _open_or_exit(
        opener, request, "Error requesting target %s from SP" % sp_target)

    # print the response from the SP to stdout
    print(response.read())

    # multiple calls could be done here using the established session
    # with the SP, and the cookies could be saved to the file system
    # to be used with later processes but we do neither here and just exit


# SOAP 1.1 fault sent to the SP when the AssertionConsumerServiceURL from
# the IdP does not match the responseConsumerURL from the SP.
_SOAP_FAULT = """
<S:Envelope xmlns:S="http://schemas.xmlsoap.org/soap/envelope/">
<S:Body>
<S:Fault>
<faultcode>S:Server</faultcode>
<faultstring>responseConsumerURL from SP and assertionConsumerServiceURL from IdP do not match</faultstring>
</S:Fault>
</S:Body>
</S:Envelope>
"""


def _exit_with_error(message):
    """Write message to stderr and exit the process with status 1."""
    sys.stderr.write("%s\n" % message)
    sys.exit(1)


def _debug_dump(label, content):
    """Print content to stdout between BEGIN/END marker lines for label."""
    print("###### BEGIN %s" % label)
    print(content)
    print("###### END %s" % label)


def _open_or_exit(opener, request, failure_text):
    """Open request with opener; on any error report it and exit."""
    try:
        return opener.open(request)
    except Exception as e:
        _exit_with_error("%s: %s" % (failure_text, e))


def _first_xpath_or_exit(document, expression, namespaces, failure_text):
    """Return the first XPath match in document; report and exit if none."""
    try:
        return document.xpath(expression, namespaces=namespaces)[0]
    except Exception as e:
        _exit_with_error("%s: %s" % (failure_text, e))
def main(argv=None):
    """
    Process command line arguments and then call get() with
    the appropriate IdP endpoint.

    argv -- list of command line arguments, excluding the program
            name; when None the arguments are taken from sys.argv.

    Expects exactly three positional arguments: an IdP tag that is a
    key of IDP_ENDPOINTS, an https:// target URL, and a login. Any
    violation is reported through the option parser's error() method,
    which prints the usage message and exits the process.
    """
    usage = "usage: %prog [options] IdP_tag target_url login"
    parser = OptionParser(usage=usage)
    parser.add_option("-d", "--debug",
                      action="store_true", dest="debug", default=False,
                      help="write debug output to stdout")

    (options, args) = parser.parse_args(argv)
    if len(args) != 3:
        parser.error("incorrect number of arguments")

    idp_tag, target, login = args

    if idp_tag not in IDP_ENDPOINTS:
        parser.error("IDP tag %s is not configured" % idp_tag)

    # only https targets are accepted
    if not target.startswith('https://'):
        parser.error("target_url is not of form https://")

    idp_endpoint = IDP_ENDPOINTS[idp_tag]

    # get the target from the SP using ECP exchange
    get(idp_endpoint, target, login, options.debug)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment