Created
May 10, 2017 01:20
-
-
Save cnelson/f6f46aee04e84e5b9cc2b0fef62dfdfa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import datetime | |
import json | |
import os | |
import subprocess | |
import yaml | |
import marshmallow as ma | |
import requests | |
from OpenSSL import crypto | |
class Config(ma.Schema):
    """Configuration options, read from environment-style keys.

    SLACK_URL: An incoming webhook url
    SLACK_CHANNEL: The channel to send the report to
    SLACK_USERNAME: The username to post as (optional, defaults to `cert-expire-check`)
    SLACK_ICON_EMOJI: The emoji to use as an icon (optional, defaults to `:python:`)
    DAYS_WARN: If a cert expires in <= this number of days a warning will be raised
        (optional, defaults to 30)
    DAYS_ERROR: If a cert expires in <= this number of days an error will be raised
        (optional, defaults to 7)
    """
    slack_url = ma.fields.Str(load_from='SLACK_URL', required=True)
    slack_channel = ma.fields.Str(load_from='SLACK_CHANNEL', required=True)
    slack_username = ma.fields.Str(load_from='SLACK_USERNAME', missing='cert-expire-check')
    # Previously read from SLACK_USERNAME, which made the icon mirror the
    # username and left the documented SLACK_ICON_EMOJI variable unused.
    slack_icon_emoji = ma.fields.Str(load_from='SLACK_ICON_EMOJI', missing=':python:')
    days_warn = ma.fields.Int(load_from='DAYS_WARN', missing=30)
    days_error = ma.fields.Int(load_from='DAYS_ERROR', missing=7)
def bosh_cli(*args, _bosh_cli="bosh-cli"):
    """Run a command with the bosh v2 cli and return its parsed JSON output.

    Args:
        *args(str): The arguments to pass to bosh; '--json' will be prepended to this list
        _bosh_cli: The path to the bosh v2 cli (optional)

    Returns:
        dict: The json output of the command parsed by json.loads()

    Raises:
        RuntimeError: The command exited with a non-zero status (including
            being terminated by a signal)
        ValueError: There was a problem parsing the bosh output
    """
    command = [_bosh_cli, '--json'] + list(args)
    returncode = 0

    try:
        output = subprocess.check_output(command, stderr=subprocess.STDOUT)
    except subprocess.CalledProcessError as exc:
        # Keep whatever the failed command printed: the error report below is
        # built from the parsed output.
        output = exc.output
        returncode = exc.returncode

    try:
        response = json.loads(output.decode('utf-8'))
    except json.JSONDecodeError as exc:
        raise ValueError("Could not parse output from `{0}`: {1}; Expected JSON, got: {2}".format(
            command,
            exc,
            output
        )) from exc

    # subprocess reports death-by-signal as a negative return code, so compare
    # against zero instead of only looking for positive values.
    if returncode != 0:
        lines = response.get('Lines') if isinstance(response, dict) else None
        if not lines:
            # No 'Lines' in the payload: fall back to the raw output so the
            # caller still gets a RuntimeError rather than a KeyError.
            lines = [output.decode('utf-8', errors='replace').strip()]
        raise RuntimeError("Error when running {0}: {1}".format(
            command,
            "\n".join(str(line) for line in lines)
        ))

    return response
def get_bosh_deployments():
    """Iterate over the deployment names reported by the bosh director.

    Runs the `deployments` command through bosh_cli and walks every row of
    every table in the response.

    Yields:
        str: The name of a deployment

    Raises:
        See bosh_cli
    """
    tables = bosh_cli('deployments')['Tables']
    for table in tables:
        yield from (row['name'] for row in table['Rows'])
def get_bosh_manifest(deployment):
    """Return the manifest for a given deployment.

    Args:
        deployment(str): The name of the deployment

    Returns:
        dict: The manifest parsed by yaml.safe_load()

    Raises:
        ValueError: bosh returned no manifest block for the deployment
        See bosh_cli for errors raised while running the command
    """
    response = bosh_cli('-d', deployment, 'manifest')
    blocks = response.get('Blocks') or []
    if not blocks:
        raise ValueError("No manifest returned for deployment {0}".format(deployment))
    # safe_load instead of yaml.load: a manifest only needs plain YAML types,
    # bare yaml.load() can construct arbitrary Python objects on older PyYAML,
    # and PyYAML >= 6 rejects yaml.load() without an explicit Loader.
    return yaml.safe_load(blocks[0])
def dict_generator(indict, pre=None):
    """Walk a nested structure and yield one flat list per leaf value.

    Each yielded list is the chain of dict keys leading to a leaf, followed by
    the leaf itself. Items of a list or tuple are visited under the key of the
    sequence that contains them; their position is not recorded.

    Based on http://stackoverflow.com/questions/12507206/python-recommended-way-to-walk-complex-dictionary-structures-imported-from-json

    Args:
        indict: The structure to walk (normally a dict)
        pre(list): Keys already traversed to reach `indict` (optional)

    Yields:
        list: [key, key, ..., value]
    """
    prefix = pre[:] if pre else []

    # Anything that is not a dict is itself a leaf.
    if not isinstance(indict, dict):
        yield prefix + [indict]
        return

    for name, child in indict.items():
        path = prefix + [name]
        if isinstance(child, dict):
            yield from dict_generator(child, path)
        elif isinstance(child, (list, tuple)):
            for element in child:
                yield from dict_generator(element, path)
        else:
            yield path + [child]
def find_certificates(manifest):
    """Find PEM encoded certificates in a manifest and report when they expire.

    Args:
        manifest(dict): A bosh manifest loaded from YAML

    Yields:
        tuple: (path.to.property, not_after) where not_after is a naive
            datetime.datetime parsed from the certificate's notAfter field
    """
    for item in dict_generator(manifest):
        value = item.pop()
        if not isinstance(value, str):
            continue
        if not value.strip().startswith('-----BEGIN CERTIFICATE-----'):
            continue

        # YAML keys are not guaranteed to be strings (e.g. integers), so
        # stringify each path component before joining.
        key = ".".join(str(part) for part in item)

        cert = crypto.load_certificate(crypto.FILETYPE_PEM, value)
        not_after = datetime.datetime.strptime(
            cert.get_notAfter().decode('utf-8'),
            '%Y%m%d%H%M%SZ'
        )
        yield (key, not_after)
def make_attachment(deployment, prop, expires, color):
    """Build the slack attachment describing one certificate's expiry status.

    Args:
        deployment(str): The name of the deployment containing `prop`
        prop(str): The property containing the certificate
        expires(int): Days until the cert expires (negative if already expired)
        color: The color to use for the slack attachment

    Returns:
        dict: The formatted slack attachment
    """
    # The four cases are mutually exclusive, so the order they are tested in
    # does not matter.
    if expires > 1:
        status = "Expires in {0} days.".format(expires)
    elif expires == 1:
        status = "Expires tomorrow!"
    elif expires == 0:
        status = "Expires today!"
    elif expires < 0:
        status = "Expired!"

    text = "*{deployment}* `{property}`\n{status}".format(
        deployment=deployment,
        property=prop,
        status=status
    )
    return {"color": color, "mrkdwn_in": ["text"], "text": text}
if __name__ == "__main__":
    # load the config from the environment
    config = Config(strict=True).load(os.environ).data

    # Take a single timestamp so every certificate is measured against the
    # same instant. Naive UTC matches the naive datetimes parsed from the
    # certificates' 'Z'-suffixed notAfter values.
    now = datetime.datetime.utcnow()

    attachments = []

    # find certs in all deployments on the director
    for name in get_bosh_deployments():
        for prop, not_after in find_certificates(get_bosh_manifest(name)):
            expires = (not_after - now).days

            # if it's a problem, stash it; the error threshold takes priority
            if expires <= config['days_error']:
                color = 'danger'
            elif expires <= config['days_warn']:
                color = 'warning'
            else:
                continue
            attachments.append(make_attachment(name, prop, expires, color))

    # if we have something to say, say it
    if attachments:
        response = requests.post(
            config['slack_url'],
            json={
                'username': config['slack_username'],
                'channel': config['slack_channel'],
                'icon_emoji': config['slack_icon_emoji'],
                'text': 'Certificate report:',
                'attachments': attachments
            },
            # requests waits indefinitely by default; bound the call so an
            # unattended run cannot hang on an unresponsive webhook
            timeout=30,
        )
        response.raise_for_status()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment