Skip to content

Instantly share code, notes, and snippets.

@anubhavg-icpl
Created September 3, 2024 05:51
Show Gist options
  • Save anubhavg-icpl/86f1f4cb9dcffeb8e6f669dceba6ec90 to your computer and use it in GitHub Desktop.
Save anubhavg-icpl/86f1f4cb9dcffeb8e6f669dceba6ec90 to your computer and use it in GitHub Desktop.
"""Tool to replace vulnerability dashboard in Wazuh.
Connects to Wazuh manager API, retrieves vulnerability data, and displays it.
Ensure server socket and credentials under Configuration are correct!
Output can be filtered by using the commandline switches:
-n , --name : Only return data for specified package
-s , --severity : Only return vulnerabilties of that severity
-c , --cve : Only return vulnerabilities with that CVE
-g , --group : Only return vulnerabilities for group members
--short : Only display summary of vulnerabilties
Use caution when supplying arguments as no sanitizing is done!
Referenced from:
https://documentation.wazuh.com/current/user-manual/api/index.html
Written in Python 3.10 using Visual Studio Code.
Stephen Vose 2022
"""
def get_args() -> tuple[str, str, bool]:
"""Take arguments from command line for filtering output."""
import argparse
parser = argparse.ArgumentParser(
usage='vuln_dash.py [-h] [-n, --name NAME] [-s, --severity SEVERITY]\n\
[-c, --cve CVE] [-g, --group GROUP] [--short]')
parser.add_argument('-n', '--name',
default="",
dest='name_arg',
help='Filter output by package name',
type=str
)
parser.add_argument('-s', '--severity',
default="",
dest='sev_arg',
help='Filter output by severity',
type=str
)
parser.add_argument('-c', '--cve',
default="",
dest='cve_arg',
help='Filter output by CVE',
type=str
)
parser.add_argument('-g', '--group',
default="",
dest='group_arg',
help='Filter agents by group',
type=str
)
parser.add_argument('--short',
action='store_true',
help='Return only summary of agent vulnerabilities',
)
args = parser.parse_args()
# Format string for appending to API request
if args.name_arg != "":
arg_name = str('&name=' + args.name_arg)
else:
arg_name = ""
if args.sev_arg != "":
arg_sev = str('&severity=' + args.sev_arg)
else:
arg_sev = ""
if args.cve_arg != "":
arg_cve = str('&cve=' + args.cve_arg)
else:
arg_cve = ""
if args.group_arg != "":
arg_group = str('&group=' + args.group_arg)
else:
arg_group = ""
arg_string = arg_name + arg_sev + arg_cve
arg_short = bool(args.short)
return arg_string, arg_group, arg_short
def api_calls(filters: str, group: str) -> tuple[list, list, dict]:
""" Set up and make API calls to server for retrieval of information."""
import json
from base64 import b64encode
try:
import requests
import urllib3
except ModuleNotFoundError:
print('Error: ensure modules "urllib3" and "requests" are installed')
exit()
# Disable insecure https warnings (for self-signed SSL certificates)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Configuration
protocol = 'https'
host = 'X.X.X.X' # Supply server address
port = 55000
user = 'XXXXX' # Supply API credentials
password = 'XXXXX'
login_endpoint = 'security/user/authenticate'
login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
basic_auth = f"{user}:{password}".encode()
login_headers = {'Content-Type': 'application/json',
'Authorization': f'Basic {b64encode(basic_auth).decode()}'
}
# API call with login to get auth token
try:
response = requests.get(login_url, headers=login_headers, verify=False)
response.raise_for_status()
token = json.loads(response.content.decode())['data']['token']
# New authorization header with the JWT token we got
requests_headers = {'Content-Type': 'application/json',
'Authorization': f'Bearer {token}'}
# Retrieve list of active agents from the server for querying vulns,
# filter output by group if specified.
login_endpoint = f"agents/?status=active&select=id,name{group}"
login_url = f"{protocol}://{host}:{port}/{login_endpoint}"
response = requests.get(login_url,
headers=requests_headers, verify=False)
response.raise_for_status()
jlist = json.loads(response.content.decode())
active_agents = []
agent_names = []
for i in jlist['data']['affected_items']:
active_agents.append(i['id'])
agent_names.append(i['name'])
# Retrieve list of vulnerabilities for each active agent, filtering
# with any supplied arguments; format {&argtype=[arg]}
vulns = {}
for k in active_agents:
response = requests.get(
f"{protocol}://{host}:{port}/vulnerability/{k}?limit=10000\
&select=cve,name,severity,condition{filters}",
headers=requests_headers, verify=False
)
response.raise_for_status()
vulns[k] = json.loads(response.content.decode())
# Catch problems talking to server
except requests.exceptions.HTTPError:
print("An error occurred: %s, %s" % (
json.loads(response.content)['title'],
json.loads(response.content)['detail']))
quit()
except requests.exceptions.ConnectionError as e:
print("Connection error:", str(e))
quit()
return active_agents, agent_names, vulns
class ProcessInfo():
"""Class to extract and compile a specific type of data from bulk
vulnerability information.
"""
# Create class-specific variables.
def __init__(self, agent_list: list, vuln_dict: dict, type_filter: str):
self.type_filter = type_filter
self.item_tuple = self.process_info(agent_list, vuln_dict)
def process_info(self, agent_list, vuln_dict) -> list:
"""Process vulnerability metadata from each
agent and compile a list of unique entries, with counts.
"""
item_compendium = []
item_occurrence = []
for agent in agent_list:
for vuln in vuln_dict[agent]['data']['affected_items']:
# Append vulnerability condition to name results.
if self.type_filter == 'name':
item_compendium.append(vuln[self.type_filter] +
', ' + vuln['condition'])
else:
item_compendium.append(vuln[self.type_filter])
item_header = set(item_compendium)
for item in item_header:
item_occurrence.append(item_compendium.count(item))
item_tuple = tuple(zip(item_occurrence, item_header))
# End program early if no matches detected
if len(item_tuple) == 0:
print('No results found.')
exit()
return sorted(item_tuple, reverse=True)
def print_results(self, arg_short: bool) -> None:
"""Format and display compiled information."""
if not arg_short:
print(f'\nList of results by {self.type_filter}:\n')
for s in self.item_tuple:
print(*s, sep='\t')
else:
print(f'\nTop results by {self.type_filter}:\n')
for s in self.item_tuple[:25:]:
print(*s, sep='\t')
print('\nTotal number detected:', len(self.item_tuple))
def agent_counts(agent_list: list, names_list: list, vuln_dict: dict) -> None:
"""Shows results per agent, separated by severity."""
print('Agent\tCrit\tHigh\tMedium\tLow\tTotal\tName')
for a, n in zip(agent_list, names_list):
sev_comp = []
sev_list = []
for vuln in vuln_dict[a]['data']['affected_items']:
sev_comp.append(vuln['severity'])
sev_list.append(a)
sev_list.append(sev_comp.count('Critical'))
sev_list.append(sev_comp.count('High'))
sev_list.append(sev_comp.count('Medium'))
sev_list.append(sev_comp.count('Low'))
sev_list.append(vuln_dict[a]['data']['total_affected_items'])
sev_list.append(n)
print(*sev_list, sep='\t')
def main():
# Interpret command-line switches
arg_string, arg_group, arg_short = get_args()
# Function to retrieve and process data from server API
list_of_agents, names, dict_of_vulns = api_calls(arg_string, arg_group)
# Build classes to store each type of data separately
sev_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'severity')
name_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'name')
cve_occurrence = ProcessInfo(list_of_agents, dict_of_vulns, 'cve')
# Output data by type
print('\nList of agent IDs and number of vulnerabilities on each:\n')
if not arg_short:
agent_counts(list_of_agents, names, dict_of_vulns)
else: # Print reduced results for '--short'
for a, n in zip(list_of_agents, names):
num = dict_of_vulns[a]['data']['total_affected_items']
if num != 0:
print(f'{a}\t{num}\t{n}')
for result in [sev_occurrence, name_occurrence, cve_occurrence]:
result.print_results(arg_short)
if __name__ == "__main__":
main()

To use this vulnerability dashboard tool for Wazuh, follow these steps:

  1. Ensure you have Python 3.10 installed on your system.

  2. Install the required dependencies:

    • requests
    • urllib3

    You can install these using pip:

    pip install requests urllib3
    
  3. Set up the configuration:

    • Replace X.X.X.X with your Wazuh manager's IP address
    • Replace XXXXX with your API username and password in the respective fields
  4. Save the script as vuln_dash.py (or any preferred name).

  5. Run the script from the command line. You can use various options to filter and customize the output:

    • Basic usage:

      python vuln_dash.py
      
    • Filter by package name:

      python vuln_dash.py -n packagename
      
    • Filter by severity:

      python vuln_dash.py -s Critical
      
    • Filter by CVE:

      python vuln_dash.py -c CVE-2023-12345
      
    • Filter by agent group:

      python vuln_dash.py -g groupname
      
    • Display only a summary:

      python vuln_dash.py --short
      
    • Combine multiple filters:

      python vuln_dash.py -n packagename -s High -g groupname --short
      
  6. The script will connect to your Wazuh manager, retrieve vulnerability data, and display the results based on your specified filters.

Remember to use caution when supplying arguments, as the script doesn't sanitize inputs. Also, ensure you have the necessary permissions to access the Wazuh API.

The output will include:

  • A list of agent IDs and the number of vulnerabilities on each
  • Vulnerability occurrences grouped by severity
  • Vulnerability occurrences grouped by package name
  • Vulnerability occurrences grouped by CVE

This tool can be very useful for quickly assessing the vulnerability status across your Wazuh-monitored environment, especially when you need to focus on specific types of vulnerabilities or particular agent groups.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment