Created
May 7, 2025 15:04
-
-
Save gkhays/f13a5f5572ccedf56232fbc9228418b1 to your computer and use it in GitHub Desktop.
Retrieves the CVEs for a given year
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _fetch_cve_window(pub_start, pub_end, limit, delay):
    """
    Fetch every page of CVEs published inside one date window.

    Args:
        pub_start (str): Value sent as the ``pubStartDate`` query parameter
        pub_end (str): Value sent as the ``pubEndDate`` query parameter
        limit (int or None): Stop paging once this many items were collected.
            None means no limit.
        delay (int): Delay between page requests in seconds

    Returns:
        tuple: ``(cves, ok)`` where ``cves`` is the list of items collected so
        far and ``ok`` is False when paging stopped because of an error.
    """
    # Add a user-agent header to avoid being blocked
    headers = {"User-Agent": "NVD-CVE-Fetcher/1.0"}
    params = {
        "pubStartDate": pub_start,
        "pubEndDate": pub_end,
        "resultsPerPage": 2000,  # Maximum allowed by the API
        "startIndex": 0,
    }
    cves = []

    while True:
        try:
            # The timeout keeps a stalled connection from hanging forever.
            response = requests.get(
                base_url, params=params, headers=headers, timeout=30
            )
        except requests.RequestException as e:
            print(f"An error occurred: {e}")
            return cves, False
        print(f"URL: {response.url}")

        if response.status_code == 403:
            print("Error: API rate limit exceeded. Please try again later.")
            return cves, False
        if response.status_code == 404:
            print("Error: API endpoint not found. Checking for diagnostic information...")
            print(f"Response: {response.text}")
            return cves, False
        if response.status_code != 200:
            print(f"Error: API returned status code {response.status_code}")
            print(f"Response: {response.text}")
            return cves, False

        try:
            data = response.json()
        except ValueError as e:  # body was not valid JSON
            print(f"An error occurred: {e}")
            return cves, False

        vulnerabilities = data.get("vulnerabilities", [])
        total_results = data.get("totalResults", 0)
        start_index = data.get("startIndex", params["startIndex"])
        # Fall back to the actual page length if the field is missing or 0,
        # otherwise startIndex would never advance.
        results_per_page = data.get("resultsPerPage", 0) or len(vulnerabilities)

        cves.extend(vulnerabilities)
        print(
            f"Retrieved {len(cves)} of {total_results} CVEs "
            f"published {pub_start} to {pub_end}..."
        )

        if len(cves) >= total_results:
            return cves, True
        if limit is not None and len(cves) >= limit:
            return cves, True
        if not vulnerabilities:
            # The server reports more results but sent an empty page; stop
            # instead of requesting the same page forever.
            return cves, True

        # Set up the next request with an updated startIndex
        params["startIndex"] = start_index + results_per_page
        # Respect rate limits (NVD API has a limit of 5 requests per
        # 30 seconds for unauthenticated users)
        time.sleep(delay)


def fetch_cves_by_year(year, max_results=None, delay=6):
    """
    Fetch CVEs from the NVD API for a specific year.

    The year is queried in consecutive publication-date windows of just under
    120 days each, because the NVD API rejects date ranges longer than 120
    consecutive days. For the current year the range ends at the present
    moment; a year in the future yields an empty list without any request.

    On any request or API error the function prints a message and returns the
    CVEs collected up to that point.

    Args:
        year (int): The year to fetch CVEs for
        max_results (int, optional): Maximum number of results to return. If None, fetch all.
        delay (int, optional): Delay between API requests in seconds to respect rate limits

    Returns:
        list: List of CVE items
    """
    # Local import: the file's own import block is not visible from here, so
    # the additional datetime names are brought into scope inside the function.
    from datetime import datetime, timedelta, timezone

    year = int(year)
    one_ms = timedelta(milliseconds=1)
    max_window = timedelta(days=120)  # API limit on a publication date range

    def as_param(moment):
        # Millisecond precision with an explicit UTC designator.
        return moment.isoformat(timespec="milliseconds") + "Z"

    # Naive datetimes that all represent UTC, so they compare cleanly.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    # Inclusive end of the range: last millisecond of the year, or "now"
    # when the year is still in progress.
    range_end = min(datetime(year + 1, 1, 1) - one_ms, now)
    window_start = datetime(year, 1, 1)

    all_cves = []
    print(f"Fetching CVEs for {year}...")

    first_window = True
    while window_start <= range_end:
        # Windows are inclusive and do not overlap: each one ends 1 ms before
        # the next begins, so no CVE is returned twice.
        window_end = min(window_start + max_window - one_ms, range_end)

        if not first_window:
            # The first request of a new window counts against the rate limit too.
            time.sleep(delay)
        first_window = False

        remaining = max_results - len(all_cves) if max_results else None
        cves, ok = _fetch_cve_window(
            as_param(window_start), as_param(window_end), remaining, delay
        )
        all_cves.extend(cves)

        if not ok or (max_results and len(all_cves) >= max_results):
            break
        window_start = window_end + one_ms

    # Apply max_results limit if specified
    if max_results and len(all_cves) > max_results:
        all_cves = all_cves[:max_results]
    return all_cves
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment