Created
June 12, 2018 05:40
-
-
Save felixfontein/40d817d87a89548e4c227dea80424fa6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Can be used in an Ansible module to check whether a certificate is revoked using OCSP.
# Needs filenames of certificate and intermediate certificate.
import base64 | |
import os | |
import re | |
import tempfile | |
import traceback | |
from ansible.module_utils.basic import AnsibleModule | |
from ansible.module_utils._text import to_native | |
def _get_host_from_uri(uri):
    """Extract the host from an URI, without userinfo and without port.

    The network location of ``uri`` is taken from ``urlparse``. A leading
    ``user:password@`` part is dropped, a bracketed IPv6 literal such as
    ``[2001:db8::1]`` is returned including its brackets, and anything
    from the port separator on is cut off. The case of the host is kept
    as written in ``uri``.

    Returns an empty string if ``uri`` has no network location.
    """
    try:
        from urllib.parse import urlparse
    except ImportError:  # Python 2
        from urlparse import urlparse
    netloc = urlparse(uri).netloc
    # Everything up to the last '@' is userinfo; it may itself contain ':'
    # and must not be mistaken for the host.
    host_port = netloc.rpartition('@')[2]
    if host_port.startswith('['):
        # IPv6 literal: the colons inside the brackets are part of the
        # address, so cut after the closing bracket instead of at the
        # first colon.
        closing = host_port.find(']')
        if closing >= 0:
            return host_port[:closing + 1]
    return host_port.partition(':')[0]
def _run_openssl(module, args, acceptable_rcs=None):
    """Run the OpenSSL command line tool and return its standard output.

    ``module`` is the object used to locate and execute the binary (its
    ``get_bin_path`` and ``run_command`` methods are called).
    ``args`` is the list of arguments appended after the ``openssl`` binary.
    ``acceptable_rcs`` is the list of return codes treated as success;
    it defaults to ``[0]``.

    Returns standard output decoded as UTF-8.

    Raises ``ModuleFailException`` if the return code is not contained in
    ``acceptable_rcs``; the message contains the command line, the return
    code and the error output.
    """
    if acceptable_rcs is None:
        acceptable_rcs = [0]
    openssl_bin = module.get_bin_path('openssl', True)
    openssl_cmd = [openssl_bin] + args
    # check_rc must be False: with check_rc=True, run_command itself fails
    # the module on every non-zero return code, so non-zero entries of
    # acceptable_rcs would never be honored and the check below would be
    # unreachable for them.
    rc, out, error = module.run_command(openssl_cmd, check_rc=False, encoding=None)
    if rc not in acceptable_rcs:
        msg = 'Command "{0}" returned the not acceptable return code {1}. Error output: {2}'
        # Decode leniently so that a stray non-UTF-8 byte in the error
        # output cannot replace the real failure with a UnicodeDecodeError.
        raise ModuleFailException(msg.format(' '.join(openssl_cmd), rc, error.decode('utf8', 'replace')))
    return out.decode('utf8')
def is_revoked_ocsp(module, certificate, intermediate_certificate):
    """Check whether the given certificate is revoked by querying OCSP.

    ``certificate`` must be a path pointing to a certificate (PEM format).
    ``intermediate_certificate`` must be a path pointing to the intermediate
    certificate, i.e. to the next certificate in the certificate chain.

    Returns ``True`` if the status word reported by OpenSSL for the
    certificate is ``revoked``, and ``False`` for any other status word.

    Raises ``ModuleFailException`` if the certificate contains no OCSP URI,
    if the OpenSSL version cannot be identified, if the OCSP responder
    reports an error, or if the OpenSSL output cannot be parsed.
    """
    # Determine OCSP URL
    ocsp_uri = _run_openssl(module, ['x509', '-in', certificate, '-noout', '-ocsp_uri']).strip()
    if not ocsp_uri:
        raise ModuleFailException('Cannot determine OCSP URI from certificate!')
    # Determine OpenSSL version
    version_string = _run_openssl(module, ['version'])
    m = re.match(r'^OpenSSL (\d+)\.(\d+)\..*', version_string)
    if not m:
        raise ModuleFailException('Cannot identify OpenSSL version from version string "{0}"!'.format(version_string))
    # Use m.group(n) rather than m[n]: subscripting match objects is only
    # available from Python 3.6 on, and this code also targets Python 2.
    openssl_version = (int(m.group(1)), int(m.group(2)))
    # Get OCSP response. Note that we need to specify the Host header,
    # but that the way to specify this depends on the OpenSSL version:
    # before 1.1 the header name and value are two separate arguments,
    # from 1.1 on they are passed as a single "name=value" argument.
    if openssl_version < (1, 1):
        host_header = ['Host', _get_host_from_uri(ocsp_uri)]
    else:
        host_header = ['Host=' + _get_host_from_uri(ocsp_uri)]
    # Compose arguments for OpenSSL
    args = ['ocsp', '-no_nonce', '-header']
    args.extend(host_header)
    args.extend(['-issuer', intermediate_certificate, '-cert', certificate, '-url', ocsp_uri, '-VAfile', intermediate_certificate])
    result = _run_openssl(module, args)
    # Interpret result: the first line is expected to end in
    # ": <status word>".
    m = re.match(r'^.*: ([a-zA-Z]+)(?:\n|\r|$)', result)
    if not m:
        # Not a status line; see whether the responder reported an error.
        m = re.match(r'^Responder Error: (.*)(?:\n|\r|$)', result)
        if m:
            raise ModuleFailException('OCSP responder error: {0}'.format(m.group(1)))
        raise ModuleFailException('Cannot parse OpenSSL OCSP output: {0}'.format(result))
    return m.group(1) == 'revoked'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment