Created
October 28, 2021 13:44
-
-
Save ParadoxGuitarist/452e3be59556aa782be36f6dfa1b7deb to your computer and use it in GitHub Desktop.
Python Script to make cert-manager.io certificate objects
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3 | |
import os | |
import json | |
import argparse | |
import logging | |
import subprocess | |
import yaml | |
import time | |
# Command-line interface.  The two client flags both store into `command`
# and the two verbosity flags both store into `logging`; each attribute is
# None when none of its flags is given, and the last flag given wins.
_FLAG_SPECS = (
    ('-o', '--oc', 'command', 'oc', 'Use the oc command for interacting with the cluster.'),
    ('-k', '--kubectl', 'command', 'kubectl', 'Use the kubectl command for interacting with the cluster.'),
    ('-v', '--verbose', 'logging', 'INFO', 'Enable INFO messages in the logging.'),
    ('-d', '--debug', 'logging', 'DEBUG', 'Enable DEBUG Messages in the logging'),
)
parser = argparse.ArgumentParser(
    description='Creates or Recreates certifiates ane related secrets on routes. Requries oc or kubectl and to have the cert-manager operator to be installed in your cluster.',
)
for _short, _long, _dest, _const, _help in _FLAG_SPECS:
    parser.add_argument(_short, _long, dest=_dest, action='store_const', const=_const, help=_help)
runtimeargs = parser.parse_args()
# Name of the cluster client binary ('oc' or 'kubectl'); used by every helper below.
command = runtimeargs.command
#FUNCTIONAL SECTION. heh. | |
def checkforsecret(route):
    """Return True when the route names a TLS secret in its annotations.

    Looks up the ``cert-utils-operator.redhat-cop.io/certs-from-secret``
    annotation under ``route['metadata']['annotations']``.

    Args:
        route: A route object as a dict (parsed JSON).

    Returns:
        True if the annotation is present and non-empty, otherwise False.
        A route with missing or null ``metadata``/``annotations`` yields
        False rather than raising.
    """
    try:
        secretname = route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']
    except (KeyError, TypeError):
        # KeyError: a level of the path is absent.
        # TypeError: a level is None (or otherwise not subscriptable by key).
        return False
    return bool(secretname)
def clearcerts(route):
    """Blank out the inline TLS certificate and key stored on a route.

    Runs ``<command> patch route <name>`` against the route's own namespace,
    setting ``spec.tls.certificate`` and ``spec.tls.key`` to empty strings.

    Args:
        route: A route object as a dict (parsed JSON); ``metadata.name`` and
            ``metadata.namespace`` are read.

    Raises:
        SystemExit: With status 1 when the client cannot be run or the patch
            command exits non-zero.
    """
    namespace = route['metadata']['namespace']
    logging.debug("Confirming that we're in the right namespace so we can patch the route: {}".format(namespace))
    try:
        routename = route['metadata']['name']
        logging.info("Clearing the certs for {}".format(routename))
        subprocess.run(
            [
                command, 'patch', 'route', routename,
                # Target the namespace explicitly instead of switching the
                # current project first: `project` is an oc-only subcommand.
                '--namespace={}'.format(namespace),
                # The flag and its JSON payload must be separate argv entries.
                '-p', '{"spec": {"tls": {"certificate": "", "key": ""}}}',
            ],
            stdout=subprocess.DEVNULL,
            # Without check=True a failed patch would go unnoticed.
            check=True,
        )
    except (KeyError, OSError, subprocess.CalledProcessError):
        logging.error("We couldn't update or switch to the propper namespace. {} ".format(route))
        logging.error("Aborting.")
        raise SystemExit(1)
def checkforcert(route):
    """Report whether a usable cert-manager Certificate already backs the route.

    The secret named by the route's
    ``cert-utils-operator.redhat-cop.io/certs-from-secret`` annotation is
    fetched, the Certificate named by that secret's
    ``cert-manager.io/certificate-name`` annotation is fetched, and the
    Certificate's status conditions are inspected.  A Certificate that has
    failed is deleted so the caller can recreate it.

    Args:
        route: A route object as a dict (parsed JSON).

    Returns:
        True when the Certificate exists and is not marked failed.
        False when the secret or Certificate cannot be found or parsed, or
        when a failed Certificate was found and deleted.

    Raises:
        SystemExit: With status 1 when a failed Certificate could not be
            deleted.
    """
    logging.info("Looking to see if there's already a valid cert.")
    try:
        namespaceflag = '--namespace={}'.format(route['metadata']['namespace'])
        secretname = route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']
        logging.debug("Getting the secret object.")
        secretprocess = subprocess.run(
            [command, 'get', 'secret', secretname, namespaceflag, '--output=json'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        # A failed `get` leaves stdout empty, which json.loads rejects with a
        # ValueError subclass; that is handled below as "no cert".
        secret = json.loads(secretprocess.stdout)
        # Log only the name: the object itself carries the private key.
        logging.debug("Got a json object from the secret: {}".format(secretname))
        certname = secret['metadata']['annotations']['cert-manager.io/certificate-name']
        logging.debug("Got a certname from the secret: {}".format(certname))
        logging.debug("Checking for the certificate object in the namespace")
        certprocess = subprocess.run(
            [command, 'get', 'cert', certname, namespaceflag, '--output=json'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        )
        if certprocess.returncode != 0:
            logging.info("Couldn't get the certificate object. (Probably because it doesn't exist.)")
            return False
        logging.debug('There iss a cert but we need to check that it is not failing.')
        certobj = json.loads(certprocess.stdout)
    except (KeyError, TypeError, ValueError, OSError):
        logging.info("Couldn't get the certificate object. (Probably because it doesn't exist.)")
        return False

    # Look at every condition, not just the first one.
    # NOTE(review): cert-manager reports condition `status` as
    # True/False/Unknown and puts "Failed" in `reason`; both fields are
    # checked so the original `status == 'Failed'` test is still honoured.
    conditions = (certobj.get('status') or {}).get('conditions') or []
    failed = any(
        cond.get('status') == 'Failed' or cond.get('reason') == 'Failed'
        for cond in conditions
        if isinstance(cond, dict)
    )
    if not failed:
        logging.debug("The cert has not failed!")
        return True

    logging.debug("Confirming that we're in the right namespace so we can patch the route: {}".format(route['metadata']['namespace']))
    logging.info("Deleting failed cert: {}".format(certname))
    try:
        # Address the namespace directly rather than switching projects
        # (`project` is oc-only); check=True makes a failed delete visible.
        subprocess.run(
            [command, 'delete', 'certificate', certname, namespaceflag],
            stdout=subprocess.DEVNULL,
            check=True,
        )
    except (OSError, subprocess.CalledProcessError):
        logging.error("We couldn't remove the cert or switch to the propper namespace. {} ".format(route))
        logging.error("Aborting.")
        raise SystemExit(1)
    return False
def createcert(route):
    """Write a cert-manager Certificate manifest for the route and create it.

    The manifest is written to ``<route-name>-cert.yml`` in the current
    working directory and then passed to ``<command> create -f``.  The
    Certificate targets the secret named by the route's
    ``cert-utils-operator.redhat-cop.io/certs-from-secret`` annotation and
    uses the route host as both common name and sole DNS name.

    Args:
        route: A route object as a dict (parsed JSON).

    Returns:
        True when the create command succeeded, False when it exited
        non-zero (for example because the Certificate already exists).

    Raises:
        KeyError: When the route lacks a host, name, namespace, secret
            annotation or cluster-issuer annotation.
        SystemExit: With status 1 when the manifest cannot be written or the
            client cannot be run.
    """
    logging.debug("Creating a cert for a route")
    hostname = route['spec']['host']
    routename = route['metadata']['name']
    certname = routename + "-cert"
    certyml = certname + ".yml"
    annotations = route['metadata']['annotations']
    # NOTE(review): cert-manager documents `cert-manager.io/cluster-issuer`;
    # the original `certmanager.io/cluster-issuer` spelling is still accepted
    # so routes annotated for the earlier version of this script keep working.
    issuer = annotations.get('cert-manager.io/cluster-issuer')
    if issuer is None:
        issuer = annotations['certmanager.io/cluster-issuer']
    namespace = route['metadata']['namespace']
    routesecretname = annotations['cert-utils-operator.redhat-cop.io/certs-from-secret']
    logging.info("Creating an object to write generate a yaml file")
    data = {
        'apiVersion': 'cert-manager.io/v1',
        'kind': 'Certificate',
        'metadata': {
            'name': certname,
            'namespace': namespace,
        },
        'spec': {
            'secretName': routesecretname,
            'issuerRef': {
                'name': issuer,
                'kind': 'ClusterIssuer',
            },
            'commonName': hostname,
            'dnsNames': [hostname],
        },
    }
    try:
        with open(certyml, 'w') as outfile:
            logging.debug("Writing the file: {}".format(certyml))
            yaml.dump(data, outfile, default_flow_style=False)
    except (OSError, yaml.YAMLError):
        logging.error("Unable to create yaml for certificate. Exiting.")
        raise SystemExit(1)
    logging.debug('Creating the cert with the file: {}'.format(certyml))
    try:
        result = subprocess.run([command, 'create', '-f={}'.format(certyml)], stdout=subprocess.DEVNULL)
    except OSError:
        logging.error("Unable to run {} to create the certificate. Exiting.".format(command))
        raise SystemExit(1)
    if result.returncode != 0:
        # Keep going with the remaining routes, but do not report success.
        logging.error("Creating the certificate from {} failed.".format(certyml))
        return False
    return True
# Script body: validate the environment, list every route in the cluster and,
# for each route that names a TLS secret but has no usable Certificate,
# delete the secret, clear the inline certs and create a new Certificate.
import shutil

# Notify users they're going to get a wall of text in verbose mode.
if runtimeargs.logging:
    logging.basicConfig(level=runtimeargs.logging)
    logging.info("Log level set to {}".format(runtimeargs.logging))
logging.info("Finding if oci connector was specified:")
if command is None:
    logging.error("You must specify either --oc or --kubectl when running this program.")
    raise SystemExit(1)
logging.info("Checking that {} is installed".format(command))
# shutil.which returns None for a missing binary; running `which` through
# subprocess only sets a return code, which was never inspected.
if shutil.which(command) is None:
    logging.error("System could not locate {}".format(command))
    raise SystemExit(1)
logging.info("Checking that you're logged into a cluster.")
# `status` is an oc subcommand; kubectl has no equivalent, so use
# `cluster-info` there.
statuscheck = "status" if command == "oc" else "cluster-info"
clusterstatus = subprocess.run([command, statuscheck], stdout=subprocess.DEVNULL)
if clusterstatus.returncode != 0:
    logging.error("You are not connected to a cluster. Please run `{} login` before running this program".format(command))
    raise SystemExit(1)
logging.info("Connected to a cluster")
# Get Route Objects
logging.info("Getting Route objects")
routesprocess = subprocess.run([command, 'get', 'routes', '--output=json', '--all-namespaces'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if routesprocess.returncode != 0:
    # Fail with the client's own message instead of a JSON decode traceback.
    logging.error("Unable to list routes: {}".format(routesprocess.stderr.decode(errors='replace').strip()))
    raise SystemExit(1)
routes = json.loads(routesprocess.stdout)
for route in routes['items']:
    # NOTE(review): fixed pause before every route, presumably to throttle
    # requests to the cluster; confirm it is needed for routes that are skipped.
    time.sleep(3)
    logging.info("Processing route: {}\\{}".format(route['metadata']['namespace'], route['metadata']['name']))
    # Read the annotations from each route.
    if not checkforsecret(route):
        continue
    secretname = route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']
    logging.info("Found secret: {}".format(secretname))
    # Check to see if a cert already exists for the secret.
    if checkforcert(route):
        logging.debug("A cert exists, skipping")
        continue
    # Delete the secret in the annotation.  Best effort: the result is not checked.
    logging.debug("Removing secret: {}".format(secretname))
    subprocess.run([command, "delete", "secret", secretname, "--namespace={}".format(route['metadata']['namespace'])], stdout=subprocess.DEVNULL)
    # Delete the certs out of the route objects.
    clearcerts(route)
    # Create new certificate objects.
    logging.debug('Sleeping for 4 seconds so cert-manager will pick up the workload properly.')
    time.sleep(4)
    createcert(route)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This dumps .yml files (one `<route-name>-cert.yml` per certificate) into the working directory so you can see the objects you created. You can also run it multiple times.
It looks for certs based off the route annotations which hold a secret with the tls keys. If the certs don't exist, it recreates a cert object pointing to the same secret name. Licensed with MIT (I'll get it into the repo later)