Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jonashaag/c43e02fa37a7af1b64ef3eb298ea79d7 to your computer and use it in GitHub Desktop.

Select an option

Save jonashaag/c43e02fa37a7af1b64ef3eb298ea79d7 to your computer and use it in GitHub Desktop.
Submit Pixi dependencies to GitHub Dependency Graph for vulnerability scanning
#!/usr/bin/env python3
"""Submit Pixi dependency data to GitHub Dependency Graph."""
import argparse
import json
import os
import re
import sys
from datetime import UTC
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import Request
from urllib.request import urlopen
import yaml
PYPI_NAME_RE = re.compile(r"^pkg:pypi/([^@?#]+)(?:@([^?#]+))?")
PYPI_NORMALIZE_RE = re.compile(r"[-_.]+")
def _normalize_pypi_name(name: str) -> str:
return PYPI_NORMALIZE_RE.sub("-", name).lower()
def _pypi_package_url(name: str, version: str | None = None) -> str:
quoted_name = quote(_normalize_pypi_name(name), safe="._-")
if version:
return f"pkg:pypi/{quoted_name}@{quote(version, safe='._-+!')}"
return f"pkg:pypi/{quoted_name}"
def _mapped_pypi_package_url(purl: str, version: str | None) -> str | None:
match = PYPI_NAME_RE.match(purl)
if not match:
return None
name, mapped_version = match.groups()
return _pypi_package_url(name, mapped_version or version)
def _package_key(package_url: str) -> str:
match = PYPI_NAME_RE.match(package_url)
if not match:
return package_url
name, version = match.groups()
return f"{name}@{version}" if version else name
def _with_unique_key(resolved: dict[str, dict[str, str]], key: str, package_url: str) -> str:
if key not in resolved:
return key
if resolved[key]["package_url"] == package_url:
return key
index = 2
while f"{key}#{index}" in resolved and resolved[f"{key}#{index}"]["package_url"] != package_url:
index += 1
return f"{key}#{index}"
def _load_lock(lock_path: Path) -> dict[str, Any]:
with lock_path.open() as f:
return yaml.safe_load(f)
def _conda_package_version(package: dict[str, Any]) -> str | None:
if version := package.get("version"):
return version
filename = package["conda"].rsplit("/", 1)[-1]
stem = filename.removesuffix(".conda").removesuffix(".tar.bz2")
try:
_name, version, _build = stem.rsplit("-", 2)
except ValueError:
return None
return version
def _package_indexes(lock_data: dict[str, Any]) -> tuple[dict[str, dict[str, Any]], dict[str, dict[str, Any]]]:
conda_packages = {}
pypi_packages = {}
for package in lock_data.get("packages", []):
if url := package.get("conda"):
conda_packages[url] = package
elif url := package.get("pypi"):
pypi_packages[url] = package
return conda_packages, pypi_packages
def build_resolved_dependencies(lock_data: dict[str, Any], environment: str, platform: str) -> dict[str, dict[str, str]]:
"""Build dependency-submission entries from Pixi lock data."""
env_data = lock_data["environments"][environment]
package_refs = env_data["packages"][platform]
conda_packages, pypi_packages = _package_indexes(lock_data)
resolved: dict[str, dict[str, str]] = {}
for package_ref in package_refs:
package_urls: list[str] = []
if conda_url := package_ref.get("conda"):
package = conda_packages[conda_url]
version = _conda_package_version(package)
package_urls.extend(
mapped_url for purl in package.get("purls", []) if (mapped_url := _mapped_pypi_package_url(purl, version)) is not None
)
elif pypi_url := package_ref.get("pypi"):
package = pypi_packages[pypi_url]
if name := package.get("name"):
package_urls.append(_pypi_package_url(name, package.get("version")))
for package_url in package_urls:
key = _with_unique_key(resolved, _package_key(package_url), package_url)
resolved[key] = {"package_url": package_url}
return resolved
def build_snapshot(
*,
resolved: dict[str, dict[str, str]],
environment: str,
platform: str,
repository: str,
sha: str,
ref: str,
workflow: str,
job: str,
run_id: str,
) -> dict[str, Any]:
"""Build a GitHub Dependency Submission API snapshot."""
manifest_name = f"pixi.lock:{environment}:{platform}:pypi-mapped"
repository_url = f"https://github.com/{repository}"
return {
"version": 0,
"sha": sha,
"ref": ref,
"job": {
"correlator": f"{workflow} {job} {environment} {platform}",
"id": run_id,
},
"detector": {
"name": "pixi-lock-pypi-mapping",
"version": "1.0.0",
"url": repository_url,
},
"scanned": datetime.now(tz=UTC).isoformat(timespec="seconds").replace("+00:00", "Z"),
"manifests": {
manifest_name: {
"name": manifest_name,
"file": {
"source_location": "pixi.lock",
},
"resolved": resolved,
},
},
}
def submit_snapshot(snapshot: dict[str, Any], *, repository: str, token: str, api_url: str) -> dict[str, Any]:
"""Submit a dependency snapshot to GitHub."""
request = Request(
f"{api_url.rstrip('/')}/repos/{repository}/dependency-graph/snapshots",
data=json.dumps(snapshot).encode(),
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "pixi-dependency-submission",
"X-GitHub-Api-Version": "2022-11-28",
},
method="POST",
)
try:
with urlopen(request, timeout=30) as response:
response_data = response.read().decode()
except HTTPError as exc:
error_body = exc.read().decode(errors="replace")
raise RuntimeError(f"GitHub dependency submission failed with HTTP {exc.code}: {error_body}") from exc
return json.loads(response_data)
def parse_args() -> argparse.Namespace:
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--lock-file", type=Path, default=Path("pixi.lock"))
parser.add_argument("--environment", default="default")
parser.add_argument("--platform", default="linux-64")
parser.add_argument("--dry-run", action="store_true")
return parser.parse_args()
def main() -> int:
"""Run the dependency submission."""
args = parse_args()
lock_data = _load_lock(args.lock_file)
resolved = build_resolved_dependencies(lock_data, args.environment, args.platform)
repository = os.environ.get("GITHUB_REPOSITORY", "example/example")
snapshot = build_snapshot(
resolved=resolved,
environment=args.environment,
platform=args.platform,
repository=repository,
sha=os.environ.get("GITHUB_SHA", "0000000000000000000000000000000000000000"),
ref=os.environ.get("GITHUB_REF", "refs/heads/main"),
workflow=os.environ.get("GITHUB_WORKFLOW", "dependency-submission"),
job=os.environ.get("GITHUB_JOB", "pixi-dependency-submission"),
run_id=os.environ.get("GITHUB_RUN_ID", "0"),
)
print(f"Prepared {len(resolved)} mapped PyPI dependencies from {args.lock_file} ({args.environment}/{args.platform}).")
if args.dry_run:
print(json.dumps(snapshot, indent=2, sort_keys=True))
return 0
token = os.environ.get("GITHUB_TOKEN")
if not token:
print("GITHUB_TOKEN is required unless --dry-run is used.", file=sys.stderr)
return 2
result = submit_snapshot(
snapshot,
repository=repository,
token=token,
api_url=os.environ.get("GITHUB_API_URL", "https://api.github.com"),
)
print(result.get("message", "Dependency snapshot submitted."))
return 0
if __name__ == "__main__":
raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment