-
-
Save mattisz/d112ebfe1869c56ce111ecbd2cbbd04d to your computer and use it in GitHub Desktop.
Supermicro IPMI certificate updater
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python | |
# This file is part of Supermicro IPMI certificate updater. | |
# Supermicro IPMI certificate updater is free software: you can | |
# redistribute it and/or modify it under the terms of the GNU General Public | |
# License as published by the Free Software Foundation, version 2. | |
# | |
# This program is distributed in the hope that it will be useful, but WITHOUT | |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | |
# details. | |
# | |
# You should have received a copy of the GNU General Public License along with | |
# this program; if not, write to the Free Software Foundation, Inc., 51 | |
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
# | |
# Copyright (c) Jari Turkia | |
import os | |
import argparse | |
import re | |
import requests | |
import logging | |
import json | |
from base64 import b64encode | |
from datetime import datetime | |
import xml.etree.ElementTree as etree | |
from urllib.parse import urlparse | |
REQUEST_TIMEOUT = 5.0 | |
class IPMIUpdater: | |
def __init__(self, session, ipmi_url): | |
self.session = session | |
self.ipmi_url = ipmi_url | |
self.login_url = f'{ipmi_url}/cgi/login.cgi' | |
self.cert_info_url = f'{ipmi_url}/cgi/ipmi.cgi' | |
self.upload_cert_url = f'{ipmi_url}/cgi/upload_ssl.cgi' | |
self.url_redirect_template = f'{ipmi_url}/cgi/url_redirect.cgi?url_name=%s' | |
self.use_b64encoded_login = True | |
self._csrf_token = None | |
error_log = logging.getLogger("IPMIUpdater") | |
error_log.setLevel(logging.ERROR) | |
self.setLogger(error_log) | |
def setLogger(self, logger): | |
self.logger = logger | |
def get_csrf_token(self, url_name): | |
if self._csrf_token is not None: | |
return self._csrf_token | |
page_url = self.url_redirect_template % url_name | |
result = self.session.get(page_url) | |
result.raise_for_status() | |
match = re.search(r'SmcCsrfInsert\s*\("CSRF_TOKEN",\s*"([^"]*)"\);', result.text) | |
if match: | |
return match.group(1) | |
def get_csrf_headers(self, url_name): | |
page_url = self.url_redirect_template % url_name | |
headers = { | |
"Origin": self.ipmi_url, | |
"Referer": page_url, | |
} | |
csrf_token = self.get_csrf_token(url_name) | |
if csrf_token is not None: | |
headers["CSRF_TOKEN"] = csrf_token | |
self.logger.debug("HEADERS:%s" % headers) | |
return headers | |
def get_xhr_headers(self, url_name): | |
headers = self.get_csrf_headers(url_name) | |
headers["X-Requested-With"] = "XMLHttpRequest" | |
return headers | |
def login(self, username, password, model): | |
""" | |
Log into IPMI interface | |
:param username: username to use for logging in | |
:param password: password to use for logging in | |
:return: bool | |
""" | |
if model != "X12": | |
if self.use_b64encoded_login: | |
login_data = { | |
'name': b64encode(username.encode("UTF-8")), | |
'pwd': b64encode(password.encode("UTF-8")), | |
'check': '00' | |
} | |
else: | |
login_data = { | |
'name': username, | |
'pwd': password | |
} | |
try: | |
result = self.session.post(self.login_url, login_data, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return result.status_code | |
if '/cgi/url_redirect.cgi?url_name=mainmenu' not in result.text: | |
return False | |
# Set mandatory cookies: | |
url_parts = urlparse(self.ipmi_url) | |
# Cookie: langSetFlag=0; language=English; SID=<dynamic session ID here!>; mainpage=configuration; subpage=config_ssl | |
mandatory_cookies = { | |
'langSetFlag': '0', | |
'language': 'English' | |
} | |
for cookie_name, cookie_value in mandatory_cookies.items(): | |
self.session.cookies.set(cookie_name, cookie_value, domain=url_parts.hostname) | |
return True | |
else: | |
login_data = { | |
'UserName': username, | |
'Password': password | |
} | |
request_headers = {'Content-Type': 'application/json'} | |
try: | |
result = self.session.post(self.login_url, data=json.dumps(login_data), headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return result.status_code | |
return result | |
def get_ipmi_cert_info(self, model, token): | |
""" | |
Verify existing certificate information | |
:return: dict | |
""" | |
if model == "X12": | |
request_headers = { | |
'Content-Type': 'application/json', | |
'X-Auth-Token': token | |
} | |
try: | |
r = self.session.get(self.cert_info_url, headers=request_headers, verify=False) | |
except ConnectionError: | |
return False | |
if not r.ok: | |
return False | |
data = r.json() | |
valid_from = datetime.strptime(data['VaildFrom'].rstrip(re.split('\d{4}', data['VaildFrom'])[1]), r"%b %d %H:%M:%S %Y") | |
valid_until = datetime.strptime(data['GoodTHRU'].rstrip(re.split('\d{4}', data['GoodTHRU'])[1]), r"%b %d %H:%M:%S %Y") | |
return { | |
'has_cert': True, | |
'valid_from': valid_from, | |
'valid_until': valid_until | |
} | |
headers = self.get_xhr_headers("config_ssl") | |
cert_info_data = self._get_op_data('SSL_STATUS.XML', '(0,0)') | |
try: | |
result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
self.logger.debug(result.text) | |
root = etree.fromstring(result.text) | |
# <?xml> <IPMI> <SSL_INFO> <STATUS> | |
status = root.findall('.//SSL_INFO/STATUS') | |
if not status: | |
return False | |
# Since xpath will return a list, just pick the first one from it. | |
status = status[0] | |
has_cert = bool(int(status.get('CERT_EXIST'))) | |
if has_cert: | |
valid_from = datetime.strptime(status.get('VALID_FROM'), r"%b %d %H:%M:%S %Y") | |
valid_until = datetime.strptime(status.get('VALID_UNTIL'), r"%b %d %H:%M:%S %Y") | |
return { | |
'has_cert': has_cert, | |
'valid_from': valid_from, | |
'valid_until': valid_until | |
} | |
def get_ipmi_cert_valid(self): | |
""" | |
Verify existing certificate information | |
:return: bool | |
""" | |
headers = self.get_xhr_headers("config_ssl") | |
cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') | |
try: | |
result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
self.logger.debug(result.text) | |
root = etree.fromstring(result.text) | |
# <?xml> <IPMI> <SSL_INFO> | |
status = root.findall('.//SSL_INFO') | |
if not status: | |
return False | |
# Since xpath will return a list, just pick the first one from it. | |
status = status[0] | |
return bool(int(status.get('VALIDATE'))) | |
def upload_cert(self, key_file, cert_file, model, token): | |
""" | |
Send X.509 certificate and private key to server | |
:param session: Current session object | |
:type session requests.session | |
:param url: base-URL to IPMI | |
:param key_file: filename to X.509 certificate private key | |
:param cert_file: filename to X.509 certificate PEM | |
:return: | |
""" | |
with open(key_file, 'rb') as filehandle: | |
key_data = filehandle.read() | |
with open(cert_file, 'rb') as filehandle: | |
cert_data = filehandle.read() | |
# extract certificates only (IMPI doesn't like DH PARAMS) | |
cert_data = b'\n'.join(re.findall(b'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----', cert_data, re.DOTALL)) + b'\n' | |
if model == 'X12': | |
substr = b'-----END CERTIFICATE-----\n' | |
cert_data = cert_data.split(substr)[0] + substr | |
files_to_upload = self._get_upload_data(cert_data, key_data) | |
request_headers = {'X-Auth-Token': token} | |
try: | |
result = self.session.post(self.upload_cert_url, files=files_to_upload, headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not 'SSL certificate and private key were successfully uploaded' in result.text: | |
return False | |
return True | |
else: | |
files_to_upload = self._get_upload_data(cert_data, key_data) | |
headers = self.get_csrf_headers("config_ssl") | |
csrf_token = self.get_csrf_token("config_ssl") | |
csrf_data = {} | |
if csrf_token is not None: | |
csrf_data["CSRF_TOKEN"] = csrf_token | |
try: | |
result = self.session.post(self.upload_cert_url, csrf_data, files=files_to_upload, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
if 'Content-Type' not in result.headers.keys() or result.headers['Content-Type'] != 'text/html': | |
# On failure, Content-Type will be 'text/plain' and 'Transfer-Encoding' is 'chunked' | |
return False | |
if 'CONFPAGE_RESET' not in result.text: | |
return False | |
return True | |
def _check_reboot_result(self, result): | |
return True | |
def reboot_ipmi(self, model, token): | |
if model != 'X12': | |
# do we need a different Referer here? | |
headers = self.get_xhr_headers("config_ssl") | |
reboot_data = self._get_op_data('main_bmcreset', None) | |
try: | |
result = self.session.post(self.reboot_url, reboot_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
if not self._check_reboot_result(result): | |
return False | |
return True | |
else: | |
request_headers = {'X-Auth-Token': token} | |
try: | |
result = self.session.post(self.reboot_url, headers=request_headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
return True | |
class IPMIX9Updater(IPMIUpdater): | |
class TLSv1HttpAdapter(requests.adapters.HTTPAdapter): | |
""""Transport adapter" that allows us to use SSLv3.""" | |
def init_poolmanager(self, connections, maxsize, block=False): | |
import ssl | |
from urllib3.poolmanager import PoolManager | |
ctx = ssl.SSLContext(protocol=ssl.PROTOCOL_TLSv1) | |
ctx.load_default_certs() | |
ctx.set_ciphers('DEFAULT@SECLEVEL=1') | |
self.poolmanager = PoolManager( | |
num_pools=connections, maxsize=maxsize, | |
block=block, ssl_context=ctx) | |
def __init__(self, session, ipmi_url): | |
super().__init__(session, ipmi_url) | |
self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' | |
self.use_b64encoded_login = False | |
self.session.mount('https://', IPMIX9Updater.TLSv1HttpAdapter()) | |
def _get_op_data(self, op, r): | |
timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') | |
data = { | |
'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' | |
} | |
if r is not None: | |
data[op] = r | |
return data | |
def _get_upload_data(self, cert_data, key_data): | |
return [ | |
('sslcrt_file', ('cert.pem', cert_data, 'application/octet-stream')), | |
('privkey_file', ('privkey.pem', key_data, 'application/octet-stream')) | |
] | |
def _check_reboot_result(self, result): | |
self.logger.debug(result.text) | |
root = etree.fromstring(result.text) | |
# <?xml> <IPMI> <SSL_INFO> | |
status = root.findall('.//BMC_RESET/STATE') | |
if not status: | |
return False | |
if status[0].get('CODE') == 'OK': | |
return True | |
return False | |
#if '<STATE CODE="OK"/>' not in result.text: | |
# return False | |
def get_ipmi_cert_valid(self): | |
""" | |
Verify existing certificate information | |
:return: bool | |
""" | |
headers = self.get_xhr_headers("config_ssl") | |
cert_info_data = self._get_op_data('SSL_VALIDATE.XML', '(0,0)') | |
try: | |
result = self.session.post(self.cert_info_url, cert_info_data, headers=headers, timeout=REQUEST_TIMEOUT, verify=False) | |
except ConnectionError: | |
return False | |
if not result.ok: | |
return False | |
self.logger.debug(result.text) | |
root = etree.fromstring(result.text) | |
# <?xml> <IPMI> <SSL_INFO> <VALIDATE> | |
status = root.findall('.//SSL_INFO/VALIDATE') | |
if not status: | |
return False | |
# Since xpath will return a list, just pick the first one from it. | |
status = status[0] | |
return bool(int(status.get('CERT'))) and bool(int(status.get('KEY'))) | |
class IPMIX10Updater(IPMIUpdater): | |
def __init__(self, session, ipmi_url): | |
super().__init__(session, ipmi_url) | |
self.reboot_url = f'{ipmi_url}/cgi/BMCReset.cgi' | |
self.use_b64encoded_login = False | |
def _get_op_data(self, op, r): | |
timestamp = datetime.utcnow().strftime('%a %d %b %Y %H:%M:%S GMT') | |
data = { | |
'time_stamp': timestamp # 'Thu Jul 12 2018 19:52:48 GMT+0300 (FLE Daylight Time)' | |
} | |
if r is not None: | |
data[op] = r | |
return data | |
def _get_upload_data(self, cert_data, key_data): | |
return [ | |
('cert_file', ('cert.pem', cert_data, 'application/octet-stream')), | |
('key_file', ('privkey.pem', key_data, 'application/octet-stream')) | |
] | |
def _check_reboot_result(self, result): | |
self.logger.debug(result.text) | |
root = etree.fromstring(result.text) | |
# <?xml> <IPMI> <SSL_INFO> | |
status = root.findall('.//BMC_RESET/STATE') | |
if not status: | |
return False | |
if status[0].get('CODE') == 'OK': | |
return True | |
return False | |
#if '<STATE CODE="OK"/>' not in result.text: | |
# return False | |
class IPMIX11Updater(IPMIUpdater): | |
def __init__(self, session, ipmi_url): | |
super().__init__(session, ipmi_url) | |
self.reboot_url = f'{ipmi_url}/cgi/op.cgi' | |
self.use_b64encoded_login = True | |
def _get_op_data(self, op, r): | |
data = { | |
'op': op | |
} | |
if r is not None: | |
data['r'] = r | |
data['_'] = '' | |
return data | |
def _get_upload_data(self, cert_data, key_data): | |
return [ | |
('cert_file', ('fullchain.pem', cert_data, 'application/octet-stream')), | |
('key_file', ('privkey.pem', key_data, 'application/octet-stream')) | |
] | |
class IPMIX12Updater(IPMIUpdater): | |
def __init__(self, session, ipmi_url): | |
super().__init__(session, ipmi_url) | |
self.login_url = f'{ipmi_url}/redfish/v1/SessionService/Sessions' | |
self.cert_info_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert' | |
self.upload_cert_url = f'{ipmi_url}/redfish/v1/UpdateService/Oem/Supermicro/SSLCert/Actions/SmcSSLCert.Upload' | |
self.reboot_url = f'{ipmi_url}/redfish/v1/Managers/1/Actions/Manager.Reset' | |
self.use_b64encoded_login = False | |
def _get_upload_data(self, cert_data, key_data): | |
return { | |
'cert_file' : cert_data, | |
'key_file' : key_data | |
} | |
def parse_valid_until(pem): | |
from datetime import datetime | |
from OpenSSL import crypto as c | |
with open(pem, 'rb') as fh: | |
cert = c.load_certificate(c.FILETYPE_PEM, fh.read()) | |
return datetime.strptime(cert.get_notAfter().decode('utf8'), "%Y%m%d%H%M%SZ") | |
def create_updater(args): | |
session = requests.session() | |
if not args.quiet: | |
print("Board model is " + args.model) | |
if args.model == "X10": | |
return IPMIX10Updater(session, args.ipmi_url) | |
elif args.model == "X11": | |
return IPMIX11Updater(session, args.ipmi_url) | |
elif args.model == "X9": | |
return IPMIX9Updater(session, args.ipmi_url) | |
elif args.model == "X12": | |
return IPMIX12Updater(session, args.ipmi_url) | |
else: | |
raise Exception(f"Unknown model: {args.model}") | |
def main(): | |
parser = argparse.ArgumentParser(description='Update Supermicro IPMI SSL certificate') | |
parser.add_argument('--ipmi-url', required=True, | |
help='Supermicro IPMI 2.0 URL') | |
parser.add_argument('--model', required=True, | |
help='Board model: X9, X10, X11, X12, X13') | |
parser.add_argument('--key-file', required=True, | |
help='X.509 Private key filename') | |
parser.add_argument('--cert-file', required=True, | |
help='X.509 Certificate filename') | |
parser.add_argument('--username', required=True, | |
help='IPMI username with admin access') | |
parser.add_argument('--password', required=True, | |
help='IPMI user password') | |
parser.add_argument('--no-reboot', action='store_true', | |
help='The default is to reboot the IPMI after upload for the change to take effect.') | |
parser.add_argument('--force-update', action='store_true', | |
help='Ignore the cert end date check, always replace the cert.') | |
parser.add_argument('--quiet', action='store_true', | |
help='Do not output anything if successful') | |
parser.add_argument('--debug', action='store_true', | |
help='Output additional debugging') | |
args = parser.parse_args() | |
# Confirm args | |
if not os.path.isfile(args.key_file): | |
print("--key-file '%s' doesn't exist!" % args.key_file) | |
exit(2) | |
if not os.path.isfile(args.cert_file): | |
print("--cert-file '%s' doesn't exist!" % args.cert_file) | |
exit(2) | |
if args.ipmi_url[-1] == '/': | |
args.ipmi_url = args.ipmi_url[0:-1] | |
if args.debug: | |
import http.client as http_client | |
http_client.HTTPConnection.debuglevel = 1 | |
# Enable request logging | |
logging.basicConfig() | |
logging.getLogger().setLevel(logging.DEBUG) | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
if args.model == "X13": | |
args.model = "X12" | |
# Start the operation | |
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning) | |
updater = create_updater(args) | |
if args.debug: | |
debug_log = logging.getLogger("IPMIUpdater") | |
debug_log.setLevel(logging.DEBUG) | |
updater.setLogger(debug_log) | |
login_response = updater.login(args.username, args.password, args.model) | |
if not login_response: | |
print("Login failed. Cannot continue!") | |
exit(2) | |
elif args.model == 'X12': | |
try: | |
token = login_response.headers['X-Auth-Token'] | |
except: | |
print(f'ERROR: Login failed with error {login_response}') | |
exit(2) | |
else: | |
token = None | |
cert_info = updater.get_ipmi_cert_info(args.model, token) | |
if not cert_info: | |
print("Failed to extract certificate information from IPMI!") | |
exit(2) | |
current_valid_until = cert_info.get('valid_until', None) | |
if not args.quiet and cert_info['has_cert']: | |
print("There exists a certificate, which is valid until: %s" % cert_info['valid_until']) | |
new_valid_until = parse_valid_until(args.cert_file) | |
if current_valid_until == new_valid_until: | |
if not args.force_update: | |
print("New cert validity period matches existing cert, nothing to do") | |
exit(0) | |
else: | |
print("New cert validity period matches existing cert, will update regardless") | |
# Go upload! | |
if not updater.upload_cert(args.key_file, args.cert_file, args.model, token): | |
print("Failed to upload X.509 files to IPMI!") | |
exit(2) | |
if args.model != 'X12': | |
cert_valid = updater.get_ipmi_cert_valid() | |
if not cert_valid: | |
print("Uploads failed validation") | |
exit(2) | |
if not args.quiet: | |
print("Uploaded files ok.") | |
cert_info = updater.get_ipmi_cert_info(args.model, token) | |
if not cert_info: | |
print("Failed to extract certificate information from IPMI!") | |
exit(2) | |
if not args.quiet and cert_info['has_cert']: | |
print("After upload, there exists a certificate, which is valid until: %s" % cert_info['valid_until']) | |
if not args.no_reboot: | |
if not args.quiet: | |
print("Rebooting IPMI to apply changes.") | |
if not updater.reboot_ipmi(args.model, token): | |
print("Rebooting failed! Go reboot it manually?") | |
if not args.quiet: | |
print("All done!") | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Has anyone seen an issue where the SSL certificate seems to be installed / updated but the admin interface is still using the Supermicro certificate (I tried manually rebooting the BMC too)?