Skip to content

Instantly share code, notes, and snippets.

@kincl
Last active March 4, 2025 15:37
Show Gist options
  • Save kincl/b47a961919e6a67ae6b6 to your computer and use it in GitHub Desktop.
Save kincl/b47a961919e6a67ae6b6 to your computer and use it in GitHub Desktop.
Uses puppet certificate API to list, sign, revoke certificates
#!/bin/env python#!/bin/env python
# https://github.com/puppetlabs/puppet/blob/stable/api/docs/http_certificate_status.md
import os
import sys
import json
import requests
import argparse
import subprocess
os.environ['REQUESTS_CA_BUNDLE'] = '/var/lib/puppet/ssl/certs/ca.pem'
# lets get the puppet server from configuration
puppet_server,_ = subprocess.Popen('puppet config print ca_server'.split(), stdout=subprocess.PIPE).communicate()
url = 'https://{0}:8140/environment'.format(puppet_server.rstrip())
hostname = os.environ['HOSTNAME']
certs = cert=('/var/lib/puppet/ssl/certs/{0}.pem'.format(hostname),
'/var/lib/puppet/ssl/private_keys/{0}.pem'.format(hostname))
help = '''Puppet Certificate Manager
Makes API calls to the Puppet CA Master, must have client certificate whitelisted on CA for API to work
Actions:
- list = List all certificates (puppet cert --list --all)
- sign = Sign a certificate (puppet cert --sign)
- revoke = Revoke a certificate, adds certificate to CRL (puppet cert --revoke)
- del = Delete a certificate, actually removes certificate files from system (puppet cert --clean)
'''
class APIError(Exception):
def __init__(self, response):
self.response = response
self.url = response.url
self.status_code = response.status_code
self.text = response.text
def main():
parser = argparse.ArgumentParser(description=help,formatter_class=argparse.RawDescriptionHelpFormatter)
subparsers = parser.add_subparsers(help='Action',dest='action')
parser_list = subparsers.add_parser('list', description='List all certs, like puppet cert list --all')
parser_get = subparsers.add_parser('get', description='Get a specific certificate')
parser_get.add_argument('name', help ="FQDN")
parser_set = subparsers.add_parser('set', description='Set desired state for a certificate')
parser_set.add_argument('name', help ="FQDN")
parser_set.add_argument('state', help="Desired state, either: signed, revoked")
parser_sign = subparsers.add_parser('sign', description='Sign a certificate')
parser_sign.add_argument('name', help ="FQDN")
parser_revoke = subparsers.add_parser('revoke', description='Revoke a certificate')
parser_revoke.add_argument('name', help ="FQDN")
parser_del = subparsers.add_parser('del', description='Delete a certificate')
parser_del.add_argument('name', help ="FQDN")
args = parser.parse_args()
if args.action == 'list':
cmd_getAll()
if args.action == 'get':
cmd_get(args.name)
if args.action == 'set':
cmd_set(args.name, args.state)
if args.action == 'sign':
cmd_set(args.name, 'signed')
if args.action == 'revoke':
cmd_set(args.name, 'revoked')
if args.action == 'del':
cmd_del(args.name)
def api_call(resource, http_action='get', data=None, success_code=requests.codes.ok):
req_func = getattr(requests, http_action)
if data:
r = req_func('{0}/{1}'.format(url, resource), cert=certs, data=data)
else:
r = req_func('{0}/{1}'.format(url, resource), cert=certs)
if r.status_code == success_code:
try:
return r.json()
except ValueError:
return "Success"
else:
raise APIError(r)
def cmd_del(name):
if name == hostname:
print "Cannot delete own certificate"
sys.exit(0)
req = api_call('certificate_status/{0}'.format(name), http_action='delete', success_code=204)
print req
def cmd_set(name, state):
body = json.dumps({'desired_state':state})
req = api_call('certificate_status/{0}'.format(name), http_action='put', data=body, success_code=204)
print req
def cmd_get(name):
req = api_call('certificate_status/{0}'.format(name))
for k in ['name', 'state', 'fingerprint', 'dns_alt_names']:
print "{0}: {1}".format(k,req[k])
def cmd_getAll():
req = api_call('certificate_statuses/key')
max_len = 1
for node in req:
if len(node['name']) > max_len:
max_len = len(node['name'])
print "{0:{width}} {1:15}".format('Node', 'State', width=max_len)
for node in sorted(req, key=lambda n: n['name']):
print "{0:{width}} {1:15}".format(node['name'], node['state'], width=max_len)
if __name__ == '__main__':
try:
main()
except APIError as e:
print e.text
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment