Last active
March 4, 2025 15:37
-
-
Save kincl/b47a961919e6a67ae6b6 to your computer and use it in GitHub Desktop.
Uses puppet certificate API to list, sign, revoke certificates
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python#!/bin/env python | |
# https://github.com/puppetlabs/puppet/blob/stable/api/docs/http_certificate_status.md | |
import os | |
import sys | |
import json | |
import requests | |
import argparse | |
import subprocess | |
os.environ['REQUESTS_CA_BUNDLE'] = '/var/lib/puppet/ssl/certs/ca.pem' | |
# lets get the puppet server from configuration | |
puppet_server,_ = subprocess.Popen('puppet config print ca_server'.split(), stdout=subprocess.PIPE).communicate() | |
url = 'https://{0}:8140/environment'.format(puppet_server.rstrip()) | |
hostname = os.environ['HOSTNAME'] | |
certs = cert=('/var/lib/puppet/ssl/certs/{0}.pem'.format(hostname), | |
'/var/lib/puppet/ssl/private_keys/{0}.pem'.format(hostname)) | |
help = '''Puppet Certificate Manager | |
Makes API calls to the Puppet CA Master, must have client certificate whitelisted on CA for API to work | |
Actions: | |
- list = List all certificates (puppet cert --list --all) | |
- sign = Sign a certificate (puppet cert --sign) | |
- revoke = Revoke a certificate, adds certificate to CRL (puppet cert --revoke) | |
- del = Delete a certificate, actually removes certificate files from system (puppet cert --clean) | |
''' | |
class APIError(Exception): | |
def __init__(self, response): | |
self.response = response | |
self.url = response.url | |
self.status_code = response.status_code | |
self.text = response.text | |
def main(): | |
parser = argparse.ArgumentParser(description=help,formatter_class=argparse.RawDescriptionHelpFormatter) | |
subparsers = parser.add_subparsers(help='Action',dest='action') | |
parser_list = subparsers.add_parser('list', description='List all certs, like puppet cert list --all') | |
parser_get = subparsers.add_parser('get', description='Get a specific certificate') | |
parser_get.add_argument('name', help ="FQDN") | |
parser_set = subparsers.add_parser('set', description='Set desired state for a certificate') | |
parser_set.add_argument('name', help ="FQDN") | |
parser_set.add_argument('state', help="Desired state, either: signed, revoked") | |
parser_sign = subparsers.add_parser('sign', description='Sign a certificate') | |
parser_sign.add_argument('name', help ="FQDN") | |
parser_revoke = subparsers.add_parser('revoke', description='Revoke a certificate') | |
parser_revoke.add_argument('name', help ="FQDN") | |
parser_del = subparsers.add_parser('del', description='Delete a certificate') | |
parser_del.add_argument('name', help ="FQDN") | |
args = parser.parse_args() | |
if args.action == 'list': | |
cmd_getAll() | |
if args.action == 'get': | |
cmd_get(args.name) | |
if args.action == 'set': | |
cmd_set(args.name, args.state) | |
if args.action == 'sign': | |
cmd_set(args.name, 'signed') | |
if args.action == 'revoke': | |
cmd_set(args.name, 'revoked') | |
if args.action == 'del': | |
cmd_del(args.name) | |
def api_call(resource, http_action='get', data=None, success_code=requests.codes.ok): | |
req_func = getattr(requests, http_action) | |
if data: | |
r = req_func('{0}/{1}'.format(url, resource), cert=certs, data=data) | |
else: | |
r = req_func('{0}/{1}'.format(url, resource), cert=certs) | |
if r.status_code == success_code: | |
try: | |
return r.json() | |
except ValueError: | |
return "Success" | |
else: | |
raise APIError(r) | |
def cmd_del(name): | |
if name == hostname: | |
print "Cannot delete own certificate" | |
sys.exit(0) | |
req = api_call('certificate_status/{0}'.format(name), http_action='delete', success_code=204) | |
print req | |
def cmd_set(name, state): | |
body = json.dumps({'desired_state':state}) | |
req = api_call('certificate_status/{0}'.format(name), http_action='put', data=body, success_code=204) | |
print req | |
def cmd_get(name): | |
req = api_call('certificate_status/{0}'.format(name)) | |
for k in ['name', 'state', 'fingerprint', 'dns_alt_names']: | |
print "{0}: {1}".format(k,req[k]) | |
def cmd_getAll(): | |
req = api_call('certificate_statuses/key') | |
max_len = 1 | |
for node in req: | |
if len(node['name']) > max_len: | |
max_len = len(node['name']) | |
print "{0:{width}} {1:15}".format('Node', 'State', width=max_len) | |
for node in sorted(req, key=lambda n: n['name']): | |
print "{0:{width}} {1:15}".format(node['name'], node['state'], width=max_len) | |
if __name__ == '__main__': | |
try: | |
main() | |
except APIError as e: | |
print e.text | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment