Created
July 18, 2021 11:43
-
-
Save adiroiban/b895a62b4d28197dfdc8b803aa23ab0c to your computer and use it in GitHub Desktop.
chevah txacme
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2018 Adi Roiban. | |
# See LICENSE for details. | |
""" | |
Act as ACME client to generate and renew SSL certificates from Let's Encrypt. | |
The certificates are then set to each SSL service and the service | |
is restarted if required. | |
The task of updating the service configuration is done by the | |
certificate storage. | |
""" | |
from zope.interface import implementer, Interface | |
import pem | |
from OpenSSL import SSL | |
from twisted.internet import defer | |
from twisted.internet.endpoints import TCP4ServerEndpoint | |
from twisted.python.url import URL | |
from twisted.web import http | |
from twisted.web.iweb import IResponse | |
# Let's Encrypt is not available everywhere | |
try: | |
from cryptography.hazmat.primitives import serialization | |
from cryptography.hazmat.primitives.asymmetric import rsa | |
from josepy.jwa import RS256 | |
from josepy.jwk import JWKRSA | |
from txacme.client import Client, JWSClient | |
from acme.errors import ClientError | |
from txacme.interfaces import ICertificateStore, IResponder | |
from txacme.service import AcmeIssuingService | |
except ImportError: | |
# Placeholders for when we don't have the lets encrypt libraries. | |
JWSClient = None | |
ICertificateStore = Interface | |
IResponder = Interface | |
AcmeIssuingService = object | |
from chevah.server import force_unicode | |
from chevah.server.commons import DEFAULT_BACKEND | |
from chevah.server.commons.exception import ServerException | |
from chevah.server.commons.interface import ( | |
ILetsEncryptClientConfiguration, | |
ILetsEncryptClientResource, | |
ISSLOptions, | |
) | |
from chevah.server.commons.runnable import Runnable | |
from chevah.server.commons.service import listen | |
from chevah.server.configuration.model import ( | |
WritableBoolean, | |
WritableString, | |
WritableStringOrNone, | |
) | |
from chevah.server.configuration.option import ( | |
AddressPortOptionsMixin, | |
) | |
from chevah.server.http.client import PersistentAgent | |
from chevah.server.http.resource import ( | |
RedirectResource, | |
StaticTextResource, | |
) | |
from chevah.server.http.factory import HTTPFactoryBase | |
from chevah.server.http.configuration import HTTPServiceConfigurationSection | |
from chevah.server.resource.configuration import _ResourceConfigurationBase | |
# Self-signed certificate returned for the domains for which we don't yet | |
# have a certificate. | |
# openssl req -x509 -newkey rsa:2048 -nodes \ | |
# -keyout key.pem -out cert.pem -days 7300 | |
# It has CN = "loading-lets-encrypt" | |
_PEMS_PLACEHOLDER = u""" | |
-----BEGIN CERTIFICATE----- | |
MIIDLjCCAhagAwIBAgIJAPcHtbsKQK/BMA0GCSqGSIb3DQEBCwUAMCwxCzAJBgNV | |
BAYTAkdCMR0wGwYDVQQDDBRsb2FkaW5nLWxldHMtZW5jcnlwdDAeFw0xODExMTgy | |
MjIyNTFaFw0zODExMTMyMjIyNTFaMCwxCzAJBgNVBAYTAkdCMR0wGwYDVQQDDBRs | |
b2FkaW5nLWxldHMtZW5jcnlwdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC | |
ggEBAMBTFjLB7moRizureD2u8l42ZjG3/y/iDjQdV7EQeEWKWF/8QusaTmBBXRvS | |
e8Rz7p0aRfdiZS80Kp2h8Wbdk9BY/R1OmpHsaofCNL00s5cF345wFIPiIjFubI2X | |
9SZ6WmhiMA8CvxDu9mCJBoJf7jRJn5gBQVIE2jSSap8lVvyVsJp7G8+MV2PL9Ax1 | |
GinDXN6r+zdnjG2sVx1pFDShuGNCnD6l+RQToUm771t5kAuE6usx5FICcxu+Y18h | |
AFza4g15cdSN4ql4hOc2M1jeqlhwYPGSyZVi3OyDpiQGlVQE2N3PwFue9d7pHu/A | |
gS3ROWygoxjgyRC4kiobsgHqMdsCAwEAAaNTMFEwHQYDVR0OBBYEFBVQLqOzkoBb | |
MF9xs9rEMKfGhqYrMB8GA1UdIwQYMBaAFBVQLqOzkoBbMF9xs9rEMKfGhqYrMA8G | |
A1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAKn3xdAH+U6RkfMQXn0F | |
6GF3oDcL9y0Pgr6twwgzR+XI3FzdJrLz2FgNbxyAB2pJsi+luDJ/b4Qu6ba+Dwft | |
LUvPtcR+4OpcINANOnsRbmqQnlURxbEZgm6A8rgQHlec6yMTNV2O+I8ccvhd3ERK | |
mpn5UlFMuR3L5UWzt7kYu5nXvchNdk2C41kj+coKy5rUyVe70162qXei+hB3bglf | |
6kT8BMQYJty8sFLIcsHa27Y7icXGNTZTtRNojh5/gkJNzn5nwMwySAfn+SE+qRsB | |
L8NLVJ3C/2kq31I1yRDl3j7smctr+1B3sLqLzhW9wYifCYYYGOaRsE676akX4/LH | |
Zlo= | |
-----END CERTIFICATE----- | |
-----BEGIN PRIVATE KEY----- | |
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAUxYywe5qEYs7 | |
q3g9rvJeNmYxt/8v4g40HVexEHhFilhf/ELrGk5gQV0b0nvEc+6dGkX3YmUvNCqd | |
ofFm3ZPQWP0dTpqR7GqHwjS9NLOXBd+OcBSD4iIxbmyNl/UmelpoYjAPAr8Q7vZg | |
iQaCX+40SZ+YAUFSBNo0kmqfJVb8lbCaexvPjFdjy/QMdRopw1zeq/s3Z4xtrFcd | |
aRQ0obhjQpw+pfkUE6FJu+9beZALhOrrMeRSAnMbvmNfIQBc2uINeXHUjeKpeITn | |
NjNY3qpYcGDxksmVYtzsg6YkBpVUBNjdz8BbnvXe6R7vwIEt0TlsoKMY4MkQuJIq | |
G7IB6jHbAgMBAAECggEAYw6oYU5HmPQeTYZ8sPCZvvKv4i3NzwDUpfzy/Kzp9Y9T | |
A2Uvpl3vPl6MMVdRDhdUMDCnFsrkKNLqnDQb3QqFJoufGugwbrZhDDl35nftg/yv | |
b3LE7kSbrfAdEzTQPJhKVnc0H5cfh/48ge4xnxb/3oiJHcaPuUnshaVgK0YVLpSZ | |
Di2+5u6jSK9vDba54/IEc2IsnfEA+Uu6JEBSZYWKPK2vI7jVutU3j52N9QQV3Ua2 | |
YSOdOv4nehF3Uqv0YZUoydfkl5OQBMK3TtrRBL9qQCH7vcIpTRcvhXqmBRTuvyAd | |
xKXsDzcrvK2NMe+DCMJ650cEyIzdUWylARs51nX9YQKBgQD2ABPmMVAxlpmYbXfC | |
V/WY3m7VnUmvjzumKx2+6OByzgwnCv2Gigr+C+YuQ9TMgl2SPQvoz/ltMf+LU7iL | |
mth2hs1z4i6bUsVF9QxRe8mkPVPTaASNGXENoQG/2uK2AIi5FE1czdALE1w16d1U | |
otMpw+808FqhtMxygfsZzubW6wKBgQDIJHMjeM2zB4z1hfbBaEejdGYxGy1w6kQZ | |
5wdGx/1uL698U+Xly/RuRKUDxlHDxpSJaLN/jKhYct+S4mvxwRhjY2p8MfDMEJw8 | |
Pd/VHEq/1d4F85D9I+Hg6k8NXlCwGYdL2jJVcF9ui+zwyv/Xi9plWHRNZA3TdkUJ | |
M8VZ4ic00QKBgAdKHUzW7T9q7QY9SC00Ggz7mmEuFf7jyaq04F7F29DLFkUZ6EVS | |
Vd0NUTbRv76Hpmos8OtnLkb0ElR4mKFaZ0ur1u62JxdnLn1SM5k+h80cHB3JmUjt | |
FhKHUNv0in9GKMcDOP+lAaMyYasfUPWvFX7JVY9GCAl+qAhEncI4BVMdAoGBALEb | |
YDABlLVadyhFdchMiShhtpS54gpLpBvvzwaZrvA0jmvMqmIhi5mQcR3X/z7pmUPH | |
PSAfzlEGxkVqy+7Q2s4IiZCBeP57rGW6szoYoSUFEkXd9W2stROdBHtl/Kz4yjsb | |
SPNGT4y5keC7Io8RGSAZmEFryrhXrluoTnltoRghAoGAPtJkcqgx710XD6hxgFEy | |
G9JR+g9l+NGcK7SSvqRBfQMFZC4KjfziSJVxI2rfZkuDSN4qccprtQQsYIbKzQs3 | |
FJLYQdefBfHAMRuYokvg7qgb5anzkAM61Q/MIq9lnlRM4Kik4NDivMdS1vh6ClHt | |
kt/kWXaJ2YYyqk9b5YhMhzU= | |
-----END PRIVATE KEY----- | |
""" | |
@implementer(ILetsEncryptClientConfiguration) | |
class LetsEncryptClientConfiguration( | |
_ResourceConfigurationBase, | |
AddressPortOptionsMixin, | |
HTTPServiceConfigurationSection, | |
): | |
""" | |
Configurations for the Let's Encrypt resource. | |
It is a resource configuration and has all the basic HTTP server | |
configurations. | |
""" | |
PORT_DEFAULT = 80 | |
# HTTPServiceConfigurationSection is messing with proxy. | |
_proxy = None | |
# The resources are started on demand, but for Let's Encrypt we want | |
# to be a service and a resource. | |
enabled = WritableBoolean('enabled') | |
debug = WritableBoolean('debug') | |
acme_url = WritableString('acme_url') | |
contact_email = WritableStringOrNone('contact_email') | |
redirect_url = WritableString('redirect_url') | |
_account_key = WritableString('_account_key') | |
_storage = WritableString('_storage') | |
@property | |
def use_ssl(self): | |
""" | |
See `IServiceConfiguration`. | |
""" | |
return False | |
# FIXME:2599: | |
# This should be removed once service configuration is flat. | |
class _ServiceConfgurationProxy(object): | |
""" | |
This tried to simulate the service configuration in which the | |
configuration if found at 2 levels: base and base.configuration. | |
For Let's Encrypt we only have `base`. | |
""" | |
_OWN_ATTRIBUTES = ['parent'] | |
def __init__(self, context): | |
self._me = context | |
def __getattr__(self, attr): | |
if attr in self._OWN_ATTRIBUTES: | |
target = self | |
else: | |
target = self._me | |
return getattr(target, attr) | |
@property | |
def parent(self): | |
return self._me | |
@implementer(ILetsEncryptClientResource) | |
class LetsEncryptClientResource(Runnable): | |
""" | |
Manage certificate issued by Let's Encrypt. | |
It uses the configured account key to register/update an account with that | |
key. | |
It is an HTTP server for HTTP-01 challenge and ACME client. | |
The certificate storage is available even when the resource is stopped. | |
""" | |
PROPERTIES_REQUIRING_RESTART = ( | |
HTTPServiceConfigurationSection.PROPERTIES_REQUIRING_RESTART + ( | |
'acme_url', | |
'address', | |
'port', | |
'redirect_url', | |
) | |
) | |
# Size of the account key when auto-generated. | |
_KEY_SIZE = 2048 | |
_JWSClient = JWSClient | |
def __init__(self, parent, configuration): | |
super(LetsEncryptClientResource, self).__init__(parent, configuration) | |
self._resetState() | |
def _resetState(self): | |
""" | |
Reset the internal state. | |
""" | |
self._http_server = None | |
self._agent = None | |
# The high level ACME service taking care of updating the certs. | |
self._acme_service = None | |
# The acme client used to make high level requests. | |
self._client = None | |
# The HTTP Server. | |
self._factory = LetsEncryptHTTPFactory( | |
_ServiceConfgurationProxy(self.configuration), | |
on_error=self.fail, | |
) | |
# The storage for the domain certificates. | |
self._store = CertificateStore( | |
self.configuration, | |
on_update=self._onUpdate, | |
domains_getter=self._getConfiguredDomains, | |
) | |
def _onStart(self, avatar=None): | |
""" | |
See: Runnable. | |
""" | |
self._resetState() | |
if self._JWSClient is None: | |
raise ServerException( | |
'Let\'s Encrypt is not yet supported on this OS.') | |
if not self.configuration.acme_url: | |
raise ServerException( | |
'Missing URL to the Let\'s Encrypt Server Directory.') | |
endpoint = TCP4ServerEndpoint( | |
reactor=self._scheduler, | |
interface=self.configuration.address, | |
port=self.configuration.port, | |
) | |
deferred = listen(endpoint, self._factory, self.configuration.port) | |
deferred.addErrback(self._ebHTTPServerStarted) | |
deferred.addCallback(self._cbHTTPServerStarted) | |
return deferred | |
def _ebHTTPServerStarted(self, failure): | |
""" | |
Called when failing to start the HTTP server. | |
""" | |
raise ServerException( | |
u'Failed to start the local HTTP-01 challenge server. %s' % ( | |
force_unicode(failure.value),)) | |
def _cbHTTPServerStarted(self, port): | |
""" | |
Called when HTTP server was started. | |
""" | |
self._http_server = port | |
return self._startACMEClient() | |
def _startACMEClient(self): | |
""" | |
Start the ACME client service. | |
""" | |
acme_key = self._getAccountKey() | |
self._agent = PersistentAgent( | |
event_emitter=self.emitEvent, | |
ssl_context=SSL.Context(SSL.SSLv23_METHOD), | |
) | |
jws_client = JWSClient( | |
agent=self._agent, key=acme_key, alg=RS256) | |
def get_message(failure): | |
""" | |
Return the best human readable message we can get out of failure. | |
""" | |
error = failure.value | |
if failure.check(ClientError): | |
try: | |
error = error.args[0] | |
except IndexError: | |
pass | |
if IResponse.providedBy(failure.value.args[0]): | |
response = failure.value.args[0] | |
error = 'Server responded %s:%s' % ( | |
response.code, response.phrase) | |
return force_unicode(error) | |
def cb_client_creation(client): | |
""" | |
Called when the client has validated the ACME server. | |
""" | |
self._client = client | |
self._acme_service = AcmeIssuingService( | |
cert_store=self._store, | |
client=self._client, | |
email=self.configuration.contact_email, | |
clock=self._scheduler, | |
responders=[self._factory.responder], | |
panic=self._onPanic, | |
) | |
deferred = self._acme_service.start() | |
deferred.addErrback(eb_service_start) | |
return deferred | |
def eb_client_creation(failure): | |
""" | |
Called when failing to create the client. | |
""" | |
raise ServerException('Failed to discover the ACME server. %s' % ( | |
get_message(failure),)) | |
def eb_service_start(failure): | |
""" | |
Called when the service fails to start.. | |
""" | |
raise ServerException('Failed to register the ACME server. %s' % ( | |
get_message(failure),)) | |
# To simplify the upgrade, we try to automatically migrate from | |
# public V1 to V2. | |
acme_url = self.configuration.acme_url | |
if acme_url.lower() == 'https://acme-v1.api.letsencrypt.org/directory': | |
acme_url = u'https://acme-v02.api.letsencrypt.org/directory' | |
self.configuration.acme_url = acme_url | |
deferred = Client.from_url( | |
self._scheduler, | |
URL.fromText(acme_url), | |
key=acme_key, | |
alg=RS256, | |
jws_client=jws_client, | |
) | |
deferred.addErrback(eb_client_creation) | |
deferred.addCallback(cb_client_creation) | |
return deferred | |
@defer.inlineCallbacks | |
def _onStop(self, avatar=None): | |
""" | |
See: Runnable. | |
""" | |
if self._agent: | |
yield self._agent.closePersistentConnections() | |
self._agent = None | |
if self._acme_service: | |
yield self._acme_service.stopService() | |
self._acme_service = None | |
if self._http_server: | |
yield self._http_server.stopListening() | |
def getCertificatePEMs(self, name, sans=None): | |
""" | |
Return a PEM serialized of all the certificate objects for `name`. | |
This include cert itself, key and cert chain. | |
This will always return a certificate. | |
If no certificate is yet found, it will return a self-signed one. | |
""" | |
if not sans: | |
sans = [] | |
# Is important to join using a comma and space, as this is the | |
# normalized name of the certificate. | |
certificate_name = u', '.join([name] + sans) | |
def cb_store_get(pem_objects): | |
""" | |
Called when we got valid object in the cert storage. | |
""" | |
if not pem_objects: | |
# No certificate yet...but should get one soon. | |
return _PEMS_PLACEHOLDER | |
return u'\n'.join(o.as_bytes() for o in pem_objects) | |
def eb_store_get(failure): | |
""" | |
Called when `name` is not in the certificate storage. | |
""" | |
failure.trap(ServerException) | |
if self._acme_service: | |
# Service is started. | |
# Trigger a new cert create. Don't wait for it. | |
# Make sure errors are handled. | |
deferred = self._acme_service.issue_cert(certificate_name) | |
deferred.addErrback( | |
lambda failure: self._onPanic(failure, certificate_name)) | |
# Here to help with testing. | |
self._ongoing_new_cert = deferred | |
return _PEMS_PLACEHOLDER | |
deferred = self._store.get(certificate_name) | |
deferred.addCallback(cb_store_get) | |
deferred.addErrback(eb_store_get) | |
# This is a hack as `getCertificatePEMs` is a sync call. | |
# But by the time we are here, we should already have a result: the | |
# real one or the placeholder. | |
return deferred.result | |
def _getDomainsConfiguration(self): | |
""" | |
Return a dict with mapping of the configured domains to the targeted | |
services. | |
""" | |
result = {} | |
for service in self.root.services.getAll(): | |
configuration = service.configuration.configuration | |
if not ISSLOptions.providedBy(configuration): | |
continue | |
if not configuration.ssl_domains: | |
continue | |
# Is important to join using a comma and space, as this is the | |
# normalized name of the certificate. | |
domains = ', '.join(configuration.ssl_domains) | |
services = result.get(domains, []) | |
services.append(service) | |
result[domains] = services | |
return result | |
def _getConfiguredDomains(self): | |
""" | |
Return the list of the domains configured for the services and | |
which need to be managed by the resource. | |
""" | |
return self._getDomainsConfiguration().keys() | |
def _onUpdate(self, certificate_name, pem_objects): | |
""" | |
Called when we got updated certificates/keys for a certificate. | |
""" | |
for name, services in self._getDomainsConfiguration().items(): | |
if name != certificate_name: | |
continue | |
for service in services: | |
update = ''.join(o.as_bytes() for o in pem_objects) | |
# The service runnable has a service configuration which | |
# has the configuration section for the protocol. | |
# We don't save the configuration as it is saved | |
# in another call. | |
service.configuration.configuration.ssl_certificate = update | |
service.configuration.configuration.ssl_key = '' | |
self.emitEvent( | |
'20016', | |
data={'domains': certificate_name, 'service': service.name} | |
) | |
if not service.restart_required: | |
# We either got the same cert, | |
# but most probably the service is stopped. | |
continue | |
service.restart() | |
def _getAccountKey(self): | |
""" | |
Return the JWK key used by the acme client account. | |
This can take some time to run and will block the server when | |
generating a new key, but we expect that the key is not generated | |
often so for now, don't bother with threading. | |
""" | |
content = self.configuration._account_key | |
if content: | |
try: | |
key = serialization.load_pem_private_key( | |
content.encode('ascii'), | |
password=None, | |
backend=DEFAULT_BACKEND, | |
) | |
except Exception as error: | |
raise ServerException( | |
'Failed to load the account key. %s' % (error,)) | |
else: | |
try: | |
key = rsa.generate_private_key( | |
public_exponent=65537, | |
key_size=self._KEY_SIZE, | |
backend=DEFAULT_BACKEND, | |
) | |
account_key = key.private_bytes( | |
encoding=serialization.Encoding.PEM, | |
format=serialization.PrivateFormat.TraditionalOpenSSL, | |
encryption_algorithm=serialization.NoEncryption(), | |
) | |
self._configuration._account_key = account_key | |
self._configuration._proxy.save() | |
except Exception as error: | |
raise ServerException( | |
'Failed to generate and save the account key. %s' % ( | |
error,)) | |
return JWKRSA(key=key) | |
def _onPanic(self, failure, certificate_name): | |
""" | |
Called when failing to get a certificate for `domain`. | |
""" | |
if failure.check(defer.FirstError): | |
failure = failure.value.subFailure | |
self.emitEvent( | |
'20017', | |
data={ | |
'domains': certificate_name, | |
'details': force_unicode(failure.value), | |
}, | |
) | |
class LetsEncryptHTTPFactory(HTTPFactoryBase): | |
""" | |
A simple HTTP site/factory which does Let's Encrypt over HTTP-01. | |
""" | |
def __init__(self, service_configuration, on_error): | |
self._responder = HTTP01ChevahResponder() | |
super(LetsEncryptHTTPFactory, self).__init__( | |
service_configuration, on_error) | |
@property | |
def responder(self): | |
return self._responder | |
def getRootResource(self): | |
""" | |
Root is empty and only has .well-known/acme-challenge/ . | |
""" | |
redirect_url = self.service_configuration.redirect_url | |
if redirect_url: | |
root = RedirectResource( | |
url=redirect_url, code=http.MOVED_PERMANENTLY) | |
else: | |
root = StaticTextResource('') | |
well_known = StaticTextResource('') | |
root.putChild('.well-known', well_known) | |
well_known.putChild('acme-challenge', self._responder) | |
return root | |
@implementer(IResponder) | |
class HTTP01ChevahResponder(StaticTextResource): | |
""" | |
Web resource for ``http-01`` challenge responder. | |
Beside the challenge pages, it displays empty pages. | |
""" | |
challenge_type = u'http-01' | |
def __init__(self): | |
super(HTTP01ChevahResponder, self).__init__('') | |
# Add a static response to help with connection troubleshooting. | |
self.putChild(b'test.txt', StaticTextResource('Let\'s Encrypt Ready')) | |
def start_responding(self, server_name, challenge, response): | |
""" | |
Prepare for the ACME server to validate the challenge. | |
""" | |
self.putChild( | |
challenge.encode('token').encode('utf-8'), | |
StaticTextResource(response.key_authorization), | |
) | |
def stop_responding(self, server_name, challenge, response): | |
""" | |
Remove the child resource once the process is done. | |
""" | |
encoded_token = challenge.encode('token').encode('utf-8') | |
if self.getStaticEntity(encoded_token) is not None: | |
self.delEntity(encoded_token) | |
@implementer(ICertificateStore) | |
class CertificateStore(object): | |
""" | |
A certificate store that keeps certificates in the configuration as a | |
single value. | |
For certificates with multiple domains, the key is a comma separated list | |
of domains (with a space after the comma) | |
Return a `ServerException` failure when a requested domain has no | |
certificate. | |
""" | |
def __init__(self, configuration, on_update, domains_getter): | |
self._storage = {} | |
self._configuration = configuration | |
self._onUpdate = on_update | |
self._getConfiguredDomains = domains_getter | |
self._load() | |
def _getNormalizedName(self, name): | |
""" | |
Return the normalized comma separated certificate name. | |
""" | |
domains = [] | |
for part in name.split(','): | |
part = part.strip() | |
if not part: | |
continue | |
domains.append(part) | |
return ', '.join(domains) | |
def as_dict(self): | |
""" | |
Return all the managed certificates as a dict, | |
indexed by the certificate name. | |
It will not return certificates which were managed in the past | |
for which no longer have a service using them. | |
This is done so that when a new check is triggered for the certs, | |
old certs are not scheduled for renew. | |
""" | |
self._refresh() | |
return defer.succeed(self._storage.copy()) | |
def get(self, certificate_name): | |
""" | |
Get the X.509 objects for `certificate_name`. | |
""" | |
certificate_name = self._getNormalizedName(certificate_name) | |
try: | |
return defer.succeed(self._storage[certificate_name]) | |
except KeyError: | |
return defer.fail(ServerException( | |
'No certificate for "%s".' % (certificate_name,))) | |
def store(self, certificate_name, pem_objects): | |
""" | |
Keep and persist the X.509 objects for `certificate_name`. | |
""" | |
certificate_name = self._getNormalizedName(certificate_name) | |
self._storage[certificate_name] = pem_objects | |
self._save() | |
self._onUpdate(certificate_name, pem_objects) | |
return defer.succeed(None) | |
def _refresh(self): | |
""" | |
Update the domains which needs to be managed. | |
""" | |
names = self._getConfiguredDomains() | |
# Delete domains which are no longer handled. | |
existing_domains = list(self._storage.keys()) | |
for domain in existing_domains: | |
if domain not in names: | |
del self._storage[domain] | |
for domain in names: | |
if domain not in existing_domains: | |
self._storage[domain] = [] | |
def _load(self): | |
""" | |
Populate the storage with the persisted PEM objects. | |
""" | |
lines = [] | |
cert_names = None | |
lines = self._configuration._storage.splitlines() | |
if not lines: | |
# Cache is completely empty. | |
return | |
storage_acme_url = lines[0].strip() | |
if storage_acme_url != self._configuration.acme_url: | |
# We have a new acme_url so all certificates should be reloaded. | |
# So we don't load anything as storage values are no longer valid. | |
return | |
for line in lines[1:]: | |
if '|||' in line: | |
# We got a certificate delimiter. | |
if cert_names: | |
# Load the current certificate. | |
self._storage[cert_names] = pem.parse('\n'.join(lines)) | |
# Start a new certificate. | |
# The certificate name is guarded by '|||'. | |
lines = [] | |
cert_names = self._getNormalizedName( | |
line.strip()[3:-3].strip()) | |
continue | |
lines.append(line) | |
if cert_names: | |
# Load the last certificate. | |
self._storage[cert_names] = pem.parse('\n'.join(lines)) | |
def _save(self): | |
""" | |
Persist the current domains to the configuration. | |
""" | |
serialization = [] | |
for key, value in self._storage.items(): | |
cert_names = '|||%s|||\n' % (key,) | |
serialization.append( | |
cert_names + ''.join(o.as_bytes() for o in value) + '\n\n') | |
acme_url = self._configuration.acme_url + '\n' | |
self._configuration._storage = acme_url + '\n'.join(serialization) | |
self._configuration._proxy.save() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright (c) 2018 Adi Roiban. | |
# See LICENSE for details. | |
""" | |
Tests for Let's Encrypt resource. | |
""" | |
from __future__ import unicode_literals | |
import pem | |
from twisted.internet import reactor | |
from twisted.web import http | |
from twisted.web.client import Agent | |
try: | |
from acme import challenges | |
except ImportError: | |
challenges = None | |
from chevah.server import json | |
from chevah.server.commons.constant import ( | |
CONFIGURATION_DEFAULTS, | |
TYPE_NAME, | |
) | |
from chevah.server.commons.exception import ( | |
NoSuchAttributeError, | |
) | |
from chevah.server.commons.interface import ILetsEncryptClientConfiguration | |
from chevah.server.resource.lets_encrypt import ( | |
CertificateStore, | |
HTTP01ChevahResponder, | |
LetsEncryptClientConfiguration, | |
LetsEncryptClientResource, | |
LetsEncryptHTTPFactory, | |
_PEMS_PLACEHOLDER, | |
) | |
from chevah.server.testing import ( | |
attr, | |
HTTPServerContext, | |
ResponseDefinition, | |
WithProcessTestCase, | |
mk, | |
) | |
from chevah.server.testing.constant import SSL_DATA | |
from chevah.server.testing.proto_helpers import InMemoryTCPReactor | |
def setup_module(): | |
""" | |
Let's encrypt is not available on all systems. | |
""" | |
if challenges is None: | |
raise WithProcessTestCase.skipTest('No lets-encrypt support.') | |
class TestLetsEncryptClientConfiguration(WithProcessTestCase): | |
""" | |
Tests for LetsEncryptClientConfiguration. | |
""" | |
def test_init(self): | |
""" | |
It is initialized with a .ini backend. | |
""" | |
uuid = unicode(mk.uuid4()) | |
content = ( | |
'[resources/%s]\n' | |
'enabled: no\n' | |
'name: some-name\n' | |
'type: lets-encrypt\n' | |
'description: bla bla\n' | |
'address: 1.2.3.5\n' | |
'port: 2532\n' | |
'acme_url: \thttp://acme.com/directory \n' | |
'redirect_url: \thttps://ftp.domain.com/some-path/ \n' | |
'debug: yes\n' | |
'_account_key: some-key\n' | |
'_storage: some-storage\n' | |
) % (uuid,) | |
proxy = mk.makeFileConfigurationProxy( | |
content, defaults=CONFIGURATION_DEFAULTS) | |
parent = self.Bunch(_proxy=proxy, parent=None) | |
sut = LetsEncryptClientConfiguration(parent, uuid) | |
self.assertFalse(sut.enabled) | |
self.assertEqual('some-name', sut.name) | |
self.assertEqual('bla bla', sut.description) | |
self.assertEqual('1.2.3.5', sut.address) | |
self.assertEqual(2532, sut.port) | |
self.assertProvides(ILetsEncryptClientConfiguration, sut) | |
self.assertEqual('http://acme.com/directory', sut.acme_url) | |
self.assertEqual('https://ftp.domain.com/some-path/', sut.redirect_url) | |
self.assertIsTrue(sut.debug) | |
# The private values are also loaded. | |
self.assertEqual('some-key', sut._account_key) | |
self.assertEqual('some-storage', sut._storage) | |
self.assertFalse(sut.use_ssl) | |
def test_register_default(self): | |
""" | |
It is registered and can be created a a property with default values. | |
""" | |
sut = self.process.configuration.resources.createProperty( | |
'', {'type': TYPE_NAME.LETS_ENCRYPT}) | |
self.assertTrue(sut.enabled) | |
self.assertProvides(ILetsEncryptClientConfiguration, sut) | |
self.assertEqual( | |
'https://acme-v02.api.letsencrypt.org/directory', sut.acme_url) | |
self.assertEqual('', sut._account_key) | |
self.assertEqual('', sut._storage) | |
self.assertEqual('', sut.name) | |
self.assertEqual('', sut.description) | |
self.assertEqual('127.0.0.1', sut.address) | |
self.assertEqual(80, sut.port) | |
self.assertEqual('', sut.redirect_url) | |
self.assertIsFalse(sut.debug) | |
def test_private_fields(self): | |
""" | |
The private members are not available to public API | |
""" | |
resources = self.process.configuration.resources | |
with self.assertRaises(NoSuchAttributeError): | |
resources.createProperty( | |
'', | |
{'type': TYPE_NAME.LETS_ENCRYPT, '_account_key': 'something'}) | |
with self.assertRaises(NoSuchAttributeError): | |
resources.createProperty( | |
'', | |
{'type': TYPE_NAME.LETS_ENCRYPT, '_storage': 'something'}) | |
sut = resources.createProperty( | |
'', {'type': TYPE_NAME.LETS_ENCRYPT}) | |
sut._account_key = 'new-account-key' | |
sut._stroage = 'new-storage' | |
result = sut.getProperty() | |
self.assertNotContains('_account_key', result) | |
self.assertNotContains('_storage', result) | |
class TestHTTP01ChevahResponder(WithProcessTestCase): | |
""" | |
Unit tests for HTTP01ChevahResponder. | |
""" | |
def test_init(self): | |
""" | |
Is initialized like a resource with empty body and has a test child. | |
""" | |
request = mk.makeTwistedWebRequest() | |
sut = HTTP01ChevahResponder() | |
test_child = sut.getChildWithDefault(b'test.txt', request) | |
result = test_child.render(request) | |
self.assertEqual(b'Let\'s Encrypt Ready', result) | |
self.assertEqual(http.OK, request.code) | |
other_child = sut.getChildWithDefault(b'other-child', request) | |
code = other_child.headersReceived(request) | |
self.assertEqual(http.NOT_FOUND, code) | |
def test_start_responding_stop_responding(self): | |
""" | |
It will make the challenge available over HTTP GET and will make it | |
unavailable once stopped. | |
""" | |
token = 'bla-\N{sun}' | |
token_base64 = b'YmxhLeKYiQ' | |
request = mk.makeTwistedWebRequest(method=b'GET') | |
sut = HTTP01ChevahResponder() | |
challenge = challenges.HTTP01(token=token.encode('utf-8')) | |
response = challenges.HTTP01Response(key_authorization='tra-\N{leo}') | |
sut.start_responding('domain-ignored', challenge, response) | |
child = sut.getChildWithDefault(token_base64, request) | |
result = child.render(request) | |
self.assertEqual(b'tra-\xe2\x99\x8c', result) | |
self.assertEqual(http.OK, request.code) | |
sut.stop_responding('domain-ignored', challenge, 'ignored') | |
child = sut.getChildWithDefault(token_base64, request) | |
code = child.headersReceived(request) | |
self.assertEqual(http.NOT_FOUND, code) | |
def test_stop_responding_no_start(self): | |
""" | |
Does nothing when stop is called without already having a challenge | |
setup. | |
""" | |
token = 'bla-\N{sun}' | |
token_base64 = b'YmxhLeKYiQ' | |
request = mk.makeTwistedWebRequest(method=b'GET') | |
sut = HTTP01ChevahResponder() | |
challenge = challenges.HTTP01(token=token.encode('utf-8')) | |
sut.stop_responding('domain-ignored', challenge, 'ignored') | |
child = sut.getChildWithDefault(token_base64, request) | |
code = child.headersReceived(request) | |
self.assertEqual(http.NOT_FOUND, code) | |
def fail_on_call(domain, pem_objects): | |
""" | |
Method used to make sure that on_update is not called. | |
""" | |
raise AssertionError('Should not be called.') | |
class TestCertificateStore(WithProcessTestCase): | |
""" | |
Unit tests for CertificateStore. | |
""" | |
def getConfiguration(self, content=''): | |
""" | |
Return the configuration for Lets encrypt resource. | |
""" | |
self.process.reactor = self.clock | |
resources = self.process.configuration.resources | |
resource = resources.createProperty('', { | |
'type': TYPE_NAME.LETS_ENCRYPT, | |
'acme_url': 'http://test.local:123/directory' | |
}) | |
resource._storage = content | |
# Advance the clock to settle configuration updates. | |
self.clock.advance(1) | |
# FIXME:5580: | |
mk.setReWritableBytesIO(self.process.configuration) | |
return resource | |
def test_init_empty(self): | |
""" | |
Is initialized with configuration and event emitted and can read an | |
empty cache. | |
""" | |
configuration = self.getConfiguration(content='') | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: [], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertEqual({}, result) | |
def test_init_no_services(self): | |
""" | |
It can be initialized even when we don't have any services. | |
This happens during the main process initialization. | |
""" | |
self.process.services = None | |
configuration = self.getConfiguration(content='') | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: [], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertEqual({}, result) | |
def test_as_dict_single_domain(self): | |
""" | |
It can load the storage for a single domain. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://test.local:123/directory\n' | |
' ||| ftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: ['ftp.example.com'], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['ftp.example.com'], | |
result.keys(), | |
) | |
self.assertEqual(2, len(result['ftp.example.com'])) | |
def test_as_dict_multi_domains(self): | |
""" | |
It can load the storage for multiple domains. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://test.local:123/directory\n' | |
' ||| ftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
' ||| web.domain.tld |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
' ||| empty.domain.tld |||\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: [ | |
'ftp.example.com', 'web.domain.tld', 'empty.domain.tld'], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['ftp.example.com', 'web.domain.tld', 'empty.domain.tld'], | |
result.keys(), | |
) | |
self.assertEqual(3, len(result['ftp.example.com'])) | |
self.assertEqual(1, len(result['web.domain.tld'])) | |
self.assertEqual(0, len(result['empty.domain.tld'])) | |
def test_as_dict_new_acme_url(self): | |
""" | |
It will invalidate the cache when the configuration is for a different | |
ACME server than the stored certs, and will start the "registered" | |
domains without any certs. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://NEW.ACME:123/directory\n' | |
' ||| ftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
' ||| web.domain.tld |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
' ||| empty.domain.tld |||\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: [ | |
'ftp.example.com', 'web.domain.tld', 'empty.domain.tld'], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertEqual({ | |
'empty.domain.tld': [], | |
'ftp.example.com': [], | |
'web.domain.tld': [], | |
}, result) | |
def test_as_dict_different_domains(self): | |
""" | |
It will not load the storage for domains for which we no longer have | |
a service. | |
New domains are "registered" without any object. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://test.local:123/directory\n' | |
' ||| ftp.example.com , sftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
' ||| web.domain.tld |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
' ||| empty.domain.tld |||\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: [ | |
'ftp.example.com, sftp.example.com', | |
'other.domain.tld'], | |
) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['ftp.example.com, sftp.example.com', 'other.domain.tld'], | |
result.keys()) | |
self.assertEqual(3, len(result['ftp.example.com, sftp.example.com'])) | |
self.assertEqual(0, len(result['other.domain.tld'])) | |
def test_get_not_found_empty(self): | |
""" | |
Return a failure when could not found X.509 objects for a name. | |
""" | |
configuration = self.getConfiguration(content='') | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: ['any-name'], | |
) | |
deferred = sut.get('any-name') | |
failure = self.failureResultOf(deferred) | |
self.assertServerException( | |
'No certificate for "any-name".', | |
failure.value, | |
) | |
def test_get_not_found(self): | |
""" | |
Return a failure when could not found X.509 objects for a name. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://test.local:123/directory\n' | |
' ||| ftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: ['ftp.example.com'], | |
) | |
deferred = sut.get('any-name') | |
failure = self.failureResultOf(deferred) | |
self.assertServerException( | |
'No certificate for "any-name".', | |
failure.value, | |
) | |
def test_get_found(self): | |
""" | |
Return a deferred with the x.509 objects for. | |
""" | |
configuration = self.getConfiguration(content=( | |
'http://test.local:123/directory\n' | |
' ||| ftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
' ||| http.example.com , www.example.com |||\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
)) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=fail_on_call, | |
domains_getter=lambda: ['ftp.example.com'], | |
) | |
deferred = sut.get('ftp.example.com') | |
result = self.successResultOf(deferred) | |
self.assertEqual(2, len(result)) | |
deferred = sut.get('http.example.com, www.example.com') | |
result = self.successResultOf(deferred) | |
self.assertEqual(1, len(result)) | |
def test_store_new(self): | |
""" | |
It will add a new entry for a new server name and save it to the | |
configuration. | |
""" | |
updates = [] | |
certs = ( | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
) | |
configuration = self.getConfiguration(content='') | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=lambda d, o: updates.append((d, o)), | |
domains_getter=lambda: ['new.example.com'], | |
) | |
pem_objects = pem.parse(certs) | |
sut.store('new.example.com', pem_objects) | |
deferred = sut.get('new.example.com') | |
result = self.successResultOf(deferred) | |
self.assertEqual(2, len(result)) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['new.example.com'], | |
result.keys(), | |
) | |
self.assertEqual(( | |
'http://test.local:123/directory\n' + | |
'|||new.example.com|||\n' + | |
certs + '\n\n' | |
), | |
configuration._storage) | |
self.assertEqual([('new.example.com', pem_objects)], updates) | |
def test_store_multi_domain(self): | |
""" | |
It will add a new entry for multi domain certificate and will | |
normalize the domains comma separated list definition | |
""" | |
updates = [] | |
certs = ( | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
) | |
configuration = self.getConfiguration(content='') | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=lambda d, o: updates.append((d, o)), | |
domains_getter=lambda: [ | |
'new.example.com, ftp.example.com, example.com'], | |
) | |
# We store with not normalized names. | |
pem_objects = pem.parse(certs) | |
sut.store( | |
' new.example.com,ftp.example.com , example.com ', pem_objects) | |
deferred = sut.get('new.example.com, ftp.example.com, example.com') | |
result = self.successResultOf(deferred) | |
self.assertEqual(1, len(result)) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['new.example.com, ftp.example.com, example.com'], | |
result.keys(), | |
) | |
self.assertEqual(( | |
'http://test.local:123/directory\n' + | |
'|||new.example.com, ftp.example.com, example.com|||\n' + | |
certs + '\n\n' | |
), | |
configuration._storage) | |
self.assertEqual( | |
[('new.example.com, ftp.example.com, example.com', pem_objects)], | |
updates) | |
def test_store_update(self): | |
""" | |
It will replace the cert for an existing domain and save it to the | |
configuration. | |
""" | |
updates = [] | |
existing = ( | |
'http://test.local:123/directory\n' | |
'|||ftp.example.com, sftp.example.com |||\n' | |
'-----BEGIN RSA PRIVATE KEY-----\n' | |
'MII private key here\n' | |
'-----END RSA PRIVATE KEY-----\n' | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz cert here\n' | |
'-----END CERTIFICATE-----\n' | |
' ||| empty.domain.tld |||\n' | |
) | |
update = ( | |
'-----BEGIN CERTIFICATE-----\n' | |
'MIIEqz New CERTS\n' | |
'-----END CERTIFICATE-----\n' | |
) | |
configuration = self.getConfiguration(content=existing) | |
sut = CertificateStore( | |
configuration=configuration, | |
on_update=lambda d, o: updates.append((d, o)), | |
domains_getter=lambda: [ | |
'ftp.example.com, sftp.example.com', 'empty.domain.tld'], | |
) | |
pem_objects = pem.parse(update) | |
sut.store('ftp.example.com, sftp.example.com', pem_objects) | |
deferred = sut.get('ftp.example.com, sftp.example.com') | |
result = self.successResultOf(deferred) | |
self.assertEqual(1, len(result)) | |
deferred = sut.as_dict() | |
result = self.successResultOf(deferred) | |
self.assertItemsEqual( | |
['ftp.example.com, sftp.example.com', 'empty.domain.tld'], | |
result.keys(), | |
) | |
self.assertEqual(( | |
'http://test.local:123/directory\n' + | |
'|||ftp.example.com, sftp.example.com|||\n' + | |
update + '\n\n\n' | |
'|||empty.domain.tld|||\n\n\n' | |
), | |
configuration._storage) | |
self.assertEqual( | |
[('ftp.example.com, sftp.example.com', pem_objects)], updates) | |
class TestLetsEncryptHTTPFactory(WithProcessTestCase): | |
""" | |
Unit tests for LetsEncryptHTTPFactory. | |
""" | |
def test_init(self): | |
""" | |
It is initialized with the resource configuration. | |
""" | |
errors = [] | |
configuration = self.process.configuration.resources.createProperty( | |
'', {'type': TYPE_NAME.LETS_ENCRYPT}) | |
sut = LetsEncryptHTTPFactory( | |
service_configuration=configuration, | |
on_error=lambda error: errors.append(error), | |
) | |
# Responder is available... so that we can inject responses. | |
self.assertIsInstance(HTTP01ChevahResponder, sut.responder) | |
# The responder is available at expected URL. | |
request = mk.makeTwistedWebRequest( | |
uri='/.well-known/acme-challenge/test.txt') | |
test_child = self.successResultOf(sut.getResourceFor(request)) | |
result = test_child.render(request) | |
self.assertEqual(b'Let\'s Encrypt Ready', result) | |
self.assertEqual(http.OK, request.code) | |
# Other pages got page not found. | |
request = mk.makeTwistedWebRequest([b'other', 'page']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
code = resource.headersReceived(request) | |
self.assertEqual(http.NOT_FOUND, code) | |
# Root page is empty. | |
request = mk.makeTwistedWebRequest([b'']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertEqual(b'', result) | |
self.assertEqual(http.OK, request.code) | |
# Base ACME folder is empty. | |
request = mk.makeTwistedWebRequest([b'.well-known', 'acme-challenge']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertEqual(b'', result) | |
self.assertEqual(http.OK, request.code) | |
def test_init_redirection(self): | |
""" | |
When configured to redirect requests, it will server ACME challenges | |
but redirect any other request. | |
""" | |
errors = [] | |
configuration = self.process.configuration.resources.createProperty( | |
'', {'type': TYPE_NAME.LETS_ENCRYPT, 'redirect_url': 'http://a.c'}) | |
sut = LetsEncryptHTTPFactory( | |
service_configuration=configuration, | |
on_error=lambda error: errors.append(error), | |
) | |
# Responder is available... so that we can inject responses. | |
self.assertIsInstance(HTTP01ChevahResponder, sut.responder) | |
# The responder is available at expected URL. | |
request = mk.makeTwistedWebRequest( | |
uri='/.well-known/acme-challenge/test.txt') | |
test_child = self.successResultOf(sut.getResourceFor(request)) | |
result = test_child.render(request) | |
self.assertEqual(b'Let\'s Encrypt Ready', result) | |
self.assertEqual(http.OK, request.code) | |
# Other pages are redirected. | |
request = mk.makeTwistedWebRequest([b'other', 'page']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertEqual(http.MOVED_PERMANENTLY, request.code) | |
# Root page is a redirection. | |
request = mk.makeTwistedWebRequest([b'']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertContains(b'<a href="http://a.c">click here</a>\n', result) | |
self.assertEqual(http.MOVED_PERMANENTLY, request.code) | |
# Base ACME folder is empty. | |
request = mk.makeTwistedWebRequest([b'.well-known', 'acme-challenge']) | |
deferred = sut.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertEqual(b'', result) | |
self.assertEqual(http.OK, request.code) | |
class TestLetsEncryptClientResource(WithProcessTestCase): | |
""" | |
Tests for Lets's Encrypt HTTP site which servers as the Let's Encrypt | |
client service. | |
""" | |
DIRECTORY_BODY = { | |
"keyChange": "http://%(address)s/acme/rollover-account-key", | |
"meta": { | |
"caaIdentities": ["letsencrypt.org"], | |
"termsOfService": "http://%(address)s/tos.txt", | |
"website": "http://%(address)s/website" | |
}, | |
"newOrder": "http://%(address)s/acme/new-order", | |
"newAccount": "http://%(address)s/acme/new-account", | |
"newNonce": "http://%(address)s/acme/new-nonce", | |
"revokeCert": "http://%(address)s/acme/revoke-cert", | |
} | |
def getResource( | |
self, | |
configuration={}, | |
account_key=SSL_DATA['2048_KEY_DATA'], | |
raw_storage=None, | |
fake_reactor=True, | |
): | |
""" | |
Return a new LetsEncryptHTTPFactory which is already set for a domain. | |
By default it has the `localhost` domain with a cert which is already | |
stored and don't need an update. | |
""" | |
self.clock = InMemoryTCPReactor(expected_addresses=[('127.0.0.1', 80)]) | |
self.process.reactor = self.clock | |
resources = self.process.configuration.resources | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTPS, | |
'configuration/ssl_domains': ['localhost'], | |
}) | |
new_configuration = { | |
'type': TYPE_NAME.LETS_ENCRYPT, | |
'acme_url': 'http://127.0.0.1/directory', | |
'address': '127.0.0.1', | |
'port': 80, | |
} | |
new_configuration.update(configuration) | |
resource = resources.createProperty('', new_configuration) | |
if raw_storage is None: | |
raw_storage = ( | |
new_configuration['acme_url'] + '\n' + | |
'|||localhost|||\n' + | |
SSL_DATA['SERVER_CERT_DATA'] | |
) | |
resource._account_key = account_key | |
resource._storage = raw_storage | |
# Let the configuration updates to settle. | |
self.clock.advance(1) | |
result = LetsEncryptClientResource( | |
parent=resources, | |
configuration=resource, | |
) | |
if fake_reactor: | |
result._scheduler = self.clock | |
return result | |
def getServerResponses(self): | |
""" | |
Return the responses. | |
""" | |
directory_response = ResponseDefinition( | |
url='/directory', | |
response_content='to-be-updated', | |
content_type='application/json', | |
) | |
nonce_response = ResponseDefinition( | |
method='HEAD', | |
url='/acme/new-nonce', | |
response_headers=[('Replay-Nonce', 'test1111')], | |
) | |
# This uses the default key. | |
registration_response = ResponseDefinition( | |
method='POST', | |
url='/acme/new-account', | |
request=ResponseDefinition.ANY, | |
response_code=http.CREATED, | |
response_content=( | |
'{"status": "valid", "key": {"e": "AQAB", "kty": "RSA", "n": ' | |
'"zLUJYbSpjSAOSpxfns_w111mRls_FrHIC358fCxZsWzVXX_67uzM9TExAKtt' | |
'y3jrY1EV3C2-JcAIpwLTHhVHQL9ihqMu1Tp82fEoQtyqc68mGJFQP0vXE9I4P' | |
'OHGknpjH9vkHBzC-6V3FSFL3E6aUcfdqGVOquWiLgnE2PSpV-mZtXGceU2oP7' | |
'ERQAMpvT4ZPy2Pe9JcBn1KZrLSrzcdoBbY-q0yCsEemtxMXyVB9Y-0_yRCHAB' | |
'4f0z8ipncdWhiLQcgegCx0Bd6h4jqNSNBmoOIkzT7vMiQsJifmsJ6l6T4uUDN' | |
'q9lmiGK53pPGZIVyUQeO7QrXQSNBQ7qcWpxKhw"},' | |
'"agreement": "http://localhost/acme/ter"' | |
'}' | |
), | |
content_type='application/json', | |
response_headers=[ | |
('Replay-Nonce', 'test2222'), | |
('Link', '<http://localhost/acme/new-authz>;rel="next"'), | |
('Link', '<http://localhost/acme/recover-reg>;rel="recover"'), | |
('Link', '<http://localhost/acme/ter>;rel="terms-of-service"'), | |
], | |
) | |
order_response = ResponseDefinition( | |
method='POST', | |
url='/acme/new-order', | |
request=ResponseDefinition.ANY, | |
content_type='application/json', | |
response_code=http.CREATED, | |
response_content='{"bla": "la"}', | |
response_headers=[('Replay-Nonce', 'test3333')], | |
) | |
return [ | |
directory_response, | |
nonce_response, | |
registration_response, | |
order_response, | |
] | |
def updateResponse(self, directory_response, httpd): | |
""" | |
Update the response for directory to have addressed on the testing | |
server. | |
""" | |
template = json.dumps(self.DIRECTORY_BODY) | |
data = {'address': '%s:%s' % (httpd.ip, httpd.port)} | |
directory_response.updateResponseContent(template % data) | |
def test_init(self): | |
""" | |
It starts a site which has the ACME well-know folder available. | |
""" | |
resources = self.process.configuration.resources | |
configuration = resources.createProperty( | |
'', {'type': TYPE_NAME.LETS_ENCRYPT}) | |
sut = LetsEncryptClientResource( | |
parent=resources, | |
configuration=configuration, | |
) | |
# ACME test page. | |
request = mk.makeTwistedWebRequest( | |
[b'.well-known', 'acme-challenge', 'test.txt']) | |
deferred = sut._factory.getResourceFor(request) | |
resource = self.successResultOf(deferred) | |
result = resource.render(request) | |
self.assertEqual(b"Let's Encrypt Ready", result) | |
self.assertEqual(http.OK, request.code) | |
def test_start_no_acme_url(self): | |
""" | |
It fails to start when no acme_url is defined. | |
""" | |
sut = self.getResource(configuration={ | |
'acme_url': '', | |
}) | |
self.assertRunnableFailsToStart( | |
sut, | |
details='Missing URL to the Let\'s Encrypt Server Directory.') | |
def test_start_not_supported(self): | |
""" | |
It fails to start when on an OS which is not supported.. | |
""" | |
sut = self.getResource() | |
# On OS without support, the JWS is not imported. | |
sut._JWSClient = None | |
self.assertRunnableFailsToStart( | |
sut, | |
details='Let\'s Encrypt is not yet supported on this OS.') | |
def test_start_account_key_fail_load(self): | |
""" | |
It fails when the existing account key can't be loaded. | |
""" | |
sut = self.getResource( | |
account_key='invalid data', | |
) | |
if self.os_name == 'aix': | |
# On AIX we don't get all the details. | |
message = ( | |
'Failed to load the account key. ' | |
'Could not deserialize key data.' | |
) | |
else: | |
message = ( | |
'Failed to load the account key. ' | |
'Could not deserialize key data. ' | |
'The data may be in an incorrect format or ' | |
'it may be encrypted with an unsupported algorithm.' | |
) | |
self.assertRunnableFailsToStart( | |
sut, details=self.Contains(message)) | |
def test_start_fail_to_start_http_server(self): | |
""" | |
It fails when it can't start the HTTP server. | |
""" | |
sut = self.getResource(configuration={ | |
'address': '1.2.3.4', | |
'port': 43, | |
}) | |
# Put back the real reactor, | |
sut._scheduler = reactor | |
if self.TEST_LANGUAGE == 'FR': | |
details = self.Contains( | |
'Failed to start the local HTTP-01 challenge server. ' | |
'CannotListenError(') | |
else: | |
details = self.Contains( | |
'Failed to start the local HTTP-01 challenge server. ' | |
'Couldn\'t listen on 1.2.3.4:43: [Errno ') | |
self.assertRunnableFailsToStart(sut, details=details) | |
def test_start_account_key_not_defined(self): | |
""" | |
When no account key is defined, it will automatically create one and | |
save it. | |
In this test, client registration fails, but we only care that the | |
key is created and registration is attempted. | |
""" | |
with HTTPServerContext([]) as httpd: | |
self._check_start_account_key_not_defined(httpd) | |
def assertFactoryError(self, message): | |
""" | |
Check that when running the reactor the factory fails with `on_error` | |
hook. | |
""" | |
self.executeReactor() | |
self.clock.advance(1) | |
# The ACME url is validated. | |
self.assertEvent('40033', reason='HTTP connection made.') | |
self.assertEventUnordered('40032', reason='HTTP connection close.') | |
self.assertEvent( | |
'20158', | |
data={'details': message}, | |
reason='Resource failed to start.', | |
) | |
self.assertEvent('20157', reason='Resource stopped.') | |
def _check_start_account_key_not_defined(self, http_server): | |
""" | |
Low level implementation to execute while ACME server is up. | |
""" | |
stream = mk.setReWritableBytesIO(self.process.configuration) | |
acme_url = 'http://%s:%s/directory' % ( | |
http_server.ip, http_server.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
account_key='', | |
) | |
sut._KEY_SIZE = 512 | |
self.assertIsEmpty(sut.configuration._account_key) | |
sut.start() | |
# New account is updated and store. | |
self.assertIsNotEmpty(sut.configuration._account_key) | |
self.assertContains( | |
b'_account_key = -----BEGIN RSA PRIVATE KEY', | |
stream.content_at_close, | |
) | |
self.assertFactoryError( | |
self.Contains('Failed to discover the ACME server.')) | |
# Let the HTTP client to close. | |
self.executeReactor() | |
def test_start_acme_url_not_reachabe(self): | |
""" | |
It fails to start when the configured ACME url is not available. | |
""" | |
# Hope nobody is listening on this port. | |
sut = self.getResource( | |
configuration={'acme_url': 'http://127.0.0.1:4214'}, | |
) | |
deferred = sut.start() | |
self.getDeferredResult(deferred) | |
self.assertEvent( | |
'20158', | |
data={ | |
'details': self.Contains( | |
'Failed to discover the ACME server. ' | |
'Connection was refused') | |
}, | |
reason='Resource failed to start.', | |
) | |
self.assertEvent('20157', reason='Resource stopped.') | |
def test_start_acme_url_not_directory(self): | |
""" | |
It fails to start when the configured ACME url available but is not | |
an ACME directory. | |
""" | |
directory_response = ResponseDefinition( | |
url='/directory', | |
response_content='not-json', | |
) | |
with HTTPServerContext([directory_response]) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
sut.start() | |
self.assertFactoryError( | |
'Failed to discover the ACME server. ' | |
"Unexpected response Content-Type: 'text/html'. " | |
"Expecting 'application/json'." | |
) | |
def test_start_acme_url_directory_bad_json(self): | |
""" | |
It fails to start when the configured ACME url available but is not | |
an ACME directory. | |
""" | |
directory_response = ResponseDefinition( | |
url='/directory', | |
response_content='not-json', | |
content_type='application/json', | |
) | |
with HTTPServerContext([directory_response]) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
sut.start() | |
self.assertFactoryError( | |
'Failed to discover the ACME server. ' | |
'Missing JSON body.') | |
def test_start_acme_url_empty_directory(self): | |
""" | |
It fails to start when the configured ACME has a directory in JSON, | |
but not all fields are found. | |
""" | |
directory_response = ResponseDefinition( | |
url='/directory', | |
response_content='{}', | |
content_type='application/json', | |
) | |
with HTTPServerContext([directory_response]) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
sut.start() | |
self.assertFactoryError( | |
'Failed to discover the ACME server. ' | |
'Directory has no newNonce URL') | |
def test_start_register_bad_response(self): | |
""" | |
It fails to start when the configured ACME has a valid directory | |
but client registration fails. | |
""" | |
responses = self.getServerResponses() | |
responses = responses[:3] | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
responses[2].response_headers = {} | |
sut.start() | |
self.assertFactoryError(self.Contains( | |
'Failed to register the ACME server. ' | |
'Server POST response did not include a replay nonce' | |
)) | |
def test_getCertificatePEMs_stoped_no_cache(self): | |
""" | |
Will return the placeholder cert and does nothing else when | |
the Let's Encrypt resource is stopped. | |
""" | |
sut = self.getResource() | |
result = sut.getCertificatePEMs('any.host.com') | |
self.assertEqual(_PEMS_PLACEHOLDER, result) | |
def test_getCertificatePEMs_stoped_with_cache(self): | |
""" | |
Will return the cached cert and does nothing else when | |
the Let's Encrypt resource is stopped. | |
""" | |
existing_storage = ( | |
'http://localhost.acme:12/directory\n' + | |
'|||localhost|||\n' + | |
SSL_DATA['SERVER_CERT_DATA'] | |
) | |
sut = self.getResource( | |
configuration={'acme_url': 'http://localhost.acme:12/directory'}, | |
raw_storage=existing_storage | |
) | |
result = sut.getCertificatePEMs('localhost') | |
self.assertEqual(SSL_DATA['SERVER_CERT_DATA'].strip(), result) | |
def assertCleanStop(self, sut): | |
""" | |
Check that the service is stopped without any errors and without | |
extra actions. | |
""" | |
# It stops clean. | |
deferred = sut.stop() | |
self.getDeferredResult(deferred) | |
# The connection is only closed in the end as a single connection | |
# is used for all the requests. | |
self.assertEvent('40032', reason='HTTP connection close.') | |
self.assertEvent('20157', reason='Resource stoped.') | |
def checkStartWithoutAnyCheck(self, sut): | |
""" | |
Check that starting the resource does not trigger any new | |
certificate generation. | |
""" | |
deferred = sut.start() | |
self.getResult(deferred) | |
# The ACME url is validated. | |
self.assertEvent('40033', reason='HTTP connection made.') | |
self.assertEvent('20156', reason='Resource started.') | |
# No check is running. | |
self.getResult(sut._acme_service._ongoing_check) | |
self.assertEventsQueueIsEmpty() | |
def test_getCertificatePEMs_started_no_cache_at_start(self): | |
""" | |
When the service exists then resource start but no cert is cached, | |
it will trigger a certificate and return a placeholder. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTPS, | |
'configuration/ssl_domains': ['any.host.com'], | |
}) | |
self.clock.advance(1) | |
deferred = sut.start() | |
self.getResult(deferred) | |
# The ACME url is validated. | |
self.assertEvent('40033', reason='HTTP connection made.') | |
self.assertEvent('20156', reason='Resource started.') | |
# Let the first check to try to get the configured cert. | |
self.getResult(sut._acme_service._ongoing_check) | |
self.assertEvent( | |
'20017', | |
data={ | |
'domains': 'any.host.com', | |
}, | |
reason='Failed to get cert from ACME.', | |
) | |
# Getting the certificate returns the placeholder. | |
result = sut.getCertificatePEMs('any.host.com') | |
self.assertEqual(_PEMS_PLACEHOLDER, result) | |
self.assertCleanStop(sut) | |
def test_getCertificatePEMs_started_no_cache_after_start(self): | |
""" | |
When the service is started and certificate for a new service | |
is requested, a placeholder is returned and a new certificate | |
creation is triggered. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
self.checkStartWithoutAnyCheck(sut) | |
# A new service is created after the resource is started. | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTPS, | |
'configuration/ssl_domains': ['any.host.com'], | |
}) | |
self.clock.advance(1) | |
# Getting the certificate returns the placeholder. | |
result = sut.getCertificatePEMs('any.host.com') | |
self.assertEqual(_PEMS_PLACEHOLDER, result) | |
self.getResult(sut._ongoing_new_cert) | |
self.assertEvent( | |
'20017', | |
data={ | |
'domains': 'any.host.com', | |
}, | |
reason='Failed to get cert from ACME.', | |
) | |
self.assertCleanStop(sut) | |
def test_getCertificatePEMs_started_with_cache(self): | |
""" | |
Will return the cached cert and does nothing else when | |
the Let's Encrypt resource is started, as the cert is ok. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
# We have a service which is no SSL. | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTP, | |
}) | |
# We have a service which is SSL, but has no let's encrypt. | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTPS, | |
}) | |
self.checkStartWithoutAnyCheck(sut) | |
result = sut.getCertificatePEMs('localhost') | |
self.assertEqual(SSL_DATA['SERVER_CERT_DATA'].strip(), result) | |
self.assertCleanStop(sut) | |
def test_start_register_ok(self): | |
""" | |
Once registration succeeds and no certs need to be updated nothing is | |
done. | |
A new check is done for the next day. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
self.checkStartWithoutAnyCheck(sut) | |
# Reset the certificates for localhost to trigger a cert update | |
# on next check, without saving the file. | |
sut._acme_service.cert_store._storage['localhost'] = [] | |
self.clock.advance( | |
sut._acme_service.check_interval.total_seconds() + 1) | |
self.getResult(sut._acme_service._ongoing_check) | |
self.assertEvent( | |
'20017', | |
data={ | |
'domains': 'localhost', | |
}, | |
reason='Failed to get cert from ACME.', | |
) | |
self.assertCleanStop(sut) | |
def test_start_new_acme_url(self): | |
""" | |
If the existing configuration has a different ACME URL, | |
the cache is invalidated and a new certificate check is triggered. | |
""" | |
# The cache has a valid cert for localhost, but is marked | |
# as being issued by http://other.acme.com/directory | |
existing_storage = ( | |
'http://other.acme.com/directory\n' + | |
'|||localhost|||\n' + | |
SSL_DATA['SERVER_CERT_DATA'] | |
) | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
raw_storage=existing_storage | |
) | |
self.updateResponse(responses[0], httpd) | |
deferred = sut.start() | |
self.getResult(deferred) | |
# The ACME url is validated. | |
self.assertEvent('40033', reason='HTTP connection made.') | |
self.assertEvent('20156', reason='Resource started.') | |
# A new certificate check is scheduled without waiting for next | |
# day. | |
# The renewal fails, but we only want to make sure that the | |
# check is running. | |
self.getResult(sut._acme_service._ongoing_check) | |
self.assertEvent( | |
'20017', | |
data={ | |
'domains': 'localhost', | |
}, | |
reason='Failed to get cert from ACME.', | |
) | |
self.assertCleanStop(sut) | |
@attr('slow') | |
def test_start_functional_staging_ok(self): | |
""" | |
It can start with the staging public Let's encrypt server and | |
provides an HTTP server for ACME HTTP-01 challenge. | |
""" | |
acme_url = 'https://acme-staging-v02.api.letsencrypt.org/directory' | |
sut = self.getResource( | |
configuration={ | |
'acme_url': acme_url, | |
'port': 3245, | |
}, | |
fake_reactor=False, | |
) | |
deferred = sut.start() | |
# This will hit the internet...so it can be slow. | |
self.getResult(deferred, timeout=10) | |
# The ACME url is validated. | |
self.assertEvent('40033', reason='HTTP connection made.') | |
self.assertEvent('20156', reason='Resource started.') | |
agent = Agent(reactor) | |
d = agent.request( | |
b'GET', | |
b'http://127.0.0.1:3245/.well-known/acme-challenge/test.txt', | |
None) | |
result = self.getResult(d) | |
self.assertEqual(200, result.code) | |
self.assertEqual([b'Let\'s Encrypt Ready'], result._bodyBuffer) | |
self.assertCleanStop(sut) | |
# Wait for HTTPS SSL to close. | |
self.executeReactor(timeout=2) | |
@attr('slow') | |
def test_start_functional_prod_v1_to_v2(self): | |
""" | |
It can start with a V1 PROD directory and and will automatically | |
upgrade to v2 URL. | |
""" | |
if not self.os_version.startswith('ubuntu'): | |
raise self.skipTest('Reduce usage of ACME PROD server.') | |
acme_url = 'https://ACME-V1.api.letsencrypt.org/directory' | |
sut = self.getResource( | |
configuration={ | |
'acme_url': acme_url, | |
'contact_email': '[email protected]', | |
'port': 3245, | |
}, | |
fake_reactor=False, | |
) | |
deferred = sut.start() | |
# This will hit the Internet...so it can be slow. | |
self.getResult(deferred, timeout=10) | |
# The ACME url is validated. | |
self.assertEvent( | |
'40033', | |
# FIXME:5626: | |
# See how to preserve the 'acme-v02.api.letsencrypt.org' hostname. | |
# Or have both hostname and actual IP | |
data={'hostname': '172.65.32.248'}, | |
reason='HTTP connection made.', | |
) | |
self.assertEvent('20156', reason='Resource started.') | |
self.assertCleanStop(sut) | |
# Wait for HTTPS SSL to close. | |
self.executeReactor() | |
def test_update_service_not_started(self): | |
""" | |
When a new certificate is produced for a service and the service | |
is stopped, it will just emit an event that the new certificate was | |
create and will not start the service. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
self.checkStartWithoutAnyCheck(sut) | |
self.process.configuration.services.createProperty('', { | |
'type': TYPE_NAME.HTTPS, | |
'configuration/ssl_domains': ['other.domain.com'], | |
}) | |
self.clock.advance(1) | |
sut._onUpdate('localhost', []) | |
self.assertEvent( | |
'20016', | |
data={ | |
'domains': 'localhost', | |
}, | |
reason='New certificate generated.', | |
) | |
self.assertCleanStop(sut) | |
def test_update_service_started(self): | |
""" | |
When a new certificate is produced for a service and the service | |
is started, it will emit an event that the new certificate was | |
create and will restart the service. | |
""" | |
responses = self.getServerResponses() | |
with HTTPServerContext(responses) as httpd: | |
acme_url = 'http://%s:%s/directory' % (httpd.ip, httpd.port) | |
sut = self.getResource( | |
configuration={'acme_url': acme_url}, | |
) | |
self.updateResponse(responses[0], httpd) | |
self.checkStartWithoutAnyCheck(sut) | |
# Create a service which is actually running. | |
# We create after the let's encrypt resource is start to not | |
# trigger a new certificate generation when the resource starts. | |
new_config = self.process.configuration.services.createProperty( | |
'', | |
{ | |
'type': TYPE_NAME.HTTPS, | |
'port': mk.portNumber(), | |
'name': 'new-https-service', | |
'configuration/ssl_domains': ['new.domain'], | |
} | |
) | |
self.clock.advance(1) | |
new_service = self.process.services.getSection(new_config.uuid) | |
deferred = new_service.start() | |
self.getResult(deferred) | |
self.assertEvent('20156', reason='Service started.') | |
self.assertEventsQueueIsEmpty() | |
sut._onUpdate('new.domain', []) | |
self.assertEvent( | |
'20016', | |
data={ | |
'domains': 'new.domain', | |
}, | |
reason='New certificate generated.', | |
) | |
self.iterateReactor() | |
self.assertEvent('20157', reason='Service stopped.') | |
self.assertEvent('20156', reason='Service started.') | |
self.getResult(new_service.stop()) | |
self.assertEvent('20157', reason='Service stopped.') | |
self.assertCleanStop(sut) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment