Created
September 23, 2020 04:49
-
-
Save illuzian/ae8dc676e22e04d315a54b51413917bc to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
import requests | |
import re | |
import gzip | |
import json | |
from bs4 import BeautifulSoup | |
from io import BytesIO | |
from datetime import datetime | |
# NVD feed URL.
nvd_data_url = 'https://nvd.nist.gov/vuln/data-feeds'
# Request headers; only the user-agent string is defined.
headers = {'user-agent': 'Python 3'}
# Get the HTML body. A timeout stops the script hanging forever on a stalled
# connection, and raise_for_status surfaces HTTP errors instead of parsing an
# error page as if it were the feed listing.
nvd_request = requests.get(nvd_data_url, headers=headers, timeout=30)
nvd_request.raise_for_status()
# Convert to a BeautifulSoup object.
soup = BeautifulSoup(nvd_request.text, 'html.parser')
# Select tables with the data-testid property.
data_feeds = soup.select('table[data-testid]')
# Find the table whose header mentions JSON — that one lists the JSON feeds.
feed = None
for data_feed in data_feeds:
    # Grab the table head and join its strings so a substring test works.
    thead = data_feed.thead
    search_in = "".join(thead.strings)
    if 'JSON' in search_in:
        feed = data_feed
        break
# Fail fast with a clear message if the page layout changed; the original
# left `feed` undefined here, causing a confusing NameError further down.
if feed is None:
    raise RuntimeError('JSON feed table not found at ' + nvd_data_url)
# Download the gzip feed files for use.
gz_files = []
# The first two rows are meta/empty, so splice them out.
for table_row in feed.find_all("tr")[2:]:
    # We only want the URL, which lives in the second-to-last (download) column.
    cells = table_row.find_all("td")
    # Guard against rows too short to carry a download column.
    if len(cells) < 2:
        continue
    link = cells[-2].a
    # Rows without an anchor have nothing to download; skip them explicitly
    # instead of swallowing AttributeError with a broad try/except.
    if link is None:
        continue
    # We only want the gzip files, so check the link's type string.
    if link.string != "GZ":
        continue
    url = link.get('href')
    if url:
        # Download the archive into memory and keep it for decompression.
        gz_files.append(BytesIO(requests.get(url, headers=headers, timeout=30).content))
# Gunzip each downloaded feed and parse its JSON payload into a dict.
# A list is used (rather than dict.update) so no valid data gets overwritten.
cve_json_files = []
# Walk the archives in reverse download order, as the original did.
for archive in reversed(gz_files):
    decompressed = gzip.decompress(archive.getvalue())
    cve_json_files.append(json.loads(decompressed))
# Build cve_data: CVE id -> summary dict, de-duplicated across feeds.
cve_data = {}
references_delimiter = ","
for cves in cve_json_files:
    for cve_item in cves['CVE_Items']:
        # Skip entries that carry no CVE block.
        if 'cve' not in cve_item:
            continue
        # Defaults so every output field is defined even when the feed omits it.
        last_modified = None
        vulnerable = "NOT DEFINED"
        confirmed = False
        references = []
        # Scan cpe_match entries. Precedence: once TRUE is seen it sticks;
        # FALSE overrides only NOT DEFINED. .get() chains replace the deep
        # key-existence nesting of the original.
        for node in cve_item.get('configurations', {}).get('nodes', []):
            for cpe_match in node.get('cpe_match', []):
                if vulnerable != "TRUE" and 'vulnerable' in cpe_match:
                    vulnerable = "TRUE" if cpe_match['vulnerable'] else "FALSE"
        # Extract all reference sources; a CONFIRM source marks the CVE confirmed.
        for reference in cve_item['cve'].get('references', {}).get('reference_data', []):
            if 'refsource' in reference:
                if reference['refsource'] == 'CONFIRM':
                    confirmed = True
                references.append(reference['refsource'])
        # Convert references to a comma-delimited string.
        references = references_delimiter.join(references) if references else "NONE"
        # Set last modified (NVD feed format, e.g. 2020-09-23T04:49Z).
        if 'lastModifiedDate' in cve_item:
            last_modified = datetime.strptime(cve_item['lastModifiedDate'], '%Y-%m-%dT%H:%MZ')
        # Set CVE ID.
        cve_id = cve_item['cve']['CVE_data_meta']['ID']
        impact = cve_item['impact']
        # Prefer CVSS v3, fall back to v2, else zeroed placeholders.
        if 'baseMetricV3' in impact:
            cvss_version = 3
            cvss_score = impact['baseMetricV3']['cvssV3']['baseScore']
            cvss_severity = impact['baseMetricV3']['cvssV3']['baseSeverity']
        elif 'baseMetricV2' in impact:
            cvss_version = 2
            cvss_score = impact['baseMetricV2']['cvssV2']['baseScore']
            cvss_severity = impact['baseMetricV2']['severity']
        else:
            cvss_version = 0
            cvss_score = 0
            cvss_severity = 'NONE'
        # Build the candidate entry once instead of repeating the literal
        # four times as the original did.
        new_entry = {'SCORE': cvss_score, 'SEVERITY': cvss_severity,
                     'VERSION': cvss_version, 'LAST_MODIFIED': last_modified,
                     'VULNERABLE': vulnerable, 'CONFIRMED': confirmed,
                     'REFERENCES': references}
        # First sighting of this CVE: just store it.
        if cve_id not in cve_data:
            cve_data[cve_id] = new_entry
            continue
        # Duplicate CVE id: keep whichever entry is more authoritative.
        # BUG FIX: the original read key 'LAST_MODFIED' (typo) here, which
        # raised KeyError on every duplicate since entries are stored under
        # 'LAST_MODIFIED'.
        existing_last_modified = cve_data[cve_id]['LAST_MODIFIED']
        existing_cvss_score = cve_data[cve_id]['SCORE']
        # If the existing entry has no date but the new one does, replace it.
        if existing_last_modified is None and last_modified is not None:
            cve_data[cve_id] = new_entry
        # If neither has a date and the existing score is 0, replace it.
        elif existing_last_modified is None and last_modified is None and existing_cvss_score == 0:
            cve_data[cve_id] = new_entry
        # Replace only when both dates exist and the new one is strictly newer.
        # BUG FIX: the original compared dates without None guards, raising
        # TypeError when the new entry lacked a lastModifiedDate.
        elif (existing_last_modified is not None and last_modified is not None
              and existing_last_modified < last_modified):
            cve_data[cve_id] = new_entry
# Reshape the combined dict into one record per CVE so pandas can ingest it.
# The stored dicts are tagged in place with their id, exactly as before.
for cve_identifier in cve_data:
    cve_data[cve_identifier]['CVE ID'] = cve_identifier
cve_to_df = list(cve_data.values())
# Build the dataframe from the list of records and print it.
df = pd.DataFrame(cve_to_df)
print(df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment