Skip to content

Instantly share code, notes, and snippets.

@ParadoxGuitarist
Created October 28, 2021 13:44
Show Gist options
  • Save ParadoxGuitarist/452e3be59556aa782be36f6dfa1b7deb to your computer and use it in GitHub Desktop.
Save ParadoxGuitarist/452e3be59556aa782be36f6dfa1b7deb to your computer and use it in GitHub Desktop.
Python Script to make cert-manager.io certificate objects
#!/bin/python3
import os
import json
import argparse
import logging
import subprocess
import yaml
import time
# Set us up for using runtime arguments by defining them.
parser = argparse.ArgumentParser(description='Creates or Recreates certifiates ane related secrets on routes. Requries oc or kubectl and to have the cert-manager operator to be installed in your cluster.')
parser.add_argument('-o', '--oc', dest='command', action='store_const', const='oc', help='Use the oc command for interacting with the cluster.')
parser.add_argument('-k', '--kubectl', dest='command', action='store_const', const='kubectl', help='Use the kubectl command for interacting with the cluster.')
parser.add_argument('-v', '--verbose', dest='logging', action='store_const', const='INFO', help='Enable INFO messages in the logging.')
parser.add_argument('-d', '--debug', dest='logging', action='store_const', const='DEBUG', help='Enable DEBUG Messages in the logging')
runtimeargs = parser.parse_args()
command = runtimeargs.command
#FUNCTIONAL SECTION. heh.
def checkforsecret(route):
try:
if route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']:
return True
else:
return False
except:
return False
def clearcerts(route):
logging.debug("Confirming that we're in the right namespace so we can patch the route: {}".format(route['metadata']['namespace']))
try:
subprocess.run([command, 'project', route['metadata']['namespace']], stdout=subprocess.DEVNULL)
logging.info("Clearing the certs for {}".format(route['metadata']['name']))
subprocess.run([command, 'patch', 'route', route['metadata']['name'], '''-p {"spec": {"tls": {"certificate": "", "key": ""}}}' '''], stdout=subprocess.DEVNULL)
except:
logging.error("We couldn't update or switch to the propper namespace. {} ".format(route))
logging.error("Aborting.")
exit(1)
def checkforcert(route):
try:
logging.info("Looking to see if there's already a valid cert.")
logging.debug("Getting the secret object.")
secretprocess = subprocess.run([command, 'get', 'secret', route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret'], '--namespace={}'.format(route['metadata']['namespace']), '--output=json' ], stdout=subprocess.PIPE, stderr=subprocess.PIPE )
secret = json.loads(secretprocess.stdout)
logging.debug("Got a json object from the secret: {}".format(secret))
certname = secret['metadata']['annotations']['cert-manager.io/certificate-name']
logging.debug("Got a certname from the secret: {}".format(certname))
logging.debug("Checking for the certificate object in the namespace")
certprocess = subprocess.run([command, 'get', 'cert', certname, "--namespace={}".format(route['metadata']['namespace']), '--output=json'], stdout=subprocess.PIPE, stderr=subprocess.PIPE )
if certprocess.returncode == 0:
logging.debug('There iss a cert but we need to check that it is not failing.')
certobj = json.loads(certprocess.stdout)
if certobj['status']['conditions'][0]['status'] == 'Failed' :
logging.debug("Confirming that we're in the right namespace so we can patch the route: {}".format(route['metadata']['namespace']))
try:
subprocess.run([command, 'project', route['metadata']['namespace']], stdout=subprocess.DEVNULL)
logging.info("Deleting failed cert: {}".format(cert['metadata']['name']))
subprocess.run([command, 'delete', 'certificate', cert['metadata']['name'] ], stdout=subprocess.DEVNULL)
return False
except:
logging.error("We couldn't remove the cert or switch to the propper namespace. {} ".format(route))
logging.error("Aborting.")
exit(1)
logging.debug("The cert has not failed!")
return True
else:
logging.info("Couldn't get the certificate object. (Probably because it doesn't exist.)")
return False
except:
logging.info("Couldn't get the certificate object. (Probably because it doesn't exist.)")
return False
def createcert(route):
logging.debug("Creating a cert for a route")
hostname = route['spec']['host']
routename = route['metadata']['name']
certname = routename + "-cert"
certyml = certname + ".yml"
issuer = route['metadata']['annotations']['certmanager.io/cluster-issuer']
namespace = route['metadata']['namespace']
routesecretname = route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']
logging.info("Creating an object to write generate a yaml file")
try:
data = dict(
apiVersion = 'cert-manager.io/v1',
kind = 'Certificate',
metadata = dict(
name = certname,
namespace = namespace,
),
spec = dict(
secretName = routesecretname,
issuerRef = dict(
name = issuer,
kind = 'ClusterIssuer',
),
commonName = hostname,
dnsNames = [
'{}'.format(hostname)
]
)
)
with open(certyml, 'w') as outfile:
logging.debug("Writing the file: {}".format(outfile))
yaml.dump(data, outfile, default_flow_style=False)
logging.debug('Creating the cert with the file: {}'.format(certyml))
subprocess.run([command, 'create', '-f={}'.format(certyml)], stdout=subprocess.DEVNULL)
return True
except:
logging.error("Unable to create yaml for certificate. Exiting.")
exit(1)
# Notify users they're going to get a wall of text in verbose mode.
if runtimeargs.logging:
logging.basicConfig(level=runtimeargs.logging)
logging.info("Log level set to {}".format(runtimeargs.logging))
logging.info("Finding if oci connector was specified:")
if command is None:
logging.error("You must specify either --oc or --kubectl when running this program.")
exit(1)
logging.info("Checking that {} is installed".format(command))
try:
subprocess.run(['which', command], stdout=subprocess.DEVNULL)
except:
logging.error("System could not locate {}".format(command))
exit(1)
logging.info("Checking that you're logged into a cluster.")
clusterstatus = subprocess.run([command, "status"], stdout=subprocess.DEVNULL)
if clusterstatus.returncode != 0:
logging.error("You are not connected to a cluster. Please run `{} login` before running this program")
logging.info("Connected to a cluster")
# Get Route Objects
logging.info("Getting Route objects")
routesprocess = subprocess.run([command, 'get', 'routes', '--output=json', '--all-namespaces'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
routes = json.loads(routesprocess.stdout)
for route in routes['items']:
time.sleep(3)
logging.info("Processing route: {}\{}".format(route['metadata']['namespace'], route['metadata']['name']))
# Read the annotations from each route.
if checkforsecret(route):
logging.info("Found secret: {}".format(route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']))
# check to see if a cert already exists for the secret.
if checkforcert(route):
logging.debug("A cert exists, skipping")
continue
# Delete the secret in the annotation.
logging.debug("Removing secret: {}".format(route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret']))
subprocess.run([command, "delete", "secret", route['metadata']['annotations']['cert-utils-operator.redhat-cop.io/certs-from-secret'], "--namespace={}".format(route['metadata']['namespace'])], stdout=subprocess.DEVNULL)
# Delete they certs out of the route objects.
clearcerts(route)
# Create new certificate objects.
logging.debug('Sleeping for 4 seconds so cert-manager will pick up the workload properly.')
time.sleep(4)
createcert(route)
@ParadoxGuitarist
Copy link
Author

This dumps .yaml files into the working directory so you can see the objects you created. You can also run it multiple times.

It looks for certs based off the route annotations which hold a secret with the tls keys. If the certs don't exist, it recreates a cert object pointing to the same secret name. Licensed with MIT (I'll get it into the repo later)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment