Created
October 11, 2019 08:40
-
-
Save 3lpsy/2c2f4cea8f68e6e032a9d9b879122ee4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
A simple Zap Parser to filter on certain things like port and plugin ID. More filters may be implemented later. | |
''' | |
import sys | |
from pathlib import Path | |
import lxml.etree as et | |
import csv | |
import argparse | |
# Maps the tag name of each child element of a ZAP <alertitem> to the Alert
# attribute that stores its text. The order of the pairs is significant: it
# determines the order of the alert columns returned by Alert.row_keys().
ALERTITEM_XML_KEY_PROP_MAP = dict(
    (
        # identification
        ("pluginid", "plugin_id"),
        ("alert", "alert"),
        ("name", "alert_name"),
        # severity / confidence
        ("riskcode", "risk_code"),
        ("confidence", "confidence"),
        ("riskdesc", "risk_desc"),
        # details
        ("desc", "description"),
        ("instances", "instances"),
        ("count", "count"),
        ("solution", "solution"),
        ("reference", "reference"),
        # external classification ids
        ("cweid", "cwe_id"),
        ("wascid", "wasc_id"),
        ("sourceid", "source_id"),
    )
)
class Alert:
    """One ZAP ``alertitem`` flattened into plain attributes.

    Instances are normally built with :meth:`make`, which copies the text of
    every recognised child element (see ``ALERTITEM_XML_KEY_PROP_MAP``) onto
    the attribute of the mapped name. All constructor arguments are optional;
    missing text fields default to ``""``, ``instances`` to ``[]`` and
    ``count`` to ``0``. ``plugin_id`` is stored as given (``None`` when unset).
    """

    def __init__(
        self,
        plugin_id=None,
        alert_name=None,
        alert=None,
        risk_code=None,
        risk_description=None,
        confidence=None,
        description=None,
        instances=None,
        count=None,
        reference=None,
        solution=None,
        cwe_id=None,
        wasc_id=None,
        source_id=None,
    ):
        self.plugin_id = plugin_id
        self.alert_name = alert_name or ""
        self.alert = alert or ""
        self.risk_code = risk_code or ""
        self.risk_description = risk_description or ""
        self.confidence = confidence or ""
        # Previously accepted but never stored, so "description" was missing
        # on any alert that was not populated through make().
        self.description = description or ""
        self.instances = instances or []
        self.count = count or 0
        self.reference = reference or ""
        self.solution = solution or ""
        self.cwe_id = cwe_id or ""
        self.wasc_id = wasc_id or ""
        self.source_id = source_id or ""

    @property
    def risk_desc(self):
        """Alias of ``risk_description`` under the name used for row keys.

        The XML-to-attribute map writes ``risk_desc`` while the constructor
        takes ``risk_description``; the alias keeps both in sync.
        """
        return self.risk_description

    @risk_desc.setter
    def risk_desc(self, value):
        self.risk_description = value

    @classmethod
    def make(cls, item):
        """Build an Alert from an ``alertitem`` element.

        Args:
            item: an iterable XML element whose children expose ``tag`` and
                ``text``.

        Returns:
            A new Alert. For every child whose tag is a key of
            ``ALERTITEM_XML_KEY_PROP_MAP`` the mapped attribute is set to the
            child's text with newlines replaced by ``"|"``.
        """
        alert = cls()
        # Iterate the element directly; getchildren() is deprecated.
        for child in item:
            prop = ALERTITEM_XML_KEY_PROP_MAP.get(child.tag)
            if prop is None:
                continue
            # Empty elements have text None; treat that as an empty string
            # instead of failing on None.replace().
            text = child.text or ""
            setattr(alert, prop, text.replace("\n", "|"))
        return alert

    @classmethod
    def row_keys(cls):
        """Return the alert column names, in map order."""
        return list(ALERTITEM_XML_KEY_PROP_MAP.values())

    def get_row_data(self, keys):
        """Return ``{key: value}`` for each of ``keys`` this alert has.

        Keys that are not attributes of the alert are silently skipped.
        """
        return {k: getattr(self, k) for k in keys if hasattr(self, k)}
class Asset:
    """A scanned site together with the alerts collected for it."""

    def __init__(self, name, host=None, port=None, ssl=False, alerts=None):
        self.name = name  # site name
        self.host = host  # site host
        self.port = port  # site port
        self.ssl = ssl  # whether the site uses SSL
        self.alerts = alerts or []  # alerts kept for this site

    @classmethod
    def row_keys(cls):
        """Return every available column: asset columns, then alert columns."""
        return [*("name", "host", "port", "ssl"), *Alert.row_keys()]

    def get_row_data(self, keys):
        """Return ``{key: value}`` for each of ``keys`` that is an attribute here.

        Keys this asset does not have are left out of the result.
        """
        return {key: getattr(self, key) for key in keys if hasattr(self, key)}

    def get_rows(self, keys):
        """Return one row dict per alert, restricted to ``keys``.

        Each row starts from the asset's own values and is then overlaid with
        the alert's values, so the alert wins when both provide a key. An
        asset without alerts yields no rows.
        """
        base = self.get_row_data(keys)
        merged_rows = []
        for entry in self.alerts:
            combined = dict(base)
            combined.update(entry.get_row_data(keys))
            merged_rows.append(combined)
        return merged_rows
# TODO: fix this
# "any" cannot currently be applied across both the port filter and the alert-property-specific filters
def should_collect_alert(asset, alert, options):
    """Decide whether ``alert`` passes the alert-level filters in ``options``.

    Args:
        asset: the asset the alert belongs to (not used by the current filter).
        alert: object exposing a ``plugin_id`` attribute.
        options: dict of filter settings; only ``"plugin_id"`` is consulted.

    Returns:
        True when no plugin id filter is set, or when the alert's plugin id
        equals the requested one; False otherwise. The ``"any"`` option does
        not change the outcome because plugin id is the only alert filter.
    """
    wanted = options.get("plugin_id")
    if not wanted:
        # No plugin id filter configured: every alert is collected.
        return True
    if alert.plugin_id is None:
        return False
    # Compare as strings (as the port filter does) so an integer filter value
    # still matches the textual id read from the XML.
    return str(alert.plugin_id) == str(wanted)
def should_collect_host(asset, options):
    """Decide whether ``asset`` passes the port filter in ``options``.

    Args:
        asset: object exposing a ``port`` attribute.
        options: dict of filter settings; only ``"ports"`` is consulted.

    Returns:
        True when no usable port filter is set (missing, empty, or not a
        list/tuple/set), or when the asset's port equals one of the requested
        ports; False otherwise.

    An asset has a single port, so it can never equal several different
    requested ports at once. The filter therefore accepts the asset when its
    port is any of the requested ones, with or without the ``"any"`` option;
    repeating a port in the filter has no effect.
    """
    ports = options.get("ports")
    if not isinstance(ports, (list, tuple, set, frozenset)) or not ports:
        return True
    asset_port = asset.port
    if not asset_port:
        # A filter is set but the asset has no port to compare against.
        return False
    # Compare as strings so "80" and 80 are treated alike.
    return str(asset_port) in {str(p) for p in ports}
def parse(tree, options):
    """Collect the sites and alerts of a parsed ZAP XML report.

    Args:
        tree: parsed lxml element; every ``site`` element beneath it is
            examined.
        options: filter settings passed on to ``should_collect_host`` and
            ``should_collect_alert``.

    Returns:
        A list of Asset objects, one per ``site`` element that has a name and
        passes the host filter. Each asset carries the alerts that passed the
        alert filter; an asset is returned even when none of its alerts did.
    """
    assets = []
    for site in tree.xpath("//site"):
        name = site.attrib.get("name")
        if not name:
            # Diagnostics go to stderr so they cannot corrupt CSV written to
            # stdout by the caller.
            print("Error with", site, file=sys.stderr)
            continue
        asset = Asset(
            name,
            host=site.attrib.get("host", ""),
            port=site.attrib.get("port", ""),
            ssl=site.attrib.get("ssl", "false").lower() == "true",
        )
        if not should_collect_host(asset, options):
            continue
        # Direct <alerts> children of the site, then their direct <alertitem>
        # children, in document order (replaces the deprecated getchildren()).
        for item in site.iterfind("alerts/alertitem"):
            alert = Alert.make(item)
            if should_collect_alert(asset, alert, options):
                asset.alerts.append(alert)
        assets.append(asset)
    return assets
def main(argv=None):
    """Run the command-line interface.

    Parses the arguments, reads the ZAP XML report, applies the filters and
    writes the matching alerts to stdout as CSV.

    Args:
        argv: argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 when at least one asset was found, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Filter an OWASP ZAP XML report and print the matching alerts as CSV."
    )
    parser.add_argument("xml", type=str, help="the zap xml to parse")
    parser.add_argument(
        "-p", "--port", action="append", help="filter for services that have port open"
    )
    parser.add_argument("--plugin-id", action="store", help="plugin id to filter on")
    parser.add_argument(
        "--no-csv-header", action="store_true", help="do not include csv header"
    )
    parser.add_argument(
        "--prop",
        action="append",
        help="properties to include",
        choices=Asset.row_keys(),
    )
    parser.add_argument(
        "-a",
        "--any-filter",
        action="store_true",
        help="passes any filter. otherwise all filters must apply",
    )
    args = parser.parse_args(argv)

    # The whole report is loaded into memory; this will take a long time for
    # large files (chunked/incremental parsing would be a possible
    # optimisation). Bytes are handed to the parser so it can honour the
    # document's own encoding declaration; lxml rejects str input that
    # carries one.
    content = Path(args.xml).read_bytes()
    tree = et.fromstring(content)

    options = {
        "any": bool(args.any_filter),
        "ports": args.port,
        "plugin_id": args.plugin_id,
    }
    assets = parse(tree, options)
    if not assets:
        # Keep stdout reserved for CSV output.
        print("No assets found!", file=sys.stderr)
        return 1

    keys = args.prop or Asset.row_keys()
    writer = csv.DictWriter(sys.stdout, fieldnames=keys)
    if not args.no_csv_header:
        writer.writeheader()
    rows = [row for asset in assets for row in asset.get_rows(keys)]
    writer.writerows(rows)
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment