Script to find Shai-Hulud 2.0 compromised packages in a GitHub org using code search.
#!/usr/bin/env python3
# ABOUTME: Scans GitHub orgs or local directories for npm/yarn/pnpm lockfiles with compromised packages.
# ABOUTME: Matches dependencies against the Shai-Hulud 2.0 IOC list from Wiz security research.
"""
Scan a GitHub organization or local directory for lockfiles (npm/yarn/pnpm)
that reference packages listed in the Shai-Hulud 2.0 IOC CSV published by Wiz.

Usage:
    python shai-hulud-finder.py --org my-org [--token $GITHUB_TOKEN]
    python shai-hulud-finder.py --dir /path/to/project

The script finds lockfiles, parses their dependencies, and reports any matches
against the IOC list.

https://www.wiz.io/blog/shai-hulud-2-0-ongoing-supply-chain-attack
"""
import argparse
import base64
import csv
import json
import os
import sys
import time
import urllib.parse
import urllib.request
from collections import defaultdict
from urllib.error import HTTPError

CSV_URL = "https://github.com/wiz-sec-public/wiz-research-iocs/raw/refs/heads/main/reports/shai-hulud-2-packages.csv"
MAX_RETRIES = 5
RETRY_BASE_DELAY = 2.0
RETRYABLE_STATUS_CODES = {403, 429, 500, 502, 503, 504}
LOCK_FILENAMES = [
    "package-lock.json",
    "npm-shrinkwrap.json",
    "yarn.lock",
    "pnpm-lock.yaml",
    "pnpm-lock.yml",
]
GITHUB_API = "https://api.github.com"

def fetch_url(url: str, token: str | None = None, accept: str | None = None) -> bytes:
    req = urllib.request.Request(url)
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    if accept:
        req.add_header("Accept", accept)
    for attempt in range(MAX_RETRIES):
        try:
            with urllib.request.urlopen(req) as resp:
                return resp.read()
        except HTTPError as e:
            if e.code not in RETRYABLE_STATUS_CODES:
                raise
            if attempt == MAX_RETRIES - 1:
                raise
            delay = RETRY_BASE_DELAY * (2**attempt)
            print(
                f"Retryable error ({e.code}), retrying in {delay:.1f}s "
                f"(attempt {attempt + 1}/{MAX_RETRIES})",
                file=sys.stderr,
            )
            time.sleep(delay)

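# With MAX_RETRIES = 5 and RETRY_BASE_DELAY = 2.0, the backoff schedule before
# the final attempt is 2s, 4s, 8s, 16s (delay = RETRY_BASE_DELAY * 2**attempt).
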
def load_iocs(source: str, token: str | None = None) -> dict[str, set[str]]:
    if source.startswith("http://") or source.startswith("https://"):
        raw = fetch_url(source, token)
        text = raw.decode("utf-8", errors="replace")
    else:
        with open(source, "r", encoding="utf-8") as f:
            text = f.read()
    reader = csv.DictReader(text.splitlines())
    iocs: dict[str, set[str]] = defaultdict(set)
    for row in reader:
        pkg = row.get("Package", "").strip().lower()
        versions = row.get("Version", "")
        if not pkg or not versions:
            continue
        for clause in versions.split("||"):
            clause = clause.strip()
            if clause.startswith("="):
                clause = clause[1:].strip()
            iocs[pkg].add(clause)
    return iocs

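# Illustrative sketch of the CSV shape this parser expects (hypothetical row,
# not an excerpt of the real Wiz IOC file):
#
#   Package,Version
#   some-package,= 1.2.3 || = 1.2.4
#
# load_iocs turns that row into {"some-package": {"1.2.3", "1.2.4"}}.
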
def github_search_code(org: str, filename: str, token: str | None, limit: int | None):
    page = 1
    per_page = 100
    seen = 0
    while True:
        q = urllib.parse.quote(f"org:{org} filename:{filename}")
        url = f"{GITHUB_API}/search/code?q={q}&per_page={per_page}&page={page}"
        body = fetch_url(url, token, accept="application/vnd.github.v3+json")
        data = json.loads(body)
        items = data.get("items", [])
        for item in items:
            yield item
            seen += 1
            if limit and seen >= limit:
                return
        if len(items) < per_page:
            break
        page += 1
        time.sleep(0.2)

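# Note: the GitHub Search API returns at most 1,000 results per query, so in
# very large organizations some lockfiles may be missed; code search also
# requires authentication, so --token is effectively mandatory in org mode.
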
def fetch_file_content(item: dict, token: str | None) -> str | None:
    # item["url"] points to the contents API, which returns base64-encoded content
    url = item.get("url")
    if not url:
        return None
    try:
        body = fetch_url(url, token, accept="application/vnd.github.v3+json")
    except Exception:
        return None
    data = json.loads(body)
    content = data.get("content")
    encoding = data.get("encoding")
    if not content or encoding != "base64":
        return None
    try:
        raw = base64.b64decode(content)
        return raw.decode("utf-8", errors="replace")
    except Exception:
        return None

def iter_npm_dependencies(obj):
    deps = obj.get("dependencies", {})
    for name, meta in deps.items():
        # Guard against malformed lockfiles where a dependency entry is not an object
        if not isinstance(meta, dict):
            continue
        version = meta.get("version")
        if version:
            yield name, clean_version(version)
        child = meta.get("dependencies")
        if child:
            yield from iter_npm_dependencies({"dependencies": child})

def parse_package_lock(text: str):
    out = []
    try:
        data = json.loads(text)
    except Exception:
        return out
    # npm v2/v3 lockfiles keep data in "packages"
    packages = data.get("packages")
    if isinstance(packages, dict):
        for path, meta in packages.items():
            if not isinstance(meta, dict):
                continue
            if not path:
                continue
            # path like "node_modules/lodash" or "node_modules/@scope/name"
            if path.startswith("node_modules/"):
                name = path.rsplit("node_modules/", 1)[1]
            else:
                name = path
            version = meta.get("version")
            if version:
                out.append((name, clean_version(version)))
    # npm v1 style dependencies tree
    out.extend(iter_npm_dependencies(data))
    return out

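# Illustrative v2/v3 lockfile fragment this parser handles (hypothetical data):
#
#   {"packages": {"node_modules/@scope/name": {"version": "1.2.3"}}}
#
# yields ("@scope/name", "1.2.3"); the rsplit keeps only the innermost name
# for nested paths like "node_modules/a/node_modules/b".
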
def parse_yarn_lock(text: str):
    out = []
    current_pkgs: list[str] = []
    for line in text.splitlines():
        if not line:
            current_pkgs = []
            continue
        # Package entry line (not indented, ends with colon)
        if not line.startswith(" ") and line.endswith(":"):
            key = line[:-1].strip()
            specs = [s.strip().strip("\"'") for s in key.split(",")]
            pkgs = []
            for spec in specs:
                if not spec:
                    continue
                # Handle npm: and workspace: protocols (yarn berry)
                if "@npm:" in spec or "@workspace:" in spec:
                    name = (
                        spec.split("@npm:")[0]
                        if "@npm:" in spec
                        else spec.split("@workspace:")[0]
                    )
                    name = name.strip("\"'")
                else:
                    name, _, _ = spec.rpartition("@")
                # Skip entries without a valid package name
                if name:
                    pkgs.append(name)
            current_pkgs = pkgs
            continue
        # Version line (indented)
        stripped = line.lstrip()
        if current_pkgs and stripped.startswith("version"):
            # Handle both yarn classic (version "X.Y.Z") and berry (version: X.Y.Z)
            if ":" in stripped:
                # Yarn berry format: version: X.Y.Z
                ver = stripped.split(":", 1)[1].strip().strip("\"'")
            else:
                # Yarn classic format: version "X.Y.Z"
                _, _, ver_part = stripped.partition(" ")
                ver = ver_part.strip().strip("\"'")
            ver = clean_version(ver)
            for pkg in current_pkgs:
                out.append((pkg, ver))
            current_pkgs = []
    return out

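# Illustrative yarn.lock entries this parser handles (hypothetical data):
#
#   "left-pad@^1.3.0":           # yarn classic
#     version "1.3.0"
#
#   "left-pad@npm:^1.3.0":       # yarn berry
#     version: 1.3.0
#
# Both shapes yield ("left-pad", "1.3.0").
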
def parse_pnpm_lock(text: str):
    out = []
    in_packages = False
    for line in text.splitlines():
        stripped = line.strip()
        if stripped == "packages:":
            in_packages = True
            continue
        if in_packages:
            # Exit the packages section when we hit a non-indented line
            if stripped and not line.startswith(" "):
                in_packages = False
                continue
            # Skip empty lines or sub-properties (resolution:, etc.)
            if not stripped or not stripped.endswith(":"):
                continue
            # Skip lines that are clearly sub-properties (contain spaces)
            if " " in stripped.split(":", 1)[0]:
                continue
            key = stripped.split(":", 1)[0].strip("\"'")
            key = key.lstrip("/")
            # Drop lockfileVersion 6+ peer-dependency suffixes, e.g.
            # "name@1.2.3(react@18.0.0)" -> "name@1.2.3"
            key = key.split("(", 1)[0]
            # Note: lockfileVersion 5 keys ("/name/1.2.3") are not handled here
            if "@" not in key:
                continue
            name, version = key.rsplit("@", 1)
            out.append((name, clean_version(version)))
    return out

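# Illustrative pnpm-lock.yaml keys this parser handles (hypothetical data):
#
#   packages:
#     /some-package@1.2.3:        # lockfileVersion 6 (pnpm 8)
#     '@scope/name@2.0.0':        # lockfileVersion 9 (pnpm 9)
#
# rsplit("@", 1) splits only the trailing "@version", so scoped names stay intact.
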
def clean_version(version: str) -> str:
    version = version.strip()
    if version.startswith("npm:"):
        # yarn alias format npm:package@version
        alias = version[4:]
        if "@" in alias:
            _, _, ver = alias.rpartition("@")
            return ver
        return alias
    # Strip v prefix (e.g., v1.0.0 -> 1.0.0)
    if version.startswith("v") and len(version) > 1 and version[1].isdigit():
        version = version[1:]
    return version

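# Examples of the normalization above:
#   "v1.2.3"             -> "1.2.3"
#   "npm:some-pkg@2.0.0" -> "2.0.0"   (yarn alias)
#   "1.2.3"              -> "1.2.3"   (unchanged)
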
def parse_lockfile(path: str, text: str):
    if path.endswith("package-lock.json") or path.endswith("npm-shrinkwrap.json"):
        return parse_package_lock(text)
    if path.endswith("yarn.lock"):
        return parse_yarn_lock(text)
    if path.endswith("pnpm-lock.yaml") or path.endswith("pnpm-lock.yml"):
        return parse_pnpm_lock(text)
    return []

def find_matches(deps, iocs):
    matches = []
    for name, version in deps:
        pkg_key = name.lower()
        ver = version.strip()
        if pkg_key in iocs and ver in iocs[pkg_key]:
            matches.append((name, ver))
    return matches

def scan_org(org: str, token: str | None, ioc_source: str, limit: int | None = None):
    iocs = load_iocs(ioc_source, token=None)
    report = []
    for filename in LOCK_FILENAMES:
        for item in github_search_code(org, filename, token, limit):
            path = item.get("path")
            repo = item.get("repository", {}).get("full_name")
            content = fetch_file_content(item, token)
            if content is None:
                continue
            deps = parse_lockfile(path, content)
            matches = find_matches(deps, iocs)
            if matches:
                report.append(
                    {
                        "repository": repo,
                        "path": path,
                        "matches": [{"package": n, "version": v} for n, v in matches],
                    }
                )
    return report

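# A finding in the report looks like this (hypothetical values):
#
#   {"repository": "my-org/api-server", "path": "services/web/yarn.lock",
#    "matches": [{"package": "some-package", "version": "1.2.3"}]}
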
def scan_local_directory(directory: str, iocs: dict[str, set[str]]) -> list[dict]:
    """Scan a local directory for lockfiles and check them against IOCs.

    Args:
        directory: Path to the directory to scan
        iocs: Dictionary mapping package names to sets of compromised versions

    Returns:
        List of findings with path and matches
    """
    report = []
    lock_filenames_set = set(LOCK_FILENAMES)
    skip_dirs = {"node_modules", ".git", ".hg", ".svn", "__pycache__"}
    for root, dirs, files in os.walk(directory):
        # Prune directories we don't want to traverse
        dirs[:] = [d for d in dirs if d not in skip_dirs]
        for filename in files:
            if filename not in lock_filenames_set:
                continue
            filepath = os.path.join(root, filename)
            try:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                continue
            deps = parse_lockfile(filename, content)
            matches = find_matches(deps, iocs)
            if matches:
                report.append(
                    {
                        "path": filepath,
                        "matches": [{"package": n, "version": v} for n, v in matches],
                    }
                )
    return report

def main():
    parser = argparse.ArgumentParser(
        description="Scan a GitHub org or local directory for Shai-Hulud 2.0 compromised npm packages"
    )
    parser.add_argument("--org", help="GitHub organization name")
    parser.add_argument("--dir", help="Local directory path to scan for lockfiles")
    parser.add_argument(
        "--token", help="GitHub token (or set GITHUB_TOKEN)", default=None
    )
    parser.add_argument(
        "--iocs",
        default=CSV_URL,
        help="Path or URL to IOC CSV (default: official Wiz CSV)",
    )
    parser.add_argument("--output", help="Write JSON report to this path")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit processed lockfiles per lockfile name for quick runs (GitHub mode only)",
    )
    args = parser.parse_args()
    if not args.org and not args.dir:
        parser.error("Either --org or --dir must be specified")
    if args.org and args.dir:
        parser.error("Cannot specify both --org and --dir")
    # Fall back to the GITHUB_TOKEN env var, as promised in the --token help text
    token = args.token or os.environ.get("GITHUB_TOKEN")
    ioc_source = args.iocs
    try:
        if args.dir:
            iocs = load_iocs(ioc_source, token=None)
            report = scan_local_directory(args.dir, iocs)
        else:
            report = scan_org(args.org, token, ioc_source, args.limit)
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        sys.exit(1)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Wrote {len(report)} findings to {args.output}")
    else:
        print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
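# Example runs (mirroring the usage in the module docstring):
#   GITHUB_TOKEN=ghp_... python shai-hulud-finder.py --org my-org --limit 50
#   python shai-hulud-finder.py --dir . --output findings.json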