Skip to content

Instantly share code, notes, and snippets.

@jonrau1
Created September 7, 2024 18:20
Show Gist options
  • Save jonrau1/aa183eec9d8851b1fe56f23d45882976 to your computer and use it in GitHub Desktop.
Save jonrau1/aa183eec9d8851b1fe56f23d45882976 to your computer and use it in GitHub Desktop.
Retrieve unique CVE IDs from Vulncheck KEV
from os import environ
import logging
import requests
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
VULNCHECK_TOKEN = environ.get("VULNCHECK_TOKEN")
def retrieve_vulncheck_kev() -> list[dict]:
kevDataPages = []
url = "https://api.vulncheck.com/v3/index/vulncheck-kev"
headers = {"accept": "application/json"}
cookies = {"token": VULNCHECK_TOKEN}
params = {"page": 1}
response = requests.get(url, headers=headers, cookies=cookies, params=params)
if response.status_code == 200:
totalPages = response.json()["_meta"]["total_pages"]
kevDataPages.extend(response.json()["data"])
for page in range(2, totalPages + 1):
params["page"] = page
response = requests.get(url, headers=headers, cookies=cookies, params=params)
if response.status_code == 200:
kevDataPages.extend(response.json()["data"])
else:
logger.warning(f"Failed to fetch page {page}. Status code: {response.status_code}")
continue
logger.info(f"Retrieved {len(kevDataPages)} Vulncheck KEV ")
return kevDataPages
def process_vc_kev():
kevCves = []
for x in retrieve_vulncheck_kev():
for cve in x["cve"]:
if cve not in kevCves:
kevCves.append(cve)
# do stuff ... logger.info(f"KEV CVEs: {kevCves}")
process_vc_kev()
@FJSte
Copy link

FJSte commented Sep 9, 2024

ChatGPT suggestions to add some more efficiency. Would love to know if they work

from os import environ
import logging
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

VULNCHECK_TOKEN = environ.get("VULNCHECK_TOKEN")
VULNCHECK_API_URL = "https://api.vulncheck.com/v3/index/vulncheck-kev"


def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Sets up a requests.Session with retry logic."""
    session = session or requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session


def retrieve_vulncheck_kev(api_url: str = VULNCHECK_API_URL, token: str = VULNCHECK_TOKEN):
    """Retrieves Vulncheck KEV data page by page using a generator."""
    session = requests_retry_session()

    headers = {"accept": "application/json"}
    cookies = {"token": token}
    
    page = 1
    while True:
        params = {"page": page}
        try:
            response = session.get(api_url, headers=headers, cookies=cookies, params=params)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to fetch page {page}. Error: {e}")
            break

        data = response.json()
        yield data["data"]  # Use generator to yield data page by page

        page += 1
        if page > data["_meta"]["total_pages"]:
            break

    logger.info(f"Completed retrieval of Vulncheck KEV data.")


def process_vc_kev():
    """Processes the Vulncheck KEV data and extracts unique CVEs efficiently."""
    kevCves = set()  # Use a set to automatically handle unique values

    for kev_data_page in retrieve_vulncheck_kev():
        for kev_data in kev_data_page:
            kevCves.update(kev_data["cve"])  # Add multiple CVEs at once to the set

    logger.info(f"Unique KEV CVEs: {len(kevCves)}")

process_vc_kev()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment