Skip to content

Instantly share code, notes, and snippets.

@gkhays
Created May 7, 2025 15:04
Show Gist options
  • Save gkhays/f13a5f5572ccedf56232fbc9228418b1 to your computer and use it in GitHub Desktop.
Save gkhays/f13a5f5572ccedf56232fbc9228418b1 to your computer and use it in GitHub Desktop.
Retrieves the CVEs for a given year
def fetch_cves_by_year(year, max_results=None, delay=6):
    """
    Fetch CVEs published during a specific calendar year from the NVD API.

    The NVD API rejects publication-date ranges longer than 120 consecutive
    days, so the year is queried as a sequence of consecutive windows of at
    most 120 days each, and every window is paginated with ``startIndex``.
    On any HTTP or network error the function stops and returns whatever
    has been collected so far (best effort), after printing a diagnostic.

    Args:
        year (int): The year to fetch CVEs for.
        max_results (int, optional): Maximum number of results to return.
            If None (or 0), fetch all.
        delay (int, optional): Delay between API requests in seconds to
            respect rate limits.

    Returns:
        list: List of CVE items (the entries of the API's
        ``vulnerabilities`` array), possibly partial if an error occurred.
        Empty if the year lies entirely in the future.
    """
    # Local import so the block does not depend on which datetime names the
    # surrounding file happens to import.
    from datetime import datetime, timedelta, timezone

    year = int(year)
    start_format = "%Y-%m-%dT%H:%M:%S.000"
    # Window ends carry .999 so that consecutive windows neither overlap
    # nor leave a sub-second gap between them.
    end_format = "%Y-%m-%dT%H:%M:%S.999"
    one_second = timedelta(seconds=1)
    max_window = timedelta(days=120)  # NVD limit for date-range filters
    max_page_size = 2000  # Maximum allowed by the API

    # Take a single UTC snapshot so "now" and "current year" cannot disagree.
    now = datetime.now(timezone.utc).replace(microsecond=0)
    range_start = datetime(year, 1, 1, tzinfo=timezone.utc)
    year_end = datetime(year, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
    # Past years end on Dec 31; the current year ends at the present moment.
    range_end = min(year_end, now)
    if range_start > range_end:
        print(f"Year {year} is in the future; nothing to fetch.")
        return []
    if year == now.year:
        print(f"End date set to: {range_end.strftime(end_format)}")

    # Do not request more per page than the caller wants in total.
    page_size = max_page_size
    if max_results and 0 < max_results < max_page_size:
        page_size = max_results

    # Split [range_start, range_end] into consecutive <=120-day windows.
    windows = []
    window_start = range_start
    while window_start <= range_end:
        window_end = min(window_start + max_window - one_second, range_end)
        windows.append((window_start, window_end))
        window_start = window_end + one_second

    # Add a user-agent header to avoid being blocked
    headers = {
        "User-Agent": "NVD-CVE-Fetcher/1.0"
    }

    all_cves = []
    request_made = False
    stop = False
    print(f"Fetching CVEs for {year}...")

    for window_start, window_end in windows:
        # Fresh params per window so startIndex never leaks across windows.
        params = {
            "pubStartDate": window_start.strftime(start_format),
            "pubEndDate": window_end.strftime(end_format),
            "resultsPerPage": page_size,
        }
        window_retrieved = 0

        while True:
            # Respect rate limits (NVD API has a limit of 5 requests per
            # 30 seconds for unauthenticated users); no need to wait
            # before the very first request.
            if request_made:
                time.sleep(delay)
            request_made = True

            try:
                response = requests.get(
                    base_url, params=params, headers=headers, timeout=30
                )
                print(f"URL: {response.url}")
                data = response.json() if response.status_code == 200 else None
            except (requests.RequestException, ValueError) as e:
                # Network failure, timeout, or a 200 body that is not JSON.
                print(f"An error occurred: {e}")
                stop = True
                break

            if response.status_code == 403:
                print("Error: API rate limit exceeded. Please try again later.")
                stop = True
                break
            if response.status_code == 404:
                print("Error: API endpoint not found. Checking for diagnostic information...")
                print(f"Response: {response.text}")
                stop = True
                break
            if response.status_code != 200:
                print(f"Error: API returned status code {response.status_code}")
                print(f"Response: {response.text}")
                stop = True
                break

            # Extract results and paging metadata
            vulnerabilities = data.get('vulnerabilities', [])
            total_results = data.get('totalResults', 0)
            start_index = data.get('startIndex', 0)

            all_cves.extend(vulnerabilities)
            window_retrieved += len(vulnerabilities)
            print(
                f"Retrieved {window_retrieved} of {total_results} CVEs "
                f"for this date window ({len(all_cves)} total)..."
            )

            if max_results and len(all_cves) >= max_results:
                stop = True
                break
            # An empty page means no progress is possible; stopping here
            # prevents an endless loop if totalResults is never reached.
            if not vulnerabilities or window_retrieved >= total_results:
                break

            # Set up the next request with an updated startIndex
            params['startIndex'] = start_index + len(vulnerabilities)

        if stop:
            break

    # Apply max_results limit if specified
    if max_results and len(all_cves) > max_results:
        all_cves = all_cves[:max_results]
    return all_cves
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment