-
-
Save mattisz/d112ebfe1869c56ce111ecbd2cbbd04d to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python | |
# This file is part of Supermicro IPMI certificate updater. | |
# Supermicro IPMI certificate updater is free software: you can | |
# redistribute it and/or modify it under the terms of the GNU General Public | |
# License as published by the Free Software Foundation, version 2. | |
# | |
# This program is distributed in the hope that it will be useful, but WITHOUT | |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | |
# details. | |
# | |
# You should have received a copy of the GNU General Public License along with | |
# this program; if not, write to the Free Software Foundation, Inc., 51 | |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
# | |
# Copyright (c) Jari Turkia | |
import os | |
import argparse | |
import re | |
import requests | |
import logging | |
import json | |
from base64 import b64encode | |
from datetime import datetime | |
import xml.etree.ElementTree as etree | |
from urllib.parse import urlparse | |
REQUEST_TIMEOUT = 5.0


class IPMIUpdater:
    """Common logic for pushing an X.509 certificate to a Supermicro BMC.

    The class is meant to be used through board-specific subclasses, which
    must provide what this base class references but does not define itself:
    ``self.reboot_url``, ``_get_upload_data(cert_data, key_data)`` and, for
    the CGI (non-X12) code paths, ``_get_op_data(op, r)``.
    """

    # strptime layout of the certificate validity timestamps parsed below.
    CERT_DATE_FORMAT = r"%b %d %H:%M:%S %Y"

    def __init__(self, session, ipmi_url):
        """
        :param session: requests session (or compatible object) used for all HTTP traffic
        :param ipmi_url: base URL of the BMC, without a trailing slash
        """
        self.session = session
        self.ipmi_url = ipmi_url
        self.login_url = f'{ipmi_url}/cgi/login.cgi'
        self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi'
        self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi'
        self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s'
        self.use_b64encoded_login = True
        self._csrf_token = None
        error_log = logging.getLogger("IPMIUpdater")
        error_log.setLevel(logging.ERROR)
        self.setLogger(error_log)

    def setLogger(self, logger):
        """Replace the logger used for diagnostics."""
        self.logger = logger

    def _request(self, method, url, **kwargs):
        """
        Send a request through the session with the shared defaults applied.

        :param method: name of the session method to call ('get' or 'post')
        :param url: target URL
        :param kwargs: passed on to the session method; ``timeout`` and
                       ``verify`` default to REQUEST_TIMEOUT and False
        :return: the response object, or None when the request failed at the
                 transport level (connection error, timeout, ...)
        """
        kwargs.setdefault('timeout', REQUEST_TIMEOUT)
        kwargs.setdefault('verify', False)
        try:
            return getattr(self.session, method)(url, **kwargs)
        except (ConnectionError, requests.exceptions.RequestException):
            # requests' exceptions do not derive from the builtin
            # ConnectionError, so both families have to be listed.
            return None

    @classmethod
    def _parse_cert_date(cls, text):
        """
        Parse a validity timestamp, ignoring anything after the 4-digit year
        (e.g. a trailing " GMT").

        :param text: timestamp such as "Jan  1 00:00:00 2024 GMT"
        :return: naive datetime
        :raises ValueError: if the text does not match CERT_DATE_FORMAT
        """
        match = re.match(r'.*?\d{4}', text)
        stamp = match.group(0) if match else text
        return datetime.strptime(stamp, cls.CERT_DATE_FORMAT)

    def get_csrf_token(self, url_name):
        """
        Fetch the page ``url_name`` and extract the CSRF token embedded in it.

        :param url_name: value for the url_name query parameter of url_redirect.cgi
        :return: the token string, or None when the page contains no token
        :raises: whatever the session raises for transport errors, and the
                 error raised by ``raise_for_status()`` for HTTP error codes
        """
        if self._csrf_token is not None:
            return self._csrf_token
        page_url = self.url_redirect_template % url_name
        # Same timeout/verify settings as every other request to the BMC.
        result = self.session.get(page_url, timeout=REQUEST_TIMEOUT, verify=False)
        result.raise_for_status()
        match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text)
        if match:
            return match.group(1)
        return None

    def get_csrf_headers(self, url_name):
        """
        Build the Origin/Referer headers for a request made "from" the page
        ``url_name``, adding a CSRF_TOKEN header when the page provides one.

        :return: dict of headers
        """
        page_url = self.url_redirect_template % url_name
        headers = {
            "Origin": self.ipmi_url,
            "Referer": page_url,
        }
        csrf_token = self.get_csrf_token(url_name)
        if csrf_token is not None:
            headers["CSRF_TOKEN"] = csrf_token
        self.logger.debug("HEADERS:%s" % headers)
        return headers

    def get_xhr_headers(self, url_name):
        """Like get_csrf_headers(), plus the X-Requested-With header."""
        headers = self.get_csrf_headers(url_name)
        headers["X-Requested-With"] = "XMLHttpRequest"
        return headers

    def login(self, username, password, model):
        """
        Log into IPMI interface
        :param username: username to use for logging in
        :param password: password to use for logging in
        :param model: board model; "X12" selects the Redfish login
        :return: for models other than X12: True on success, False otherwise.
                 For X12: the login response on success, the HTTP status
                 code when the BMC rejected the login, False when the
                 request could not be sent.
        """
        if model == "X12":
            return self._login_redfish(username, password)
        return self._login_cgi(username, password)

    def _login_cgi(self, username, password):
        """Log in through login.cgi and set the cookies the web UI expects."""
        if self.use_b64encoded_login:
            login_data = {
                'name': b64encode(username.encode("UTF-8")),
                'pwd': b64encode(password.encode("UTF-8")),
                'check': '00'
            }
        else:
            login_data = {
                'name': username,
                'pwd': password
            }
        result = self._request('post', self.login_url, data=login_data)
        if result is None:
            return False
        if not result.ok:
            # A status code is truthy, so returning it would be mistaken for
            # a successful login by callers that only test truthiness.
            self.logger.error("Login rejected with HTTP status %s", result.status_code)
            return False
        if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text:
            return False
        # Set mandatory cookies:
        # Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl
        url_parts = urlparse(self.ipmi_url)
        mandatory_cookies = {
            'langSetFlag': '0',
            'language': 'English'
        }
        for cookie_name, cookie_value in mandatory_cookies.items():
            self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname)
        return True

    def _login_redfish(self, username, password):
        """Log in by posting JSON credentials to ``self.login_url``."""
        login_data = {
            'UserName': username,
            'Password': password
        }
        request_headers = {'Content-Type': 'application/json'}
        result = self._request('post', self.login_url, data=json.dumps(login_data),
                               headers=request_headers)
        if result is None:
            return False
        if not result.ok:
            # The caller in this file prints this value in its error message.
            return result.status_code
        return result

    def get_ipmi_cert_info(self, model, token):
        """
        Verify existing certificate information
        :param model: board model; "X12" selects the Redfish request
        :param token: X-Auth-Token value (only used for X12)
        :return: False on failure, otherwise a dict with the keys
                 'has_cert', 'valid_from' and 'valid_until'. The two dates
                 are None when the BMC reports that no certificate exists.
        """
        if model == "X12":
            request_headers = {
                'Content-Type': 'application/json',
                'X-Auth-Token': token
            }
            response = self._request('get', self.cert_info_url, headers=request_headers)
            if response is None or not response.ok:
                return False
            data = response.json()
            # NOTE(review): 'VaildFrom' is spelled this way on purpose here;
            # presumably it mirrors the key sent by the firmware - confirm.
            return {
                'has_cert': True,
                'valid_from': self._parse_cert_date(data['VaildFrom']),
                'valid_until': self._parse_cert_date(data['GoodTHRU'])
            }

        headers = self.get_xhr_headers("config_ssl")
        cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)')
        result = self._request('post', self.cert_info_url, data=cert_info_data, headers=headers)
        if result is None or not result.ok:
            return False
        self.logger.debug(result.text)
        root = etree.fromstring(result.text)
        # <?xml> <IPMI> <SSL_INFO> <STATUS>
        # Compare against None: an Element without children is falsy.
        status = root.find('.//SSL_INFO/STATUS')
        if status is None:
            return False
        has_cert = bool(int(status.get('CERT_EXIST')))
        valid_from = valid_until = None
        if has_cert:
            valid_from = datetime.strptime(status.get('VALID_FROM'), self.CERT_DATE_FORMAT)
            valid_until = datetime.strptime(status.get('VALID_UNTIL'), self.CERT_DATE_FORMAT)
        return {
            'has_cert': has_cert,
            'valid_from': valid_from,
            'valid_until': valid_until
        }

    def get_ipmi_cert_valid(self):
        """
        Ask the BMC whether the uploaded certificate passed its validation
        :return: bool
        """
        headers = self.get_xhr_headers("config_ssl")
        cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)')
        result = self._request('post', self.cert_info_url, data=cert_info_data, headers=headers)
        if result is None or not result.ok:
            return False
        self.logger.debug(result.text)
        root = etree.fromstring(result.text)
        # <?xml> <IPMI> <SSL_INFO>
        status = root.find('.//SSL_INFO')
        if status is None:
            return False
        return bool(int(status.get('VALIDATE')))

    def upload_cert(self, key_file, cert_file, model, token):
        """
        Send X.509 certificate and private key to server
        :param key_file: filename to X.509 certificate private key
        :param cert_file: filename to X.509 certificate PEM
        :param model: board model; "X12" selects the Redfish upload
        :param token: X-Auth-Token value (only used for X12)
        :return: True when the BMC acknowledged the upload, False otherwise
        """
        with open(key_file, 'rb') as filehandle:
            key_data = filehandle.read()
        with open(cert_file, 'rb') as filehandle:
            cert_data = filehandle.read()
        # extract certificates only (IPMI doesn't like DH PARAMS)
        certificates = re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
                                  cert_data, re.DOTALL)
        if not certificates:
            self.logger.error("No certificate found in %s", cert_file)
            return False
        cert_data = b'\n'.join(certificates) + b'\n'

        if model == 'X12':
            # Only the first certificate of the file is sent on this path.
            substr = b'-----END CERTIFICATE-----\n'
            cert_data = cert_data.split(substr)[0] + substr
            files_to_upload = self._get_upload_data(cert_data, key_data)
            request_headers = {'X-Auth-Token': token}
            result = self._request('post', self.upload_cert_url, files=files_to_upload,
                                   headers=request_headers)
            if result is None:
                return False
            return 'SSL certificate and private key were successfully uploaded' in result.text

        files_to_upload = self._get_upload_data(cert_data, key_data)
        headers = self.get_csrf_headers("config_ssl")
        # Reuse the token just fetched for the headers instead of requesting
        # the page a second time; the form field and the header then agree.
        csrf_data = {}
        if "CSRF_TOKEN" in headers:
            csrf_data["CSRF_TOKEN"] = headers["CSRF_TOKEN"]
        result = self._request('post', self.upload_cert_url, data=csrf_data,
                               files=files_to_upload, headers=headers)
        if result is None or not result.ok:
            return False
        # On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked'.
        # Substring match, because the success value may carry a charset
        # suffix such as 'text/html; charset=utf-8'.
        if 'text/html' not in result.headers.get('Content-Type', ''):
            return False
        return 'CONFPAGE_RESET' in result.text

    def _check_reboot_result(self, result):
        """Hook for subclasses to inspect the reboot response; accepts anything here."""
        return True

    def reboot_ipmi(self, model, token):
        """
        Ask the BMC to reset itself by posting to ``self.reboot_url``
        :param model: board model; "X12" selects the Redfish request
        :param token: X-Auth-Token value (only used for X12)
        :return: True when the request was accepted, False otherwise
        """
        if model != 'X12':
            # do we need a different Referer here?
            headers = self.get_xhr_headers("config_ssl")
            reboot_data = self._get_op_data('main_bmcreset', None)
            result = self._request('post', self.reboot_url, data=reboot_data, headers=headers)
            if result is None or not result.ok:
                return False
            return bool(self._check_reboot_result(result))

        request_headers = {'X-Auth-Token': token}
        result = self._request('post', self.reboot_url, headers=request_headers)
        if result is None or not result.ok:
            return False
        return True
class IPMIX9Updater(IPMIUpdater):
    """Updater for the "X9" board model: plain-text CGI login over TLSv1."""

    class TLSv1HttpAdapter(requests.adapters.HTTPAdapter):
        """Transport adapter that pins HTTPS connections to TLSv1."""

        def init_poolmanager(self, connections, maxsize, block=False):
            import ssl
            from urllib3.poolmanager import PoolManager
            ctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1)
            ctx.load_default_certs()
            # NOTE(review): presumably lowered so OpenSSL accepts the old
            # cipher suites of these BMCs - confirm against real hardware.
            ctx.set_ciphers('DEFAULT@SECLEVEL=1')
            self.poolmanager = PoolManager(
                num_pools=connections, maxsize=maxsize,
                block=block, ssl_context=ctx)

    def __init__(self, session, ipmi_url):
        """
        :param session: requests session; its 'https://' adapter is replaced
        :param ipmi_url: base URL of the BMC, without a trailing slash
        """
        super().__init__(session, ipmi_url)
        self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi'
        self.use_b64encoded_login = False
        self.session.mount('https://', IPMIX9Updater.TLSv1HttpAdapter())

    def _get_op_data(self, op, r):
        """
        Build the form data for an ipmi.cgi operation.

        :param op: operation name, used as the form field name
        :param r: value for that field; the field is omitted when None
        :return: dict with 'time_stamp' and, optionally, the ``op`` field
        """
        timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')
        data = {
            'time_stamp': timestamp  # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
        }
        if r is not None:
            data[op] = r
        return data

    def _get_upload_data(self, cert_data, key_data):
        """Return the multipart file fields for the certificate upload."""
        return [
            ('sslcrt_file', ('cert.pem', cert_data, 'application/octet-stream')),
            ('privkey_file', ('privkey.pem', key_data, 'application/octet-stream'))
        ]

    def _check_reboot_result(self, result):
        """
        Check the XML reply of the reset request.

        :return: True only when the first <BMC_RESET>/<STATE> element has CODE="OK"
        """
        self.logger.debug(result.text)
        root = etree.fromstring(result.text)
        # <?xml> <IPMI> <BMC_RESET> <STATE>
        status = root.findall('.//BMC_RESET/STATE')
        if not status:
            return False
        return status[0].get('CODE') == 'OK'

    def get_ipmi_cert_valid(self):
        """
        Ask the BMC whether the uploaded certificate and key passed validation
        :return: bool
        """
        headers = self.get_xhr_headers("config_ssl")
        cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)')
        try:
            result = self.session.post(self.cert_info_url, cert_info_data, headers=headers,
                                       timeout=REQUEST_TIMEOUT, verify=False)
        except (ConnectionError, requests.exceptions.RequestException):
            # requests' exceptions do not derive from the builtin
            # ConnectionError, so both families have to be listed.
            return False
        if not result.ok:
            return False
        self.logger.debug(result.text)
        root = etree.fromstring(result.text)
        # <?xml> <IPMI> <SSL_INFO> <VALIDATE>
        status = root.findall('.//SSL_INFO/VALIDATE')
        if not status:
            return False
        # Since xpath will return a list, just pick the first one from it.
        status = status[0]
        return bool(int(status.get('CERT'))) and bool(int(status.get('KEY')))
class IPMIX10Updater(IPMIUpdater):
    """Updater for the "X10" board model: plain-text CGI login."""

    def __init__(self, session, ipmi_url):
        super().__init__(session, ipmi_url)
        self.use_b64encoded_login = False
        self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi'

    def _get_op_data(self, op, r):
        """Return the form data for an operation: a time stamp plus ``op=r`` unless r is None."""
        # Example of a browser-made stamp: 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)'
        payload = {'time_stamp': datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT')}
        if r is None:
            return payload
        payload[op] = r
        return payload

    def _get_upload_data(self, cert_data, key_data):
        """Return the multipart file fields for the certificate upload."""
        octet_stream = 'application/octet-stream'
        cert_part = ('cert_file', ('cert.pem', cert_data, octet_stream))
        key_part = ('key_file', ('privkey.pem', key_data, octet_stream))
        return [cert_part, key_part]

    def _check_reboot_result(self, result):
        """Return True only when the first <BMC_RESET>/<STATE> element of the reply has CODE="OK"."""
        self.logger.debug(result.text)
        # <?xml> <IPMI> <BMC_RESET> <STATE>
        states = etree.fromstring(result.text).findall('.//BMC_RESET/STATE')
        return bool(states) and states[0].get('CODE') == 'OK'
class IPMIX11Updater(IPMIUpdater):
    """Updater for the "X11" board model: base64-encoded CGI login, reset through op.cgi."""

    def __init__(self, session, ipmi_url):
        super().__init__(session, ipmi_url)
        # NOTE(review): a user in the gist discussion reports needing False
        # here on their X11 board - confirm per firmware before changing.
        self.use_b64encoded_login = True
        self.reboot_url = f'{ipmi_url}/cgi/op.cgi'

    def _get_op_data(self, op, r):
        """Return the form data for an operation: 'op', 'r' (unless r is None) and an empty '_' field."""
        fields = [('op', op)]
        if r is not None:
            fields.append(('r', r))
        fields.append(('_', ''))
        return dict(fields)

    def _get_upload_data(self, cert_data, key_data):
        """Return the multipart file fields for the certificate upload."""
        octet_stream = 'application/octet-stream'
        cert_part = ('cert_file', ('fullchain.pem', cert_data, octet_stream))
        key_part = ('key_file', ('privkey.pem', key_data, octet_stream))
        return [cert_part, key_part]
class IPMIX12Updater(IPMIUpdater):
    """Updater for the "X12" board model: every endpoint lives under /redfish/v1."""

    def __init__(self, session, ipmi_url):
        super().__init__(session, ipmi_url)
        # Replace the CGI endpoints set up by the base class.
        self.use_b64encoded_login = False
        self.reboot_url = f'{ipmi_url}/redfish/v1/Managers/1/Actions/Manager.Reset'
        self.upload_cert_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert/Actions/SmcSSLCert.Upload'
        self.cert_info_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert'
        self.login_url = f'{ipmi_url}/redfish/v1/SessionService/Sessions'

    def _get_upload_data(self, cert_data, key_data):
        """Return the upload fields as a mapping of field name to raw bytes."""
        return dict(cert_file=cert_data, key_file=key_data)
def parse_valid_until(pem):
    """
    Read the notAfter timestamp of a PEM certificate file.

    :param pem: filename of the PEM encoded certificate
    :return: naive datetime parsed from the certificate's notAfter field
    """
    from datetime import datetime
    from OpenSSL import crypto

    with open(pem, 'rb') as pem_file:
        pem_bytes = pem_file.read()
    certificate = crypto.load_certificate(crypto.FILETYPE_PEM, pem_bytes)
    not_after = certificate.get_notAfter().decode('utf8')
    return datetime.strptime(not_after, "%Y%m%d%H%M%SZ")
def create_updater(args):
    """
    Create the updater matching ``args.model`` on a fresh requests session.

    :param args: parsed command line; reads ``model``, ``ipmi_url`` and ``quiet``
    :return: instance of the updater class for the model
    :raises Exception: when the model is not one of X9, X10, X11, X12
    """
    session = requests.session()
    if not args.quiet:
        print("Board model is " + args.model)
    updater_classes = {
        "X9": IPMIX9Updater,
        "X10": IPMIX10Updater,
        "X11": IPMIX11Updater,
        "X12": IPMIX12Updater,
    }
    updater_class = updater_classes.get(args.model)
    if updater_class is None:
        raise Exception(f"Unknown model: {args.model}")
    return updater_class(session, args.ipmi_url)
def main():
    """
    Command-line entry point: upload a certificate/key pair to a BMC.

    Steps: parse arguments, log in, read the currently installed
    certificate, upload the new one unless its end date matches the
    installed one (override with --force-update), validate the upload (not
    done for X12), and reset the BMC unless --no-reboot is given.

    Exits with status 2 on any failure before the reset step, and with
    status 0 when there is nothing to do. A failed reset is only reported.
    """
    import sys  # local import so this function does not depend on a file-level one

    parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate')
    parser.add_argument('--ipmi-url', required=True,
                        help='Supermicro IPMI 2.0 URL')
    parser.add_argument('--model', required=True,
                        choices=['X9', 'X10', 'X11', 'X12', 'X13'],
                        help='Board model: X9, X10, X11, X12, X13')
    parser.add_argument('--key-file', required=True,
                        help='X.509 Private key filename')
    parser.add_argument('--cert-file', required=True,
                        help='X.509 Certificate filename')
    parser.add_argument('--username', required=True,
                        help='IPMI username with admin access')
    parser.add_argument('--password', required=True,
                        help='IPMI user password')
    parser.add_argument('--no-reboot', action='store_true',
                        help='The default is to reboot the IPMI after upload for the change to take effect.')
    parser.add_argument('--force-update', action='store_true',
                        help='Ignore the cert end date check, always replace the cert.')
    parser.add_argument('--quiet', action='store_true',
                        help='Do not output anything if successful')
    parser.add_argument('--debug', action='store_true',
                        help='Output additional debugging')
    args = parser.parse_args()

    # Confirm args
    if not os.path.isfile(args.key_file):
        print("--key-file '%s' doesn't exist!" % args.key_file)
        sys.exit(2)
    if not os.path.isfile(args.cert_file):
        print("--cert-file '%s' doesn't exist!" % args.cert_file)
        sys.exit(2)
    # The URL is used as a prefix for '/cgi/...' style paths, so drop any
    # trailing slashes (also safe for an empty string).
    args.ipmi_url = args.ipmi_url.rstrip('/')

    if args.debug:
        import http.client as http_client
        http_client.HTTPConnection.debuglevel = 1
        # Enable request logging
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
        requests_log = logging.getLogger("requests.packages.urllib3")
        requests_log.setLevel(logging.DEBUG)
        requests_log.propagate = True

    # X13 boards are handled by the X12 code path.
    if args.model == "X13":
        args.model = "X12"

    # Start the operation
    requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
    updater = create_updater(args)
    if args.debug:
        debug_log = logging.getLogger("IPMIUpdater")
        debug_log.setLevel(logging.DEBUG)
        updater.setLogger(debug_log)

    login_response = updater.login(args.username, args.password, args.model)
    if not login_response:
        print("Login failed. Cannot continue!")
        sys.exit(2)
    if args.model == 'X12':
        # On success login() hands back the response carrying the session
        # token; anything else (e.g. an HTTP status code) is a failure.
        try:
            token = login_response.headers['X-Auth-Token']
        except (AttributeError, KeyError, TypeError):
            print(f'ERROR: Login failed with error {login_response}')
            sys.exit(2)
    else:
        # Only a literal True means success; login() may also return a
        # (truthy) HTTP status code when the BMC rejected the credentials.
        if login_response is not True:
            print(f'ERROR: Login failed with error {login_response}')
            sys.exit(2)
        token = None

    cert_info = updater.get_ipmi_cert_info(args.model, token)
    if not cert_info:
        print("Failed to extract certificate information from IPMI!")
        sys.exit(2)
    current_valid_until = cert_info.get('valid_until', None)
    if not args.quiet and cert_info['has_cert']:
        print("There exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    new_valid_until = parse_valid_until(args.cert_file)
    if current_valid_until == new_valid_until:
        if not args.force_update:
            print("New cert validity period matches existing cert, nothing to do")
            sys.exit(0)
        else:
            print("New cert validity period matches existing cert, will update regardless")

    # Go upload!
    if not updater.upload_cert(args.key_file, args.cert_file, args.model, token):
        print("Failed to upload X.509 files to IPMI!")
        sys.exit(2)
    if args.model != 'X12':
        cert_valid = updater.get_ipmi_cert_valid()
        if not cert_valid:
            print("Uploads failed validation")
            sys.exit(2)
    if not args.quiet:
        print("Uploaded files ok.")

    cert_info = updater.get_ipmi_cert_info(args.model, token)
    if not cert_info:
        print("Failed to extract certificate information from IPMI!")
        sys.exit(2)
    if not args.quiet and cert_info['has_cert']:
        print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until'])

    if not args.no_reboot:
        if not args.quiet:
            print("Rebooting IPMI to apply changes.")
        if not updater.reboot_ipmi(args.model, token):
            print("Rebooting failed! Go reboot it manually?")
    if not args.quiet:
        print("All done!")


if __name__ == "__main__":
    main()
services:
  ipmi-updater-x13:
    image: certbot/dns-route53
    volumes:
      - ipmi-updater:/certs
    environment:
      - AWS_ACCESS_KEY_ID=XXXX
      - AWS_SECRET_ACCESS_KEY=XXXX
      - IPMI_URL=https://xxx.yyy.com
      - USER=abc
      - PASS=abc
      - KEY_FILE=/certs/config/live/xxx.yyy.com/privkey.pem
      - CERT_FILE=/certs/config/live/xxx.yyy.com/fullchain.pem
      - MODEL=X13
      # - NO_REBOOT=True
      # - FORCE_UPDATE=True
      # - QUIET=True
      # - DEBUG=True
    user: 1000:1000
    command:
      - certonly
      - --non-interactive
      - --agree-tos
      - --logs-dir
      - /certs/logs
      - --config-dir
      - /certs/config
      - --work-dir
      - /certs/work
      - --cert-name
      - xxx.yyy.com
      - --key-type
      - rsa
      - -m you@example.com
      - -d xxx.yyy.com
      - --dns-route53
      - --post-hook
      - "/bin/sh /certs/ipmi-updater.sh"
Seems to work well - thank you! Don't suppose with X10/X11 (and X12/X13 I suppose too) that there is a way to extract the board so I can pass that to the --model option?
@CrypNZ Glad the script is working for you.
There is already a way for the X9-X11 boards. Look at the code I removed in the fork history. I cut it out because I didn't need it and didn't feel like implementing it for the X12+ boards. It can definitely be done if you have some basic Python and REST API knowledge but it's not on my agenda.
If your boards are all X9-X11 I might suggest using the gist I forked this from with that functionality still built in.
My suggestion would be this patch:
+286
if 'Content-Type' not in result.headers.keys() or 'text/html' not in result.headers['Content-Type']:
Because in my case it replied on an X11SCL-IF fw: 1.74:
result.headers:
{..., 'Content-Type': 'text/html; charset=utf-8', ....}
Line 445 needs to be "self.use_b64encoded_login = False" or else it won't work with X11
Has anyone seen an issue where the SSL certificate seems to be installed / updated but the admin interface is still using the Supermicro certificate (I tried manually rebooting the BMC too)?
Added support for X12/X13 boards thanks to the expanded Redfish API functionality.
Pro tip for the docker fans out there: The official certbot image contains every dependency required to run this gist so you can just add the script to your cert folder along with a little helper shell script to set the arguments. Then you can have certbot run the helper script as a --deploy-hook. Get everything set up in a compose file or a one liner and call it with a cron job however frequently you see fit. Finally, automated IPMI certs!
Example helper script