|
"""Tool to replace vulnerability dashboard in Wazuh. |
|
|
|
Connects to Wazuh manager API, retrieves vulnerability data, and displays it. |
|
Ensure server socket and credentials under Configuration are correct! |
|
Output can be filtered by using the commandline switches: |
|
-n , --name : Only return data for specified package |
|
-s , --severity : Only return vulnerabilities of that severity |
|
-c , --cve : Only return vulnerabilities with that CVE |
|
-g , --group : Only return vulnerabilities for group members |
|
--short : Only display summary of vulnerabilities |
|
Use caution when supplying arguments as no sanitizing is done! |
|
|
|
Referenced from: |
|
https://documentation.wazuh.com/current/user-manual/api/index.html |
|
Written in Python 3.10 using Visual Studio Code. |
|
Stephen Vose 2022 |
|
""" |
|
|
|
|
|
def get_args() -> tuple[str, str, bool]: |
|
"""Take arguments from command line for filtering output.""" |
|
import argparse |
|
parser = argparse.ArgumentParser( |
|
usage='vuln_dash.py [-h] [-n, --name NAME] [-s, --severity SEVERITY]\n\ |
|
[-c, --cve CVE] [-g, --group GROUP] [--short]') |
|
parser.add_argument('-n', '--name', |
|
default="", |
|
dest='name_arg', |
|
help='Filter output by package name', |
|
type=str |
|
) |
|
parser.add_argument('-s', '--severity', |
|
default="", |
|
dest='sev_arg', |
|
help='Filter output by severity', |
|
type=str |
|
) |
|
parser.add_argument('-c', '--cve', |
|
default="", |
|
dest='cve_arg', |
|
help='Filter output by CVE', |
|
type=str |
|
) |
|
parser.add_argument('-g', '--group', |
|
default="", |
|
dest='group_arg', |
|
help='Filter agents by group', |
|
type=str |
|
) |
|
parser.add_argument('--short', |
|
action='store_true', |
|
help='Return only summary of agent vulnerabilities', |
|
) |
|
args = parser.parse_args() |
|
# Format string for appending to API request |
|
if args.name_arg != "": |
|
arg_name = str('&name=' + args.name_arg) |
|
else: |
|
arg_name = "" |
|
if args.sev_arg != "": |
|
arg_sev = str('&severity=' + args.sev_arg) |
|
else: |
|
arg_sev = "" |
|
if args.cve_arg != "": |
|
arg_cve = str('&cve=' + args.cve_arg) |
|
else: |
|
arg_cve = "" |
|
if args.group_arg != "": |
|
arg_group = str('&group=' + args.group_arg) |
|
else: |
|
arg_group = "" |
|
arg_string = arg_name + arg_sev + arg_cve |
|
arg_short = bool(args.short) |
|
return arg_string, arg_group, arg_short |
|
|
|
|
|
def _describe_http_error(response) -> str:
    """Return a one-line description of a failed HTTP response.

    Uses the ``title`` and ``detail`` fields of a JSON error body when both
    are present; otherwise falls back to the status code and reason phrase,
    so that an unexpected body cannot raise a second exception while the
    first one is being reported.
    """
    try:
        body = response.json()
        return f"{body['title']}, {body['detail']}"
    except (ValueError, KeyError, TypeError):
        return f"HTTP {response.status_code} {response.reason}"


def api_calls(filters: str, group: str) -> tuple[list, list, dict]:
    """Log in to the server API and download agent and vulnerability data.

    Three kinds of request are made over one reused connection: a login
    that exchanges the configured credentials for a JWT token, a listing of
    active agents, and one vulnerability query per agent.

    Args:
        filters: Query-string fragment (``&key=value...``) appended to each
            vulnerability request; may be empty.
        group: Query-string fragment (``&group=...``) appended to the agent
            listing request; may be empty.

    Returns:
        A tuple ``(active_agents, agent_names, vulns)``: the agent IDs, the
        agent names in the same order, and a dict mapping each agent ID to
        the decoded JSON body of its vulnerability query.

    Raises:
        SystemExit: With an explanatory message (non-zero exit status) when
            ``requests``/``urllib3`` are not installed, when the server
            answers with an HTTP error status, or when the connection fails
            or times out.
    """
    import sys
    from base64 import b64encode

    try:
        import requests
        import urllib3
    except ModuleNotFoundError:
        sys.exit('Error: ensure modules "urllib3" and "requests" are installed')

    # Disable insecure https warnings (for self-signed SSL certificates)
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Configuration
    protocol = 'https'
    host = 'X.X.X.X'  # Supply server address
    port = 55000
    user = 'XXXXX'  # Supply API credentials
    password = 'XXXXX'
    login_endpoint = 'security/user/authenticate'
    # (connect, read) timeouts in seconds; without them a stalled server
    # would block the tool indefinitely.
    timeout = (10, 120)

    base_url = f"{protocol}://{host}:{port}"
    basic_auth = f"{user}:{password}".encode()
    login_headers = {'Content-Type': 'application/json',
                     'Authorization': f'Basic {b64encode(basic_auth).decode()}'
                     }

    try:
        # One session keeps the TCP/TLS connection alive across all requests.
        with requests.Session() as session:

            def get_json(url: str, headers: dict) -> dict:
                """GET *url* and return its decoded JSON body."""
                # verify=False is passed per request: the server is expected
                # to use a self-signed certificate (see warning suppression).
                reply = session.get(url, headers=headers,
                                    verify=False, timeout=timeout)
                reply.raise_for_status()
                return reply.json()

            # API call with login to get auth token
            token = get_json(f"{base_url}/{login_endpoint}",
                             login_headers)['data']['token']

            # New authorization header with the JWT token we got
            requests_headers = {'Content-Type': 'application/json',
                                'Authorization': f'Bearer {token}'}

            # Retrieve list of active agents from the server for querying
            # vulns, filter output by group if specified.
            # NOTE(review): no `limit` is sent here, so the server's default
            # page size applies - confirm it covers the whole fleet.
            agents = get_json(
                f"{base_url}/agents/?status=active&select=id,name{group}",
                requests_headers)['data']['affected_items']
            active_agents = [agent['id'] for agent in agents]
            agent_names = [agent['name'] for agent in agents]

            # Retrieve list of vulnerabilities for each active agent,
            # filtering with any supplied arguments; format {&argtype=[arg]}.
            # The URL is assembled from adjacent literals so that no source
            # indentation can leak into the query string.
            vulns = {
                agent_id: get_json(
                    f"{base_url}/vulnerability/{agent_id}?limit=10000"
                    f"&select=cve,name,severity,condition{filters}",
                    requests_headers)
                for agent_id in active_agents
            }

    # Catch problems talking to server
    except requests.exceptions.HTTPError as err:
        sys.exit(f"An error occurred: {_describe_http_error(err.response)}")
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as err:
        sys.exit(f"Connection error: {err}")

    return active_agents, agent_names, vulns
|
|
|
|
|
class ProcessInfo:
    """Tally how often each value of one vulnerability field occurs.

    The field is chosen with ``type_filter`` (the code is used with
    ``'severity'``, ``'name'`` and ``'cve'``). The tally is computed once at
    construction time and kept in ``item_tuple`` as a list of
    ``(count, value)`` pairs sorted in descending order.
    """

    def __init__(self, agent_list: list, vuln_dict: dict, type_filter: str):
        """Store the field name and compile the tally immediately.

        Args:
            agent_list: Agent IDs to include; each must be a key of
                ``vuln_dict``.
            vuln_dict: Per-agent data, read as
                ``vuln_dict[agent]['data']['affected_items']``.
            type_filter: Key of the vulnerability entry to tally.
        """
        self.type_filter = type_filter
        self.item_tuple = self.process_info(agent_list, vuln_dict)

    def process_info(self, agent_list, vuln_dict) -> list:
        """Count the unique values of the selected field across all agents.

        For ``type_filter == 'name'`` the entry's ``condition`` is appended
        to the package name (``"<name>, <condition>"``), so the same package
        under different conditions is counted separately.

        Returns:
            ``(count, value)`` tuples sorted descending by count, ties
            broken by value in descending order.

        Raises:
            SystemExit: After printing ``No results found.`` when no entry
                was found for any agent.
        """
        import sys
        from collections import Counter

        # Counter tallies in a single pass; counting each unique value with
        # list.count() would rescan the whole list once per value.
        tally = Counter()
        for agent in agent_list:
            for vuln in vuln_dict[agent]['data']['affected_items']:
                if self.type_filter == 'name':
                    # Append vulnerability condition to name results.
                    tally[f"{vuln['name']}, {vuln['condition']}"] += 1
                else:
                    tally[vuln[self.type_filter]] += 1

        # End program early if no matches detected
        if not tally:
            print('No results found.')
            sys.exit()

        return sorted(((count, item) for item, count in tally.items()),
                      reverse=True)

    def print_results(self, arg_short: bool) -> None:
        """Print the tally as tab-separated ``count<TAB>value`` lines.

        Args:
            arg_short: When true, only the first 25 rows are printed under a
                "Top results" heading; otherwise every row is printed.
        """
        if arg_short:
            heading, rows = 'Top', self.item_tuple[:25]
        else:
            heading, rows = 'List of', self.item_tuple

        print(f'\n{heading} results by {self.type_filter}:\n')
        for row in rows:
            print(*row, sep='\t')
        # NOTE(review): the original indentation was lost; the total is
        # assumed to be printed in both modes - confirm against the source.
        print('\nTotal number detected:', len(self.item_tuple))
|
|
|
|
|
def agent_counts(agent_list: list, names_list: list, vuln_dict: dict) -> None:
    """Print a tab-separated severity breakdown with one row per agent.

    Each row holds the agent ID, the number of its entries whose severity is
    Critical, High, Medium and Low, the ``total_affected_items`` figure
    reported for that agent, and the agent name.

    Args:
        agent_list: Agent IDs; each must be a key of ``vuln_dict``.
        names_list: Agent names, paired with ``agent_list`` by position.
        vuln_dict: Per-agent data, read as ``vuln_dict[id]['data']``.
    """
    levels = ('Critical', 'High', 'Medium', 'Low')
    print('Agent\tCrit\tHigh\tMedium\tLow\tTotal\tName')

    for agent_id, agent_name in zip(agent_list, names_list):
        payload = vuln_dict[agent_id]['data']
        severities = [entry['severity'] for entry in payload['affected_items']]

        # Column order must match the header printed above.
        row = [agent_id]
        row.extend(severities.count(level) for level in levels)
        row.append(payload['total_affected_items'])
        row.append(agent_name)
        print(*row, sep='\t')
|
|
|
|
|
def main():
    """Run the dashboard: parse switches, fetch data, print the reports.

    Prints a per-agent overview first (a severity breakdown, or with
    ``--short`` just the totals of agents that have at least one
    vulnerability), followed by the severity, package-name and CVE tallies.
    """
    # Interpret command-line switches
    filters, group_filter, short_mode = get_args()

    # Retrieve agent and vulnerability data from the server API
    agent_ids, agent_names, vulns_by_agent = api_calls(filters, group_filter)

    # One tally object per kind of data, built before anything is printed
    reports = [ProcessInfo(agent_ids, vulns_by_agent, field)
               for field in ('severity', 'name', 'cve')]

    # Output data by type
    print('\nList of agent IDs and number of vulnerabilities on each:\n')
    if short_mode:
        # Reduced results for '--short': skip agents without findings
        for agent_id, agent_name in zip(agent_ids, agent_names):
            total = vulns_by_agent[agent_id]['data']['total_affected_items']
            if total == 0:
                continue
            print(f'{agent_id}\t{total}\t{agent_name}')
    else:
        agent_counts(agent_ids, agent_names, vulns_by_agent)

    for report in reports:
        report.print_results(short_mode)
|
|
|
|
|
# Run the dashboard only when this file is executed as a script; importing
# it as a module must not trigger any network access.
if __name__ == "__main__":
    main()