Skip to content

Instantly share code, notes, and snippets.

@3lpsy
Created October 11, 2019 08:40
Show Gist options
  • Save 3lpsy/2c2f4cea8f68e6e032a9d9b879122ee4 to your computer and use it in GitHub Desktop.
Save 3lpsy/2c2f4cea8f68e6e032a9d9b879122ee4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
A simple ZAP XML report parser that filters on things like port and plugin ID and prints the matching alerts as CSV. More filters may be implemented later.
'''
import sys
from pathlib import Path
import lxml.etree as et
import csv
import argparse
# Maps the tag of each child element of an <alertitem> to the name of the
# Alert attribute (and CSV column) that receives its text. Insertion order
# matters: it defines the default column order of the alert part of a row.
ALERTITEM_XML_KEY_PROP_MAP = dict(
    pluginid="plugin_id",
    alert="alert",
    name="alert_name",
    riskcode="risk_code",
    confidence="confidence",
    riskdesc="risk_desc",
    desc="description",
    instances="instances",
    count="count",
    solution="solution",
    reference="reference",
    cweid="cwe_id",
    wascid="wasc_id",
    sourceid="source_id",
)
class Alert:
    """One ``alertitem`` of a ZAP report, with its child elements as attributes.

    Attribute names are the values of ``ALERTITEM_XML_KEY_PROP_MAP``; the same
    names are used as CSV column keys by ``row_keys`` / ``get_row_data``.
    """

    def __init__(
        self,
        plugin_id=None,
        alert_name=None,
        alert=None,
        risk_code=None,
        risk_description=None,
        confidence=None,
        description=None,
        instances=None,
        count=None,
        reference=None,
        solution=None,
        cwe_id=None,
        wasc_id=None,
        source_id=None,
    ):
        """Store every field, replacing falsy values by an empty default.

        ``plugin_id`` is stored as given (it may stay ``None``); ``instances``
        defaults to a new list, ``count`` to 0 and every other field to "".
        """
        self.plugin_id = plugin_id
        self.alert_name = alert_name or ""
        self.alert = alert or ""
        self.risk_code = risk_code or ""
        self.risk_description = risk_description or ""
        self.confidence = confidence or ""
        # Previously accepted but never stored, so the "description" column
        # was missing for alerts not populated through make().
        self.description = description or ""
        self.instances = instances or []
        self.count = count or 0
        self.reference = reference or ""
        self.solution = solution or ""
        self.cwe_id = cwe_id or ""
        self.wasc_id = wasc_id or ""
        self.source_id = source_id or ""

    @property
    def risk_desc(self):
        """Alias of ``risk_description`` under the name used as row key.

        The XML mapping and the CSV columns use ``risk_desc`` while the
        constructor uses ``risk_description``; this keeps both in sync.
        """
        return self.risk_description

    @risk_desc.setter
    def risk_desc(self, value):
        self.risk_description = value

    @classmethod
    def make(cls, item):
        """Build an Alert from an ``alertitem`` element.

        Each child whose tag is a key of ``ALERTITEM_XML_KEY_PROP_MAP`` has
        its text copied to the mapped attribute, with newlines replaced by
        "|". Children with other tags are ignored.
        """
        alert = cls()
        # Iterate the element directly: getchildren() is deprecated.
        for child in item:
            prop = ALERTITEM_XML_KEY_PROP_MAP.get(child.tag)
            if prop is None:
                continue
            # An empty element has text None; treat it as an empty string
            # instead of failing on None.replace().
            text = child.text or ""
            setattr(alert, prop, text.replace("\n", "|"))
        return alert

    @classmethod
    def row_keys(cls):
        """Return the alert column names, in mapping order."""
        return list(ALERTITEM_XML_KEY_PROP_MAP.values())

    def get_row_data(self, keys):
        """Return ``{key: value}`` for every key in ``keys`` this alert has.

        Keys that are not attributes of the alert are silently left out.
        """
        return {key: getattr(self, key) for key in keys if hasattr(self, key)}
class Asset:
    """A scanned site together with the alerts collected for it."""

    def __init__(self, name, host=None, port=None, ssl=False, alerts=None):
        self.name = name  # site name
        self.host = host  # site host
        self.port = port  # site port
        self.ssl = ssl  # whether the site uses SSL
        self.alerts = alerts or []  # falsy input is replaced by a new list

    @classmethod
    def row_keys(cls):
        """Return every available column: asset columns, then alert columns."""
        asset_columns = ["name", "host", "port", "ssl"]
        return asset_columns + Alert.row_keys()

    def get_row_data(self, keys):
        """Return ``{key: value}`` for each key in ``keys`` that is an attribute here."""
        return {key: getattr(self, key) for key in keys if hasattr(self, key)}

    def get_rows(self, keys):
        """Return one dict per alert, merging asset fields with alert fields.

        On a key present in both, the alert's value wins. An asset without
        alerts yields an empty list.
        """
        site_fields = self.get_row_data(keys)
        return [
            {**site_fields, **finding.get_row_data(keys)} for finding in self.alerts
        ]
# TODO: fix this
# can't do "any" for both port and alert-property-specific filters
def should_collect_alert(asset, alert, options):
    """Tell whether ``alert`` passes the plugin-ID filter in ``options``.

    Returns True when ``options`` has no (truthy) "plugin_id" entry or when
    ``alert.plugin_id`` equals it, and False otherwise. ``asset`` is not
    consulted. The "any" option does not change the outcome: the plugin ID
    is the only alert-level filter, so a match passes and a mismatch fails
    in either mode.
    """
    wanted_plugin_id = options["plugin_id"] if "plugin_id" in options else None
    if not wanted_plugin_id:
        # No plugin filter requested: keep every alert.
        return True
    if alert.plugin_id == wanted_plugin_id:
        return True
    return False
def should_collect_host(asset, options):
    """Tell whether ``asset`` passes the port filter in ``options``.

    ``options["ports"]`` is the list of requested ports. When it is missing,
    not a list, or empty, no port filter applies and the asset passes.
    Otherwise the asset passes when its port (compared as a string) is one
    of the requested ports.

    An asset has a single port, so the filter is a membership test. The
    previous implementation required every requested port to match that one
    port unless "any" was set, which rejected every host as soon as two
    ports (or the same port twice) were requested.
    """
    ports = options.get("ports")
    if not isinstance(ports, list) or not ports:
        return True
    if not asset.port:
        # No port recorded for the asset: it cannot match a port filter.
        return False
    return str(asset.port) in {str(p) for p in ports}
def parse(tree, options):
    """Collect the sites of a parsed ZAP report and their filtered alerts.

    Every ``site`` element found under ``tree`` becomes an Asset, provided it
    has a ``name`` attribute and passes ``should_collect_host``. Each
    ``alerts/alertitem`` child of the site becomes an Alert and is attached
    to the asset when ``should_collect_alert`` accepts it.

    Args:
        tree: parsed XML element supporting ``xpath`` (lxml).
        options: filter options passed through to the ``should_collect_*``
            helpers.

    Returns:
        The list of collected Asset objects; an asset that passed the host
        filter is returned even if none of its alerts were kept.
    """
    assets = []
    for site in tree.xpath("//site"):
        name = site.attrib.get("name")
        if not name:
            # Diagnostics go to stderr: stdout carries the CSV output.
            print("Error with", site, file=sys.stderr)
            continue
        # One Asset per <site> element; repeated sites are not merged.
        asset = Asset(
            name,
            host=site.attrib.get("host", ""),
            port=site.attrib.get("port", ""),
            ssl=site.attrib.get("ssl", "false").lower() == "true",
        )
        if not should_collect_host(asset, options):
            continue
        # Iterate elements directly; getchildren() is deprecated.
        for alerts_elem in site:
            if alerts_elem.tag != "alerts":
                continue
            for item in alerts_elem:
                if item.tag != "alertitem":
                    continue
                alert = Alert.make(item)
                if should_collect_alert(asset, alert, options):
                    asset.alerts.append(alert)
        assets.append(asset)
    return assets
def main(argv=None):
    """Parse the command line, filter the ZAP report and write CSV to stdout.

    Args:
        argv: argument list to parse; ``None`` makes argparse use
            ``sys.argv[1:]``.

    Returns:
        0 when at least one asset was collected (CSV written), 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Filter a ZAP XML report by port and plugin ID and print the matching alerts as CSV."
    )
    parser.add_argument("xml", type=str, help="the zap xml to parse")
    parser.add_argument(
        "-p", "--port", action="append", help="filter for services that have port open"
    )
    parser.add_argument("--plugin-id", action="store", help="plugin id to filter on")
    parser.add_argument(
        "--no-csv-header", action="store_true", help="do not include csv header"
    )
    parser.add_argument(
        "--prop",
        action="append",
        help="properties to include",
        choices=Asset.row_keys(),
    )
    parser.add_argument(
        "-a",
        "--any-filter",
        action="store_true",
        help="passes any filter. otherwise all filters must apply",
    )
    args = parser.parse_args(argv)

    # this will take a long time for large files
    # possible to optimize this with chunking
    # Read raw bytes so the XML parser handles the encoding itself: lxml
    # rejects str input that carries an XML encoding declaration, and
    # read_text() would decode with the locale's encoding.
    content = Path(args.xml).read_bytes()
    tree = et.fromstring(content)

    options = {
        "any": bool(args.any_filter),
        "ports": args.port,
        "plugin_id": args.plugin_id,
    }
    assets = parse(tree, options)
    if not assets:
        print("No assets found!")
        return 1

    keys = args.prop or Asset.row_keys()
    writer = csv.DictWriter(sys.stdout, fieldnames=keys)
    if not args.no_csv_header:
        writer.writeheader()
    # One CSV row per (asset, alert) pair; assets without alerts add no rows.
    for asset in assets:
        writer.writerows(asset.get_rows(keys))
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment