Skip to content

Instantly share code, notes, and snippets.

@alzabo
Created December 6, 2021 22:32
Show Gist options
  • Save alzabo/82ab38f93560a0d94b3b406b76c2dbb5 to your computer and use it in GitHub Desktop.
Save alzabo/82ab38f93560a0d94b3b406b76c2dbb5 to your computer and use it in GitHub Desktop.
from datetime import (
datetime,
timedelta,
timezone,
)
from typing import (
Generator,
Optional,
)
import boto3
from botocore.config import Config
from checks import AgentCheck
class CertificateCheckError(ValueError):
    """Base error for problems encountered while checking a certificate."""
class CertificateExpiryComparisonError(CertificateCheckError):
    """Raised when the time left until a certificate expires cannot be computed."""
class Certificate(object):
    """Snapshot of a single ACM certificate, loaded eagerly at construction.

    The constructor calls ``describe_certificate`` and
    ``list_tags_for_certificate`` on the supplied client and caches the
    fields used for expiry reporting.
    """

    def __init__(self, cert_arn: str, client):
        """Fetch and cache the certificate's description and tags.

        Args:
            cert_arn: ARN of the certificate to describe.
            client: Object exposing ``describe_certificate`` and
                ``list_tags_for_certificate``; in this file it is the value
                returned by ``boto3.client("acm", ...)``. It is deliberately
                left unannotated: it is a service client, not a
                ``boto3.session.Session``.
        """
        self.client = client
        self.arn = cert_arn
        self.cert_data = self.client.describe_certificate(CertificateArn=cert_arn)[
            "Certificate"
        ]
        self.domain_name = self.cert_data["DomainName"]
        # These two keys may be missing from the description; fall back to
        # empty lists so attribute access and tag building never fail.
        self.san_names = self.cert_data.get("SubjectAlternativeNames", [])
        self.used_by = self.cert_data.get("InUseBy", [])
        self.type = self.cert_data["Type"]
        self.status = self.cert_data["Status"]

        # Always define ``not_after`` so ``__repr__`` and
        # ``expiry_countdown`` can rely on the attribute existing.
        not_after = self.cert_data.get("NotAfter")
        if not_after is None:
            self.not_after = None
        elif not_after.utcoffset() is None:
            # Naive timestamp: label it as UTC.
            self.not_after = not_after.replace(tzinfo=timezone.utc)
        else:
            # Aware timestamp: convert to UTC. Using ``replace`` here would
            # relabel the wall-clock time and shift the actual instant.
            self.not_after = not_after.astimezone(timezone.utc)

        self.tags = self.client.list_tags_for_certificate(CertificateArn=self.arn).get(
            "Tags", []
        )

    def __repr__(self) -> str:
        return (
            f"arn={self.arn}; status={self.status}; not_after={self.not_after}; "
            f"type={self.type}; domain_name={self.domain_name}; "
            f"san_names={self.san_names}; used_by={self.used_by}; "
            f"tags={self.tags}"
        )

    @property
    def datadog_tags(self):
        """Return the metric tags for this certificate.

        Always contains ``arn:`` and ``domain_name:`` tags; a
        ``subject_alt_names:`` tag holding the comma-joined alternative names
        is appended when any are present.
        """
        datadog_tags = [f"arn:{self.arn}", f"domain_name:{self.domain_name}"]
        if self.san_names:
            alt_names = ",".join(self.san_names)
            datadog_tags.append(f"subject_alt_names:{alt_names}")
        return datadog_tags

    def expiry_countdown(self) -> timedelta:
        """Return the time remaining until ``not_after`` (negative once past).

        Raises:
            CertificateExpiryComparisonError: If ``not_after`` is ``None`` or
                otherwise cannot be subtracted from the current UTC time.
        """
        try:
            return self.not_after - datetime.now(timezone.utc)
        except TypeError as err:
            # Raise an exception instance; raising a tuple is itself a
            # TypeError and would escape CertificateCheckError handlers.
            raise CertificateExpiryComparisonError(
                f"Calculation of expiry timedelta failed for {self}"
            ) from err

    @property
    def days_until_expiry(self) -> int:
        """Return the whole-day component of ``expiry_countdown()``."""
        return self.expiry_countdown().days
def get_certs(
    regions: Optional[list] = None, list_cert_filters: Optional[dict] = None
) -> Generator[Certificate, None, None]:
    """Yield a ``Certificate`` for each ACM certificate listed per region.

    Args:
        regions: Region names to query; ``["us-east-1"]`` when ``None``.
        list_cert_filters: Keyword arguments forwarded to the
            ``list_certificates`` paginator. When ``None``, certificates in
            status ISSUED or EXPIRED with the RSA key types listed below are
            requested.

    Yields:
        One ``Certificate`` per listed summary, built with the same client
        that produced the listing for that region.
    """
    target_regions = ["us-east-1"] if regions is None else regions
    if list_cert_filters is None:
        paginate_kwargs = {
            "CertificateStatuses": ["ISSUED", "EXPIRED"],
            "Includes": {"keyTypes": ["RSA_1024", "RSA_2048", "RSA_3072", "RSA_4096"]},
        }
    else:
        paginate_kwargs = list_cert_filters

    for region_name in target_regions:
        acm = boto3.client("acm", config=Config(region_name=region_name))
        paginator = acm.get_paginator("list_certificates")
        # All pages are merged into one result before anything is yielded.
        listing = paginator.paginate(**paginate_kwargs).build_full_result()
        for summary in listing.get("CertificateSummaryList", []):
            yield Certificate(summary["CertificateArn"], client=acm)
class ACMCheckExpireDays(AgentCheck):
    """Agent check that reports the days left before ACM certificates expire."""

    def check(self, instance):
        """Submit one ``ssl.expire_in_days`` gauge per certificate.

        Certificates whose expiry cannot be computed are logged and skipped
        so that a single bad certificate does not abort the whole run.

        Args:
            instance: Check instance configuration; not read by this check.
        """
        for cert in get_certs():
            try:
                days_left = cert.days_until_expiry
            except CertificateCheckError:
                # Use the check's logger rather than print() so the failure
                # is recorded through the agent's logging.
                self.log.warning(
                    "Certificate %s could not be tested for expiry; %s",
                    cert.arn,
                    cert.cert_data,
                )
            else:
                self.gauge("ssl.expire_in_days", days_left, cert.datadog_tags)
if __name__ == "__main__":
    # Manual run: print the repr of every certificate found by get_certs().
    for cert in get_certs():
        print(f"{cert!r}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment