Skip to content

Instantly share code, notes, and snippets.

@stuart-warren
Created November 29, 2018 10:53
Show Gist options
  • Save stuart-warren/b32cb5f480d3a8d8b9ff73f9e20ff745 to your computer and use it in GitHub Desktop.
Save stuart-warren/b32cb5f480d3a8d8b9ff73f9e20ff745 to your computer and use it in GitHub Desktop.
Kubernetes mutating admission webhook (Python)
#!/usr/bin/env python
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
import base64
import json
import logging
import os
import os.path
import ssl
# Locations of the TLS certificate and private key served by the webhook.
# Both can be overridden through environment variables of the same name.
CERT_PATH = os.getenv("CERT_PATH", "/certs/tls.crt")
KEY_PATH = os.getenv("KEY_PATH", "/certs/tls.key")

# Name of the root logging level (e.g. "DEBUG", "INFO"); case-insensitive.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

# Resolve the level name to the numeric constant exported by ``logging``.
# Surrounding whitespace is ignored so values such as "debug " still work.
numeric_level = getattr(logging, LOG_LEVEL.strip().upper(), None)
if not isinstance(numeric_level, int):
    # Fail fast at import time instead of silently running at a wrong level.
    raise ValueError('Invalid LOG_LEVEL: %s' % LOG_LEVEL)
logging.basicConfig(level=numeric_level)
class Webhook(BaseHTTPRequestHandler):
    """HTTP handler implementing a minimal mutating admission webhook.

    Every well-formed ``AdmissionReview`` request is allowed and answered
    with a base64-encoded JSONPatch that adds the label
    ``hasbeenmutated: "true"`` to the reviewed object.
    """

    def _build_response(self, uid, api_version=None):
        """Build the AdmissionReview response body.

        Args:
            uid: The ``request.uid`` of the incoming review; echoed back so
                the caller can correlate the response with its request.
            api_version: ``apiVersion`` of the incoming review. When given,
                the response carries the same ``apiVersion`` together with
                ``kind: AdmissionReview``; when ``None`` both keys are
                omitted.

        Returns:
            A JSON-serialisable dict with a ``response`` stanza containing
            ``uid``, ``allowed``, ``patch`` (base64 of the JSONPatch) and
            ``patchType``.
        """
        # NOTE(review): a JSONPatch "add" below /metadata/labels presumably
        # fails when the object has no labels map at all -- confirm against
        # the API server and extend the patch if unlabeled objects matter.
        patch = [{
            'op': 'add',
            'path': '/metadata/labels/hasbeenmutated',
            'value': 'true',
        }]
        encoded_patch = base64.b64encode(
            json.dumps(patch).encode('utf-8')
        ).decode()
        response = {
            "response": {
                "uid": uid,
                "allowed": True,
                "patch": encoded_patch,
                "patchType": "JSONPatch",
            }
        }
        if api_version is not None:
            # Reply in the same API version the review was sent in.
            response["apiVersion"] = api_version
            response["kind"] = "AdmissionReview"
        return response

    def _send_json(self, data):
        """Serialise *data* as JSON and send it as a 200 response."""
        body = json.dumps(data).encode('utf-8')
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'application/json')
        # Tell the client exactly how many bytes follow.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        logging.info("%s", body)
        self.wfile.write(body)

    def do_POST(self):
        """Handle an AdmissionReview request.

        Replies with 400 when the body cannot be read or parsed, is not a
        JSON object of kind ``AdmissionReview``, or lacks ``request.uid``;
        otherwise replies with the mutating response.
        """
        try:
            length = int(self.headers.get('content-length'))
            if length < 0:
                # read(-1) would block until the peer closes the socket.
                raise ValueError("negative Content-Length")
            raw = self.rfile.read(length)
            logging.debug("%s", raw)
            received = json.loads(raw)
        except json.decoder.JSONDecodeError as j:
            error_debug = "JSONDecodeError: {0}".format(j.msg)
            self.send_error(HTTPStatus.BAD_REQUEST, error_debug)
            return
        except Exception as e:
            # Boundary handler: a missing or malformed Content-Length,
            # undecodable bytes, etc. all become a client error.
            error_debug = "Error: {0}".format(e)
            self.send_error(HTTPStatus.BAD_REQUEST, error_debug)
            return

        # Valid JSON may still be a list/string/number rather than an object.
        if not isinstance(received, dict) or received.get('kind') != "AdmissionReview":
            self.send_error(HTTPStatus.BAD_REQUEST, "Not an AdmissionReview")
            return

        request = received.get('request')
        if not isinstance(request, dict) or 'uid' not in request:
            self.send_error(HTTPStatus.BAD_REQUEST,
                            "AdmissionReview has no request.uid")
            return

        self._send_json(
            self._build_response(request['uid'], received.get('apiVersion'))
        )

    def do_PUT(self):
        """Treat PUT exactly like POST."""
        self.do_POST()
def run(host='', port=8080):
    """Start the webhook server and serve requests until interrupted.

    Args:
        host: Interface to bind to; the empty string binds all interfaces.
        port: TCP port to listen on.

    TLS is enabled when both ``CERT_PATH`` and ``KEY_PATH`` point at existing
    files; otherwise the server falls back to plain HTTP.
    """
    # The original logged an undefined CLUSTER_LOCATION name here, which
    # raised NameError before the server could start; the line is dropped.
    httpd = ThreadingHTTPServer((host, port), Webhook)
    if os.path.isfile(CERT_PATH) and os.path.isfile(KEY_PATH):
        logging.info('using tls cert at '+CERT_PATH)
        logging.info('using tls key at '+KEY_PATH)
        # ssl.wrap_socket() was removed in Python 3.12; an explicit
        # server-side SSLContext is the supported replacement.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=CERT_PATH, keyfile=KEY_PATH)
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
    else:
        logging.warning('tls cert or key not found, serving plain http')
    logging.info('starting on %s...', port)
    httpd.serve_forever()
# Script entry point: start the server only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment