Created
February 5, 2019 22:34
-
-
Save jaraco/a648d793c847aa3be2ff5186bb8c1dc7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Third-party distributions this script needs at runtime.
# NOTE(review): presumably read by a script runner that installs the
# listed packages before executing the file -- keep this a plain literal.
__requires__ = [
    'cheroot', 'requests', 'pyopenssl',
    'cryptography', 'trustme', 'pytest',
]
import functools | |
import ssl | |
import threading | |
import time | |
import OpenSSL.SSL | |
import pytest | |
import requests | |
import trustme | |
import cheroot.server | |
class HelloWorldGateway(cheroot.server.Gateway):
    """Gateway responding with Hello World to root URI."""

    def respond(self):
        """Respond with dummy content via HTTP.

        Requests for ``/`` get a ``200 OK`` with a fixed body; anything
        else is delegated to the parent gateway's ``respond()``.
        """
        req = self.req

        # The request target may arrive as bytes (cheroot parses the raw
        # request line), in which case comparing it with a text literal
        # would never match on Python 3. Normalise to text first.
        uri = req.uri
        if isinstance(uri, bytes):
            uri = uri.decode('ISO-8859-1')

        if uri == '/':
            req.status = b'200 OK'
            req.ensure_headers_sent()
            req.write(b'Hello world!')
            return
        return super(HelloWorldGateway, self).respond()
def make_tls_http_server(bind_addr, ssl_adapter):
    """Create and start an HTTP server bound to bind_addr.

    The server uses ``HelloWorldGateway`` and the given ``ssl_adapter``.
    It is run through ``safe_start`` in a background thread, and this
    function blocks until the server reports ``ready``.

    Raises ``RuntimeError`` if the server thread exits before the server
    ever becomes ready.
    """
    httpserver = cheroot.server.HTTPServer(
        bind_addr=bind_addr,
        gateway=HelloWorldGateway,
    )
    httpserver.ssl_adapter = ssl_adapter

    server_thread = threading.Thread(target=httpserver.safe_start)
    server_thread.start()

    while not httpserver.ready:
        # If the thread is already gone, nothing is left to flip ``ready``;
        # fail loudly instead of polling forever.
        if not server_thread.is_alive():
            raise RuntimeError(
                'HTTP server thread exited before the server became ready',
            )
        time.sleep(0.1)

    return httpserver
@pytest.fixture
def tls_http_server():
    """Provision a server creator as a fixture.

    The fixture value is an already-primed generator. A test calls
    ``.send((bind_addr, ssl_adapter))`` on it and receives the started
    server as the result. During teardown a started server is stopped;
    if the test never requested one, teardown is a no-op.
    """
    def start_srv():
        server_args = yield
        if server_args is None:
            # Resumed by teardown without the test ever sending arguments:
            # there is no server to create or stop.
            return
        bind_addr, ssl_adapter = server_args
        httpserver = make_tls_http_server(bind_addr, ssl_adapter)
        yield httpserver  # returned to the test from send()
        yield httpserver  # picked up by the teardown loop below

    srv_creator = start_srv()
    next(srv_creator)  # advance to the first bare ``yield`` so send() works
    yield srv_creator

    # Drain whatever the generator still has to offer and stop each server.
    for httpserver in srv_creator:
        if httpserver is not None:
            httpserver.stop()
@pytest.fixture
def ca():
    """Provide a freshly created ``trustme.CA`` via fixture."""
    authority = trustme.CA()
    return authority
@pytest.fixture
def tls_ca_certificate_pem_path(ca):
    """Provide the CA certificate PEM temporary file via fixture.

    The value comes from the ``ca.cert_pem.tempfile()`` context manager,
    which stays open for as long as the fixture is in use.
    """
    pem_tempfile = ca.cert_pem.tempfile()
    with pem_tempfile as pem_path:
        yield pem_path
@pytest.fixture
def tls_certificate(ca):
    """Provide a server certificate issued by ``ca`` for 127.0.0.1."""
    server_identity = '127.0.0.1'
    return ca.issue_server_cert(server_identity)
@pytest.fixture
def tls_certificate_chain_pem_path(tls_certificate):
    """Provide a certificate chain PEM file path via fixture.

    The temporary file is produced from the certificate's
    ``private_key_and_cert_chain_pem`` blob and kept open while the
    fixture is in use.
    """
    chain_blob = tls_certificate.private_key_and_cert_chain_pem
    with chain_blob.tempfile() as chain_pem_path:
        yield chain_pem_path
@pytest.fixture
def tls_certificate_private_key_pem_path(tls_certificate):
    """Provide a certificate private key PEM file path via fixture.

    The temporary file is produced from the certificate's
    ``private_key_pem`` blob and kept open while the fixture is in use.
    """
    key_blob = tls_certificate.private_key_pem
    with key_blob.tempfile() as key_pem_path:
        yield key_pem_path
@pytest.mark.parametrize('adapter_type', ('pyopenssl',))
@pytest.mark.parametrize(
    'is_trusted_cert,tls_client_identity',
    ((False, 'localhost'),),
)
@pytest.mark.parametrize(
    'tls_verify_mode',
    (
        ssl.CERT_REQUIRED,  # server should validate if client cert CA is OK
    ),
)
def test_tls_client_auth(
    monkeypatch,
    tls_http_server, adapter_type,
    ca,
    tls_certificate,
    tls_certificate_chain_pem_path,
    tls_certificate_private_key_pem_path,
    tls_ca_certificate_pem_path,
    is_trusted_cert, tls_client_identity,
    tls_verify_mode,
):
    """Verify that client TLS certificate auth works correctly.

    A server requiring client certificates is started, then contacted
    with a client certificate issued by a CA the server was not told to
    trust. The request is expected to fail with an SSL error mentioning
    an unknown CA.
    """
    interface = '127.0.0.1'
    # The effective port is read back from the server once it is bound.
    port = 0

    if is_trusted_cert:
        client_cert_root_ca = ca
    else:
        client_cert_root_ca = trustme.CA()

    def _fixed_ulabel(*args, **kwargs):
        # Whatever is asked, answer with the encoded client identity.
        return tls_client_identity.encode()

    # NOTE(review): presumably needed so the certificate can be issued for
    # this identity -- confirm why idna has to be patched.
    monkeypatch.setattr('idna.core.ulabel', _fixed_ulabel)

    client_cert = client_cert_root_ca.issue_server_cert(
        # FIXME: change to issue_cert once new trustme is out
        tls_client_identity,
    )
    del client_cert_root_ca

    def _accept_preverified(conn, cert, errno, depth, preverify_ok):
        # Pass through the library's own verification verdict unchanged.
        return preverify_ok

    client_pem_blob = client_cert.private_key_and_cert_chain_pem
    with client_pem_blob.tempfile() as client_pem_path:
        adapter_cls = cheroot.server.get_ssl_adapter_class(name=adapter_type)
        tls_adapter = adapter_cls(
            tls_certificate_chain_pem_path,
            tls_certificate_private_key_pem_path,
        )

        # Require a client certificate and fail the handshake without one.
        verify_flags = (
            OpenSSL.SSL.VERIFY_PEER
            | OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        )
        tls_adapter.context = tls_adapter.get_context()
        tls_adapter.context.set_verify(verify_flags, _accept_preverified)

        ca.configure_trust(tls_adapter.context)
        tls_certificate.configure_cert(tls_adapter.context)

        server_args = ((interface, port), tls_adapter)
        server = tls_http_server.send(server_args)

        _host, port = server.bind_addr

        root_url = 'https://{}:{}/'.format(interface, port)
        fetch_root = functools.partial(
            requests.get,
            root_url,
            # Server TLS certificate verification:
            verify=tls_ca_certificate_pem_path,
            # Client TLS certificate verification:
            cert=client_pem_path,
        )

        with pytest.raises(requests.exceptions.SSLError) as exc_info:
            fetch_root()

        # Dig through the nested exception arguments down to the message.
        wrapped_error = exc_info.value.args[0]
        inner_error = wrapped_error.reason.args[0]
        err_text = inner_error.args[0]
        assert 'tlsv1 alert unknown ca' in err_text
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment