
Snyk Dependency Fix Report

Overview

This Python script generates a comprehensive report of direct dependencies that need to be updated in your project to address vulnerabilities. The report includes:

  • Direct dependencies that require updates.
  • The maximum version to which each dependency should be updated.
  • A list of issues that will be fixed by updating, along with the affected packages.

The script leverages the Snyk API's aggregated-issues endpoint to fetch issues related to your project, then processes this data to provide actionable insights.
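
If you want to confirm your token and IDs before running the full script, the same endpoint can be called directly. Below is a minimal sketch using the same POST request the script makes; the values shown are placeholders you must replace:

    import requests

    API_TOKEN = "YOUR_API_TOKEN"          # placeholder
    ORG_ID = "YOUR_SNYK_ORG_ID"           # placeholder
    PROJECT_ID = "YOUR_SNYK_PROJECT_ID"   # placeholder

    url = f"https://api.snyk.io/v1/org/{ORG_ID}/project/{PROJECT_ID}/aggregated-issues"
    headers = {"Authorization": f"token {API_TOKEN}", "Content-Type": "application/json"}

    response = requests.post(url, headers=headers)
    response.raise_for_status()
    print(len(response.json().get("issues", [])), "issues returned")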

Requirements

  • Python 3.x
  • requests library
  • pandas library

Installation

  1. Clone this repository or download the script.

  2. Install the required Python packages using pip:

    pip install requests pandas

Usage

  1. Replace the placeholders in the script with your actual Snyk API token, organization ID, and project ID (or load them from environment variables; see the sketch after this list):

    API_TOKEN = "YOUR_API_TOKEN"
    ORG_ID = "YOUR_SNYK_ORG_ID"
    PROJECT_ID = "YOUR_SNYK_PROJECT_ID"
  2. Run the script:

    python3 snyk_dependency_fix_report.py
  3. The script will print the aggregated data to the console and save it as a CSV file named dependency_fix_report.csv.
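
Hardcoding credentials is fine for a quick run, but if you prefer to keep the token out of the file, here is a minimal sketch that reads the same values from environment variables. The variable names SNYK_TOKEN, SNYK_ORG_ID, and SNYK_PROJECT_ID are an assumption, not something the script defines:

    import os

    # Assumed environment variable names (not defined by the script itself).
    API_TOKEN = os.environ["SNYK_TOKEN"]
    ORG_ID = os.environ["SNYK_ORG_ID"]
    PROJECT_ID = os.environ["SNYK_PROJECT_ID"]

Export the three variables in your shell before running python3 snyk_dependency_fix_report.py.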

Script Details

Functions

  • fetch_aggregated_issues(org_id, project_id):

    • Fetches aggregated issues from the Snyk API for the specified organization and project.
  • fetch_dependency_fix_info(paths_url):

    • Retrieves dependency path information from the provided paths URL and extracts the name and fix version of the first (direct) dependency in each path.
  • aggregate_issues_by_dependency(issues):

    • Aggregates issues by direct dependency, collecting the highest fix version required and the list of issues fixed by updating that dependency. The data shapes these functions work with are sketched after this list.
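
For reference, the fields the script actually reads from the two API responses look roughly like this. This is a sketch inferred from the script below, with illustrative values, not the full Snyk API schema:

    # Shape of one entry in the aggregated-issues response (only the fields the script uses):
    issue = {
        "id": "SNYK-EXAMPLE-123",
        "pkgName": "package-c",
        "issueData": {"title": "Issue 2 Title"},
        "fixInfo": {"isUpgradable": True, "isPinnable": False,
                    "isFixable": False, "isPartiallyFixable": False},
        "links": {"paths": "https://api.snyk.io/..."},  # placeholder URL
    }

    # Shape of the paths response: each path is a list of dependencies, and the
    # script treats the first entry as the direct dependency.
    paths_payload = {
        "paths": [
            [{"name": "package-name-1", "fixVersion": "1.2.3"}, {"name": "package-c"}],
        ],
    }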

Output

The generated CSV file, dependency_fix_report.csv, includes the following columns:

  • direct_dependency: The direct dependency that needs to be updated.
  • max_fix_version: The maximum version to which the dependency should be updated to fix the issues.
  • issues_fixed: A list of issues that will be fixed by updating the dependency, along with the affected packages.

Example Output

The output CSV might look something like this:

direct_dependency | max_fix_version | issues_fixed
package-name-1    | 1.2.3           | Issue 1 Title (Packages: package-a, package-b); Issue 2 Title (Packages: package-c)
package-name-2    | 4.5.6           | Issue 3 Title (Packages: package-d)
package-name-3    | 7.8.9           | Issue 4 Title (Packages: package-e, package-f); Issue 5 Title (Packages: package-g); Issue 6 Title (Packages: package-h, package-i, package-j)
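
Since the report is a plain CSV, it can also be loaded back into pandas for filtering or further processing. A minimal sketch:

    import pandas as pd

    df = pd.read_csv("dependency_fix_report.csv")
    # For example, keep only dependencies whose update fixes more than one issue
    # (issues are joined with "; " in the issues_fixed column).
    multi_issue = df[df["issues_fixed"].str.contains(";")]
    print(multi_issue[["direct_dependency", "max_fix_version"]])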


Contributing

Contributions are welcome! Feel free to submit a pull request or open an issue for any improvements or bug fixes.

Script: Snyk_aggregated_dep_fix.py

import requests
import pandas as pd
from collections import defaultdict

# Replace with your actual Snyk API token, organization ID, and project ID
API_TOKEN = "YOUR_API_TOKEN"
ORG_ID = "YOUR_SNYK_ORG_ID"
PROJECT_ID = "YOUR_SNYK_PROJECT_ID"  # Replace with your actual project ID
BASE_URL = "https://api.snyk.io/v1"

# Headers for API request
headers = {
    'Authorization': f'token {API_TOKEN}',
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}

def fetch_aggregated_issues(org_id, project_id):
    """
    Fetches aggregated issues from the Snyk API for a specific organization and project.
    
    Args:
        org_id (str): The organization ID in Snyk.
        project_id (str): The project ID in Snyk.
    
    Returns:
        list: A list of issues retrieved from the Snyk API.
    """
    url = f"{BASE_URL}/org/{org_id}/project/{project_id}/aggregated-issues"
    response = requests.post(url, headers=headers)
    response.raise_for_status()
    response_json = response.json()
    return response_json.get('issues', [])

def fetch_dependency_fix_info(paths_url):
    """
    Fetches the dependency information from the given paths URL and extracts the first name and fix version.
    
    Args:
        paths_url (str): The URL to fetch the dependency path information.
    
    Returns:
        list: A list of tuples containing direct dependency names and fix versions.
    """
    response = requests.get(paths_url, headers=headers)
    response.raise_for_status()
    paths_data = response.json()

    direct_dependencies = []

    for path in paths_data.get('paths', []):
        if path:
            first_dependency = path[0]  # Get the first dependency in each path
            name = first_dependency.get('name', 'Unknown')
            fix_version = first_dependency.get('fixVersion', 'N/A')
            direct_dependencies.append((name, fix_version))

    return direct_dependencies

def aggregate_issues_by_dependency(issues):
    """
    Aggregates issues by direct dependencies and collects the highest fix version.
    
    Args:
        issues (list): A list of issues retrieved from the Snyk API.
    
    Returns:
        dict: A dictionary with direct dependencies as keys and aggregated issue data as values.
    """
    aggregation = defaultdict(lambda: {
        'issues': defaultdict(set),
        'max_fix_version': 'N/A'
    })

    for issue in issues:
        fix_info = issue.get('fixInfo', {})
        if any([fix_info.get('isUpgradable'), fix_info.get('isPinnable'), fix_info.get('isFixable'), fix_info.get('isPartiallyFixable')]):
            issue_id = issue['id']
            issue_title = issue['issueData']['title']
            pkg_name = issue['pkgName']

            links = issue.get('links', {})
            paths_url = links.get('paths', '')
            if paths_url:
                direct_dependencies = fetch_dependency_fix_info(paths_url)
                for dep_name, fix_version in direct_dependencies:
                    key = dep_name
                    aggregation[key]['issues'][issue_title].add(pkg_name)
                    # Note: plain string comparison of versions; e.g. "10.0.0" sorts
                    # below "2.0.0". See the note after the script for an alternative.
                    if aggregation[key]['max_fix_version'] == 'N/A' or fix_version > aggregation[key]['max_fix_version']:
                        aggregation[key]['max_fix_version'] = fix_version

    return aggregation

def main():
    """
    Main function that orchestrates the fetching, aggregation, and saving of Snyk issues data.
    """
    issues = fetch_aggregated_issues(ORG_ID, PROJECT_ID)
    if not issues:
        print("No issues found or unable to fetch issues.")
        return

    aggregated_issues = aggregate_issues_by_dependency(issues)

    # Prepare data for DataFrame
    aggregated_data = []
    for dependency, data in aggregated_issues.items():
        issues_fixed = []
        for issue, packages in data['issues'].items():
            issues_fixed.append(f"{issue} (Packages: {', '.join(packages)})")
        aggregated_data.append({
            'direct_dependency': dependency,
            'max_fix_version': data['max_fix_version'],
            'issues_fixed': '; '.join(issues_fixed)
        })

    # Creating a DataFrame for better visualization
    df = pd.DataFrame(aggregated_data)
    print(df)

    # Save to a CSV file
    df.to_csv('dependency_fix_report.csv', index=False)
    print("Data saved to dependency_fix_report.csv")

if __name__ == "__main__":
    main()
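
One caveat worth noting: max_fix_version is chosen by plain string comparison, so a fix version such as 10.0.0 would sort below 2.0.0. If your projects hit multi-digit version components, here is a sketch of a numeric comparison key. It assumes simple dotted numeric versions and is an illustrative helper, not part of the script above (the packaging library's Version class is another option, at the cost of an extra dependency):

def version_key(version):
    """Turn a dotted version string into a tuple of ints for comparison.

    Non-numeric parts fall back to 0, so pre-release suffixes are ignored.
    Illustrative helper only; adapt to your versioning scheme.
    """
    parts = []
    for part in version.split('.'):
        digits = ''.join(ch for ch in part if ch.isdigit())
        parts.append(int(digits) if digits else 0)
    return tuple(parts)

# Example use inside aggregate_issues_by_dependency, replacing the string comparison:
#   current = aggregation[key]['max_fix_version']
#   if current == 'N/A' or version_key(fix_version) > version_key(current):
#       aggregation[key]['max_fix_version'] = fix_version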
    