Skip to content

Instantly share code, notes, and snippets.

@coldfusion39
Last active February 15, 2023 12:52
Show Gist options
  • Save coldfusion39/d4044debddde14133107 to your computer and use it in GitHub Desktop.
Save coldfusion39/d4044debddde14133107 to your computer and use it in GitHub Desktop.
Import Nessus results into a Postgresql database
#!/usr/bin/env python
# Copyright (c) 2017, Brandan Geise [coldfusion]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import argparse
import re
import xml.etree.ElementTree as ET
from progress.bar import Bar
from sqlalchemy import *
from sqlalchemy.orm import *
def main():
    """Parse a ``.nessus`` report and insert its findings into PostgreSQL.

    Command-line arguments supply the report file, the database credentials,
    the database name, the target table and (optionally) the database host.
    Values given to ``--file`` and ``--password`` are collected as lists and
    concatenated without a separator.

    Progress and errors are printed to stdout. The success message is only
    printed when every finding was inserted.
    """
    # Local import keeps this function self-contained on Python 2 and 3.
    try:
        from urllib.parse import quote_plus  # Python 3
    except ImportError:
        from urllib import quote_plus  # Python 2

    parser = argparse.ArgumentParser(description='Import Nessus results into a Postgresql database')
    parser.add_argument('-f', '--file', help='Nessus report file in ".nessus" format', nargs='+', required=True)
    parser.add_argument('-u', '--username', help='Postgresql username', required=True)
    parser.add_argument('-p', '--password', help='Postgresql password', nargs='+', required=True)
    parser.add_argument('-d', '--database', help='Postgresql database name', required=True)
    parser.add_argument('-t', '--table', help='Postgresql table name', required=True)
    parser.add_argument('-i', '--ip', help='Postgresql database IP address', default='localhost', required=False)
    args = parser.parse_args()

    report_file = ''.join(args.file)

    # Check the real suffix: splitting on the first '.' breaks for paths such
    # as "./scan.nessus" or "a.b.nessus" and raises when there is no dot.
    if not report_file.lower().endswith('.nessus'):
        print('You must provide a Nessus report file in ".nessus" format!')
        return

    print("Parsing {0}...".format(report_file))
    host_reports = parse_vulns(report_file)

    print("Inserting {0} findings into the {1} database...".format(len(host_reports), args.database))
    # Credentials are URL-encoded so characters like '@', ':' or '/' in the
    # username/password do not corrupt the connection URL.
    engine = create_engine("postgresql://{0}:{1}@{2}/{3}".format(
        quote_plus(args.username),
        quote_plus(''.join(args.password)),
        args.ip,
        args.database,
    ))
    metadata = MetaData(engine)
    Session = sessionmaker(bind=engine)
    session = Session()

    # Give the bar the real total so the percentage is meaningful.
    bar = Bar(max=len(host_reports), suffix='%(percent)d%%')
    imported = True
    try:
        for host_report in host_reports:
            status = insert_vulns(args.table, metadata, host_report)
            if status != 1:
                # insert_vulns returns the exception object on failure.
                session.rollback()
                print("Error: {0}".format(status))
                imported = False
                break
            bar.next()
    finally:
        session.close()
        bar.finish()

    if imported:
        print("Successfully imported data from {0}".format(report_file))
def parse_vulns(infile):
    """Flatten a ``.nessus`` report into one dictionary per report item.

    Args:
        infile: Path (or file object) of the Nessus XML report, passed to
            ``ET.parse``.

    Returns:
        list: One dict per ``ReportItem``, keyed by the database column names
        ``ip``, ``host``, ``os``, ``port``, ``vulnerability``, ``severity``,
        ``description``, ``solution``, ``cve`` and ``cvss``. Keys whose source
        value is unavailable are filled or omitted as decided by
        ``transform_if_available``.
    """
    findings = []
    document_root = ET.parse(infile).getroot()

    for host in document_root.findall('./Report/ReportHost'):
        host_properties = get_host_properties(host)

        for report_item in host.findall('./ReportItem'):
            item_properties = get_report_item_properties(report_item)
            item_details = get_report_item_details(report_item)

            # (source dict, key in source, destination column), applied in order.
            field_map = (
                (host_properties, 'host-ip', 'ip'),
                (host_properties, 'netbios-name', 'host'),
                (host_properties, 'operating-system', 'os'),
                (item_properties, 'port', 'port'),
                (item_properties, 'pluginName', 'vulnerability'),
                (item_properties, 'severity', 'severity'),
                (item_details, 'description', 'description'),
                (item_details, 'solution', 'solution'),
                (item_details, 'cve', 'cve'),
                (item_details, 'cvss_base_score', 'cvss'),
            )

            finding = {}
            for source, source_key, column in field_map:
                transform_if_available(source, source_key, finding, column)
            findings.append(finding)

    return findings
def get_host_properties(host):
    """Collect the IP address, host name and operating system of a report host.

    Args:
        host: A ``ReportHost`` XML element.

    Returns:
        dict: Always contains ``'host-ip'`` (the ``host-ip`` tag text, or the
        element's ``name`` attribute when that tag is absent). Contains
        ``'netbios-name'`` (from the ``netbios-name`` tag, else ``host-fqdn``)
        and ``'operating-system'`` (from ``operating-system``, else ``os``)
        only when one of the corresponding tags exists.

    Raises:
        KeyError: If there is no ``host-ip`` tag and the element has no
            ``name`` attribute.
    """
    properties = {}
    # A host without a <HostProperties> child is tolerated: every tag lookup
    # then simply comes back empty.
    host_properties = host.find('./HostProperties')

    def first_tag(*names):
        """Return the first <tag> element matching any of *names*, in order."""
        if host_properties is None:
            return None
        for name in names:
            tag = host_properties.find("./tag[@name='{0}']".format(name))
            # Compare against None explicitly: childless elements are falsy.
            if tag is not None:
                return tag
        return None

    ip_tag = first_tag('host-ip')
    if ip_tag is not None:
        properties['host-ip'] = ip_tag.text
    else:
        properties['host-ip'] = host.attrib['name']

    hostname_tag = first_tag('netbios-name', 'host-fqdn')
    if hostname_tag is not None:
        properties['netbios-name'] = hostname_tag.text

    os_tag = first_tag('operating-system', 'os')
    if os_tag is not None:
        properties['operating-system'] = os_tag.text

    return properties
def get_report_item_properties(reportItem):
    """Return a report item's XML attributes without ``pluginFamily``.

    Args:
        reportItem: A ``ReportItem`` XML element.

    Returns:
        dict: A copy of the element's attributes with the ``pluginFamily``
        entry removed. The element itself is left untouched.
    """
    # Copy first so dropping the key does not alter the parsed XML tree.
    properties = dict(reportItem.attrib)
    properties.pop('pluginFamily', None)
    return properties
def get_report_item_details(reportItem):
    """Extract the text of selected child elements of a report item.

    Args:
        reportItem: A ``ReportItem`` XML element.

    Returns:
        dict: Maps each of ``description``, ``plugin_output``, ``solution``,
        ``see_also``, ``cve`` and ``cvss_base_score`` to the text of the first
        child element with that tag. Tags with no matching child are omitted,
        including ``description``.
    """
    detail_tags = (
        'description',
        'plugin_output',
        'solution',
        'see_also',
        'cve',
        'cvss_base_score',
    )
    details = {}
    for tag in detail_tags:
        element = reportItem.find('./' + tag)
        # Explicit None check: an element without children is falsy.
        if element is not None:
            details[tag] = element.text
    return details
# Runs of two or more whitespace characters, collapsed to a single space.
_WHITESPACE_RUN = re.compile(r'\s\s+')

# Longest text stored verbatim; longer values are cut and marked.
_MAX_FIELD_LENGTH = 32000

# Display labels for the severity codes found in the report.
_SEVERITY_LABELS = {
    '0': 'Informational',
    '1': 'Low',
    '2': 'Medium',
    '3': 'High',
    '4': 'Extreme',
}


def transform_if_available(inputDict, inputKey, outputDict, outputKey):
    """Copy one field from *inputDict* to *outputDict*, normalising its value.

    Rules, by *inputKey*:

    * ``'severity'``: the code is translated to its label.
    * ``'cve'`` missing from *inputDict*: ``'NONE'`` is stored.
    * ``'cvss_base_score'`` missing from *inputDict*: ``'0.0'`` is stored.
    * any other missing key: nothing is stored.
    * otherwise the text has newlines removed, whitespace runs collapsed to
      one space, and is cut to 32000 characters (with a marker appended) when
      longer. A ``None`` value is stored as ``None``.

    Args:
        inputDict: Source mapping; it is not modified.
        inputKey: Key to read from *inputDict*.
        outputDict: Destination mapping, updated in place.
        outputKey: Key to write in *outputDict*.

    Raises:
        KeyError: If *inputKey* is ``'severity'`` and the value is missing
            from *inputDict* or is not a known severity code.
    """
    if inputKey == 'severity':
        outputDict[outputKey] = _SEVERITY_LABELS[inputDict[inputKey]]
        return

    if inputKey not in inputDict:
        if inputKey == 'cve':
            outputDict[outputKey] = 'NONE'
        elif inputKey == 'cvss_base_score':
            outputDict[outputKey] = '0.0'
        return

    value = inputDict[inputKey]
    # Empty XML elements yield None text; pass it through instead of crashing.
    if value is not None:
        value = _WHITESPACE_RUN.sub(' ', value.replace('\n', ''))
        if len(value) > _MAX_FIELD_LENGTH:
            value = "{0} [Text Cut Due To Length]".format(value[:_MAX_FIELD_LENGTH])
    outputDict[outputKey] = value
def insert_vulns(table, metadata, data):
    """Insert one finding into the given table.

    Args:
        table: Name of the destination table; its definition is loaded from
            the database via ``autoload``.
        metadata: SQLAlchemy ``MetaData`` used to look up the table and
            execute the insert.
        data: Mapping with the finding's values, keyed by column name
            (``ip``, ``host``, ``os``, ``port``, ``vulnerability``,
            ``severity``, ``description``, ``solution``, ``cve``, ``cvss``).
            Missing keys are inserted as ``None``.

    Returns:
        ``1`` on success, otherwise the exception that was raised.
    """
    columns = (
        'ip',
        'host',
        'os',
        'port',
        'vulnerability',
        'severity',
        'description',
        'solution',
        'cve',
        'cvss',
    )
    try:
        # Reflection is inside the try so a missing table is reported through
        # the same return-the-error contract as a failed insert.
        vulns = Table(table, metadata, autoload=True)
        # Not every host has a name or OS; use None rather than failing with
        # KeyError so one sparse finding does not abort the import.
        values = dict((column, data.get(column)) for column in columns)
        vulns.insert().execute(**values)
    except Exception as error:
        return error
    return 1
# Run the importer only when this file is executed as a script.
if '__main__' == __name__:
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment