Last active
October 18, 2022 15:33
-
-
Save tessus/cdbd2367f02211be9224b756b1ecddae to your computer and use it in GitHub Desktop.
opnsense_check: check via API call, if OPNsense update/upgrade is available
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
###################################################################
# This script makes an API connection to OPNsense #
# and checks if there are any pending updates #
# if there are, it sends an email with details #
# #
# Authors: Bart J. Smit, 'ObecalpEffect' and Franco Fichtner #
# Helmut K. C. Tessarek #
# #
# Version 1.2 07/05/2016 #
# Version 1.3 2022-05-25 #
# Version 1.4 2022-08-11 #
# Version 1.5 2022-10-18 #
###################################################################
# import libraries | |
import sys, os, string, argparse, json, configparser, re, subprocess, signal, pprint, pytz, datetime | |
from dateutil.parser import parse | |
import json | |
import requests | |
import smtplib | |
import time | |
from requests.packages.urllib3.exceptions import InsecureRequestWarning | |
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) | |
def cleanup(signal, frame):
    """Signal handler: release resources held by the interrupted frame, then exit.

    Args:
        signal: Number of the signal being handled. Unused; note that the
            name shadows the ``signal`` module inside this function only.
        frame: The interrupted stack frame, or ``None``.

    Raises:
        SystemExit: Always, with exit status 0.
    """
    # The interrupted frame is not guaranteed to have ``fp`` or ``client``
    # locals (nothing visible in this script defines them), so look them up
    # defensively instead of failing with KeyError inside the handler.
    local_vars = getattr(frame, 'f_locals', None) or {}
    for name, method in (('fp', 'terminate'), ('client', 'close')):
        resource = local_vars.get(name)
        if resource is not None:
            getattr(resource, method)()
    sys.exit(0)
# Route both interrupt (SIGINT) and termination (SIGTERM) requests through cleanup().
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, cleanup)
def pmsg(msg):
    """Print *msg* to stdout, prefixed with this script's file name and a colon."""
    script_name = os.path.basename(__file__)
    print("{prog}: {text}".format(prog=script_name, text=msg))
def cfgerr():
    """Report that no configuration file was found and exit with status 1."""
    pmsg("config file not found")
    raise SystemExit(1)
def mydate(date, timezone=None):
    """Format a naive UTC datetime as ``YYYY-MM-DD HH:MM:SS +ZZZZ``.

    Args:
        date: Naive ``datetime`` that is interpreted as UTC.
        timezone: Optional time zone name (as listed in
            ``pytz.all_timezones``) to convert to. When omitted, the
            ``timezone`` attribute of the module-level ``ctx`` is used if
            both exist.

    Returns:
        The formatted timestamp, converted to *timezone* when that name is
        known to pytz and left in UTC otherwise.
    """
    if timezone is None:
        # ``ctx`` only exists when the file runs as a script, and the argument
        # parser defines no ``timezone`` option, so neither may be assumed.
        timezone = getattr(globals().get('ctx'), 'timezone', None)
    stamp = pytz.UTC.localize(date)
    if timezone and timezone in pytz.all_timezones:
        stamp = stamp.astimezone(pytz.timezone(timezone))
    return stamp.strftime("%Y-%m-%d %H:%M:%S %z")
def _read_config(ctx):
    """Load the INI configuration and guarantee the three sections used below exist."""
    config = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    dotrc = os.path.expanduser('~/.config/opnsense_check.rc')
    # config file precedence: argument, ./opnsense_check.ini, ~/.config/opnsense_check.rc
    if ctx.configfile:
        candidates = [ctx.configfile]
    else:
        candidates = ['./opnsense_check.ini', dotrc]
    for candidate in candidates:
        if os.path.isfile(candidate):
            config.read(candidate)
            break
    else:
        cfgerr()
    # A section missing from the file must not raise KeyError: every option
    # in it has either a default or a command-line override.
    for section in ('API', 'OPNsense', 'Email'):
        if not config.has_section(section):
            config.add_section(section)
    return config


def _credential(cli_value, section, option, label):
    """Return the command-line value if given, else the config option; exit 2 if neither is set."""
    if cli_value:
        return cli_value
    value = section.get(option)
    if value is None:
        pmsg("{0} not set".format(label))
        sys.exit(2)
    return value


def _package_section(title, packages, line_format):
    """Render one HTML package list; *packages* may be a dict of records or a list of records."""
    if isinstance(packages, dict):
        rows = list(packages.values())
    else:
        rows = list(packages or [])
    if not rows:
        return ''
    parts = ['\r\n<br><b>{0}:</b><br>\r\n'.format(title)]
    parts.extend(line_format.format(**row) + '<br>\r\n' for row in rows)
    return ''.join(parts)


def opnsense_check(ctx):
    """Query the OPNsense firmware status API and mail a report if an update or upgrade is pending.

    Args:
        ctx: Parsed command-line options. The attributes read are
            ``configfile``, ``api_key``, ``api_secret``, ``host``,
            ``tls_verify``, ``send_email`` and ``d`` (debug output).

    Returns:
        None. Status is reported on stdout and, when enabled, by email through
        the SMTP server on ``localhost``.

    Raises:
        SystemExit: With status 1 when no config file is found, or status 2
            when the API key or secret is set neither on the command line nor
            in the config file.
    """
    config = _read_config(ctx)
    api = config['API']
    opnsense = config['OPNsense']
    email = config['Email']

    api_key = _credential(ctx.api_key, api, 'key', 'API key')
    api_secret = _credential(ctx.api_secret, api, 'secret', 'API secret')
    host = ctx.host if ctx.host else opnsense.get('host', 'localhost')

    if ctx.d:
        # NOTE(review): debug output prints the API credentials in clear text.
        print("[Debug] API key: {0}".format(api_key))
        print("[Debug] API secret: {0}".format(api_secret))
        print("[Debug] OPNsense host: {0}".format(host))
        print("[Debug] TLS verify: {0}".format(ctx.tls_verify))

    url = 'https://' + host + '/api/core/firmware/status'
    fetch_link = '<br>Click <a href=\"https://' + host + '/ui/core/firmware#checkupdate\">here</a> to fetch them.<br>\r\n'

    sender = email.get('sender', 'notifications@opnsense')
    recipient = email.get('recipient', 'root')

    message = 'From: ' + email.get('sender_name', 'OPNsense Firewall') + '<' + sender + '>\r\n'
    message += 'To: ' + email.get('recipient_name', '') + '<' + recipient + '>\r\n'
    message += 'MIME-Version: 1.0\r\n'
    message += 'Content-type: text/html\r\n'

    if ctx.d:
        print("[Debug] API URL: {0}".format(url))
        print("[Debug] Email sender: {0}".format(sender))
        print("[Debug] Email recipient: {0}".format(recipient))
        print("[Debug] Send email: {0}".format(ctx.send_email))

    # request data, POST (required for synchronous check)
    reply = requests.post(url, verify=ctx.tls_verify, auth=(api_key, api_secret))
    if reply.status_code != 200:
        print('Connection / Authentication issue, response received:')
        print(reply.text)
        return

    response = json.loads(reply.text)
    if ctx.d:
        print(json.dumps(response))

    # Keys are read with .get() so that a response lacking an optional field
    # produces a shorter report instead of aborting with KeyError.
    status = response.get('status')
    update_or_upgrade = status in ('update', 'upgrade')

    if status == 'update':
        message += 'Subject: Updates for OPNsense\r\n'
        # An empty line must separate the mail headers from the HTML body.
        message += '\r\n'
        message += '<h2>Firewall Updates Available</h2>'
        message += '<br>{0}<br>\r\n'.format(response.get('status_msg', ''))
        message += _package_section(
            'New', response.get('new_packages'), '{name} version {version}')
        message += _package_section(
            'Upgrade', response.get('upgrade_packages'),
            '{name} from {current_version} to {new_version}')
        message += _package_section(
            'Reinstall', response.get('reinstall_packages'), '{name} version {version}')
        message += fetch_link
        if response.get('needs_reboot') == '1':
            message += '<h3>This update requires a reboot.</h3>'
    elif status == 'upgrade':
        from_to = '{0} -> {1}'.format(
            response.get('product_version', ''), response.get('upgrade_major_version', ''))
        message += 'Subject: Upgrade for OPNsense ' + from_to + '\r\n'
        # An empty line must separate the mail headers from the HTML body.
        message += '\r\n'
        message += '<h2>Firewall Upgrade Available: ' + from_to + '</h2>'
        message += '<br>{0}<br>\r\n'.format(response.get('upgrade_major_message', ''))
        message += fetch_link
        if response.get('upgrade_needs_reboot') == '1':
            message += '<h3>This upgrade requires a reboot.</h3>'

    if ctx.d:
        print(message)

    if ctx.send_email and update_or_upgrade:
        # The context manager issues QUIT and closes the connection even if
        # sendmail() raises.
        with smtplib.SMTP('localhost') as smtp:
            smtp.sendmail(sender, recipient, message)
if __name__ == "__main__":
    # Script entry point: parse the command line into ``ctx`` and run the check.
    parser = argparse.ArgumentParser(
        description='checks for OPNsense updates/upgrades')

    # TODO: add email arguments (an 'Email' argument group, analogous to 'API')
    api_group = parser.add_argument_group(
        'API', 'Options that can be set for API')

    parser.add_argument(
        '-c', '--config', dest='configfile',
        help='read config from CONFIGFILE')
    api_group.add_argument(
        '--api-key', dest='api_key',
        help='API key for OPNsense')
    api_group.add_argument(
        '--api-secret', dest='api_secret',
        help='API secret for OPNsense')
    parser.add_argument(
        '-n', '--no-email', dest='send_email', action="store_false", default=True,
        help='do not send an email')
    parser.add_argument(
        '--host', dest='host',
        help='OPNsense host')
    parser.add_argument(
        '-v', '--verbose', dest='v', action="store_true", default=False,
        help='display info output')
    parser.add_argument(
        '--tls-no-verify', dest='tls_verify', action="store_false", default=True,
        help='do not verify the TLS certificate')
    parser.add_argument(
        '-d', '--debug', dest='d', action="store_true", default=False,
        help='display debug output')
    parser.add_argument(
        '-V', '--version', action="version", version='%(prog)s 1.5')

    # ``ctx`` stays a module-level name; mydate() reads it as a global.
    ctx = parser.parse_args()
    opnsense_check(ctx)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
opnsense_check.ini.template