|
#!/usr/bin/env python |
|
import argparse |
|
import ConfigParser |
|
import sys |
|
import os |
|
import logging |
|
import time |
|
|
|
|
|
from collections import namedtuple |
|
from datetime import date, datetime |
|
from dateutil.parser import parse |
|
|
|
|
|
try: |
|
import boto3 |
|
import botocore.session |
|
except: |
|
logging.error("Could not import \'boto3\' library. Please install.") |
|
sys.exit(3) |
|
|
|
try: |
|
from tabulate import tabulate |
|
except: |
|
logging.error("Could not import \'tabulate\' library. Please install.") |
|
sys.exit(3) |
|
|
|
|
|
ELB = namedtuple('ELB', [ |
|
'stack', |
|
'name', |
|
'has_ssl', |
|
'has_acm', |
|
'expiry_date', |
|
'days_until_expiry' |
|
] |
|
) |
|
|
|
ACM = namedtuple('ACM', [ |
|
'domain', |
|
'arn', |
|
'cloudFormationArn' |
|
] |
|
) |
|
|
|
|
|
def get_elb_listener_summary(name, stack, listener_descriptions): |
|
def listens_on_ssl(l): |
|
return l["Listener"]["Protocol"] == "HTTPS" |
|
|
|
def is_using_acm(l): |
|
return ":certificate/" in l["Listener"].get("SSLCertificateId", "") |
|
|
|
def cert_expiry(l): |
|
try: |
|
now = datetime.now() |
|
exp = l["Listener"].get( |
|
"SSLCertificateId", |
|
"").split("-exp")[-1][0:10] |
|
exp_date = parse(exp) |
|
return exp, (exp_date-now).days |
|
except: |
|
return None, None |
|
|
|
ssl = True in [listens_on_ssl(l) for l in listener_descriptions] |
|
acm = True in [is_using_acm(l) for l in listener_descriptions] |
|
expiry_date, days_until_expiry = cert_expiry(l) |
|
|
|
return ELB(stack, name, ssl, acm, expiry_date, days_until_expiry) |
|
|
|
|
|
def list_elbs(elb_client): |
|
elb_summaries = [] |
|
try: |
|
elbs = elb_client.describe_load_balancers()['LoadBalancerDescriptions'] |
|
except Exception as e: |
|
logging.error("Could not list ELBs: %s" % e) |
|
for elb in elbs: |
|
elb_name = elb["LoadBalancerName"] |
|
try: |
|
tags = elb_client.describe_tags(LoadBalancerNames=[elb_name]) |
|
stack = [t["Value"] for t in tags["TagDescriptions"][0]["Tags"] if t["Key"] == "aws:cloudformation:stack-name"][0] |
|
except Exception as e: |
|
logging.error("Could not get tags for ELB %s: %s" % (elb_name, e)) |
|
stack = None |
|
|
|
elb_summaries.append( |
|
get_elb_listener_summary( |
|
elb_name, |
|
stack, |
|
elb["ListenerDescriptions"] |
|
) |
|
) |
|
return [elb for elb in elb_summaries if elb.has_ssl] |
|
|
|
|
|
def print_tabulated_elbs(elbs): |
|
def sorter(elb): |
|
if not elb.days_until_expiry: |
|
return 10000 |
|
return elb.days_until_expiry |
|
|
|
elbs.sort(key=sorter) |
|
|
|
table = [ |
|
[ |
|
elb.stack, |
|
elb.name, |
|
str(elb.has_acm), |
|
elb.expiry_date or "-"] |
|
for elb in elbs |
|
] |
|
|
|
print tabulate(table, headers=["Stack", "ELB Name", "Uses ACM?", "Expiry"]) |
|
|
|
|
|
def list_acm_certificates(acm_client): |
|
def quote_string(s): |
|
return '"%s"' % s |
|
|
|
def cf_safe_arn(arn): |
|
arn_parts = arn.split(':') |
|
arn_prefix = quote_string(':'.join(arn_parts[0:4])) |
|
arn_suffix = arn_parts[5].split("/") |
|
arn_parts_quoted = [ |
|
arn_prefix, |
|
'{ "Ref": "AWS::AccountId" }', |
|
quote_string('/'.join(arn_suffix)) |
|
] |
|
return '{"Fn::Join" : [ ":", [%s] ] }' % ', '.join( |
|
arn_parts_quoted |
|
) |
|
|
|
acm_summaries = [] |
|
try: |
|
acms = acm_client.list_certificates( |
|
CertificateStatuses=['ISSUED'] |
|
) |
|
except Exception as e: |
|
logging.error("Could not list ACMs: %s" % e) |
|
for acm in acms["CertificateSummaryList"]: |
|
acm_summaries.append( |
|
ACM( |
|
acm["DomainName"], |
|
acm["CertificateArn"], |
|
cf_safe_arn(acm["CertificateArn"]) |
|
) |
|
) |
|
return acm_summaries |
|
|
|
|
|
def print_tabulated_acms(acms): |
|
table = [ |
|
[ |
|
acm.domain, |
|
acm.arn, |
|
acm.cloudFormationArn] |
|
for acm in acms |
|
] |
|
|
|
print tabulate(table, headers=["Domain name", "ARN", "CloudFormation Template Safe ARN"]) |
|
|
|
|
|
def setup_clients(profile_name, region_name): |
|
session = boto3.session.Session(profile_name=profile_name) |
|
elb_client = session.client('elb', region_name=region_name) |
|
acm_client = session.client('acm', region_name=region_name) |
|
return elb_client, acm_client |
|
|
|
|
|
def setup_logging(debug=False): |
|
root = logging.getLogger() |
|
ch = logging.StreamHandler(sys.stdout) |
|
formatter = logging.Formatter( |
|
'%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
ch.setFormatter(formatter) |
|
root.addHandler(ch) |
|
if debug: |
|
ch.setLevel(logging.DEBUG) |
|
root.setLevel(logging.DEBUG) |
|
else: |
|
ch.setLevel(logging.ERROR) |
|
root.setLevel(logging.ERROR) |
|
|
|
|
|
def parse_arguments(): |
|
"""Parse command line arguments""" |
|
args_parser = argparse.ArgumentParser() |
|
args_parser.add_argument( |
|
"--debug", |
|
help="Increase output verbosity", |
|
action="store_true") |
|
args_parser.add_argument( |
|
"--profile", |
|
required=False, |
|
default="ec2", |
|
help="AWS profile to use") |
|
args_parser.add_argument( |
|
"--region", |
|
required=False, |
|
default="eu-west-1", |
|
help="AWS region to use") |
|
args_parser.add_argument( |
|
"--list-certificates", |
|
required=False, |
|
action="store_true", |
|
help="List existing certificates") |
|
args = args_parser.parse_args() |
|
return args |
|
|
|
|
|
def main(): |
|
arguments = parse_arguments() |
|
setup_logging(arguments.debug) |
|
|
|
elb_client, acm_client = setup_clients( |
|
profile_name=arguments.profile, |
|
region_name=arguments.region |
|
) |
|
|
|
if not arguments.list_certificates: |
|
print_tabulated_elbs(list_elbs(elb_client)) |
|
else: |
|
print_tabulated_acms(list_acm_certificates(acm_client)) |
|
|
|
if __name__ == '__main__': |
|
main() |