shai-hulud-finder.py: script to find Shai-Hulud 2.0 in a GitHub org using code search
#!/usr/bin/env python3
# ABOUTME: Scans GitHub orgs or local directories for npm/yarn/pnpm lockfiles with compromised packages.
# ABOUTME: Matches dependencies against Shai-Hulud 2.0 IOC list from Wiz security research.
"""
Scan a GitHub organization or local directory for lockfiles (npm/yarn/pnpm)
that reference packages listed in the Shai-Hulud 2.0 IOC CSV published by Wiz.

Usage:
    python shai-hulud-finder.py --org my-org [--token $GITHUB_TOKEN]
    python shai-hulud-finder.py --dir /path/to/project

The script finds lockfiles, parses their dependencies, and reports any matches
against the IOC list.

https://www.wiz.io/blog/shai-hulud-2-0-ongoing-supply-chain-attack
"""
import argparse
import base64
import csv
import json
import os
import sys
import time
import urllib.parse
import urllib.request
from collections import defaultdict
from urllib.error import HTTPError

CSV_URL = "https://github.com/wiz-sec-public/wiz-research-iocs/raw/refs/heads/main/reports/shai-hulud-2-packages.csv"
MAX_RETRIES = 5
RETRY_BASE_DELAY = 2.0
RETRYABLE_STATUS_CODES = {403, 429, 500, 502, 503, 504}
LOCK_FILENAMES = [
    "package-lock.json",
    "npm-shrinkwrap.json",
    "yarn.lock",
    "pnpm-lock.yaml",
    "pnpm-lock.yml",
]
GITHUB_API = "https://api.github.com"
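
# The IOC CSV is expected to carry "Package" and "Version" columns, where
# "Version" may list several exact versions joined by "||" (illustrative row):
#   Package,Version
#   some-package,=1.2.3 || =1.2.4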

def fetch_url(url: str, token: str | None = None, accept: str | None = None) -> bytes:
    req = urllib.request.Request(url)
    if token:
        req.add_header("Authorization", f"Bearer {token}")
    if accept:
        req.add_header("Accept", accept)
    for attempt in range(MAX_RETRIES):
        try:
            with urllib.request.urlopen(req) as resp:
                return resp.read()
        except HTTPError as e:
            if e.code not in RETRYABLE_STATUS_CODES:
                raise
            if attempt == MAX_RETRIES - 1:
                raise
            delay = RETRY_BASE_DELAY * (2**attempt)
            print(
                f"HTTP {e.code}, retrying in {delay:.1f}s "
                f"(attempt {attempt + 1}/{MAX_RETRIES})",
                file=sys.stderr,
            )
            time.sleep(delay)
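
# GitHub signals primary and secondary rate limits with 403/429 responses,
# which is why those codes are included in RETRYABLE_STATUS_CODES above.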

def load_iocs(source: str, token: str | None = None) -> dict[str, set[str]]:
    if source.startswith("http://") or source.startswith("https://"):
        raw = fetch_url(source, token)
        text = raw.decode("utf-8", errors="replace")
    else:
        with open(source, "r", encoding="utf-8") as f:
            text = f.read()
    reader = csv.DictReader(text.splitlines())
    iocs: dict[str, set[str]] = defaultdict(set)
    for row in reader:
        pkg = row.get("Package", "").strip().lower()
        versions = row.get("Version", "")
        if not pkg or not versions:
            continue
        for clause in versions.split("||"):
            clause = clause.strip()
            if clause.startswith("="):
                clause = clause[1:].strip()
            iocs[pkg].add(clause)
    return iocs

def github_search_code(org: str, filename: str, token: str | None, limit: int | None):
    page = 1
    per_page = 100
    seen = 0
    while True:
        q = urllib.parse.quote(f"org:{org} filename:{filename}")
        url = f"{GITHUB_API}/search/code?q={q}&per_page={per_page}&page={page}"
        body = fetch_url(url, token, accept="application/vnd.github.v3+json")
        data = json.loads(body)
        items = data.get("items", [])
        for item in items:
            yield item
            seen += 1
            if limit and seen >= limit:
                return
        if len(items) < per_page:
            break
        page += 1
        time.sleep(0.2)
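
# Note: the code search API requires authentication and returns at most 1000
# results per query, so very large organizations may not be fully covered.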

def fetch_file_content(item: dict, token: str | None) -> str | None:
    # item["url"] points to the contents API, which returns base64-encoded content
    url = item.get("url")
    if not url:
        return None
    try:
        body = fetch_url(url, token, accept="application/vnd.github.v3+json")
    except Exception:
        return None
    data = json.loads(body)
    content = data.get("content")
    encoding = data.get("encoding")
    if not content or encoding != "base64":
        return None
    try:
        raw = base64.b64decode(content)
        return raw.decode("utf-8", errors="replace")
    except Exception:
        return None
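
# The two helpers below cover both package-lock.json layouts, roughly:
#   v1:    {"dependencies": {"lodash": {"version": "4.17.21", "dependencies": {...}}}}
#   v2/v3: {"packages": {"node_modules/lodash": {"version": "4.17.21"}}}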

def iter_npm_dependencies(obj):
    deps = obj.get("dependencies", {})
    for name, meta in deps.items():
        if not isinstance(meta, dict):
            continue
        version = meta.get("version")
        if version:
            yield name, clean_version(version)
        child = meta.get("dependencies")
        if child:
            yield from iter_npm_dependencies({"dependencies": child})

def parse_package_lock(text: str):
    out = []
    try:
        data = json.loads(text)
    except Exception:
        return out
    # npm v2+ lockfiles keep data in "packages"
    packages = data.get("packages")
    if isinstance(packages, dict):
        for path, meta in packages.items():
            if not isinstance(meta, dict):
                continue
            if not path:
                continue
            # path like "node_modules/lodash", "node_modules/@scope/name",
            # or a nested/workspace path such as "packages/foo/node_modules/bar"
            if path.startswith("node_modules/") or "/node_modules/" in path:
                name = path.rsplit("node_modules/", 1)[1]
            else:
                name = path
            version = meta.get("version")
            if version:
                out.append((name, clean_version(version)))
    # npm v1 style dependencies tree
    out.extend(iter_npm_dependencies(data))
    return out
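
# Example yarn.lock entries handled by the parser below:
#   yarn classic:   lodash@^4.17.20, lodash@^4.17.21:
#                     version "4.17.21"
#   yarn berry:     "lodash@npm:^4.17.21":
#                     version: 4.17.21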

def parse_yarn_lock(text: str):
    out = []
    current_pkgs: list[str] = []
    for line in text.splitlines():
        if not line:
            current_pkgs = []
            continue
        # Package entry line (not indented, ends with colon)
        if not line.startswith(" ") and line.endswith(":"):
            key = line[:-1].strip()
            specs = [s.strip().strip("\"'") for s in key.split(",")]
            pkgs = []
            for spec in specs:
                if not spec:
                    continue
                # Handle npm: and workspace: protocols (yarn berry)
                if "@npm:" in spec or "@workspace:" in spec:
                    name = (
                        spec.split("@npm:")[0]
                        if "@npm:" in spec
                        else spec.split("@workspace:")[0]
                    )
                    name = name.strip("\"'")
                else:
                    name, _, _ = spec.rpartition("@")
                # Skip entries without a valid package name
                if name:
                    pkgs.append(name)
            current_pkgs = pkgs
            continue
        # Version line (indented)
        stripped = line.lstrip()
        if current_pkgs and stripped.startswith("version"):
            # Handle both yarn classic (version "X.Y.Z") and berry (version: X.Y.Z)
            if ":" in stripped:
                # Yarn berry format: version: X.Y.Z
                ver = stripped.split(":", 1)[1].strip().strip("\"'")
            else:
                # Yarn classic format: version "X.Y.Z"
                _, _, ver_part = stripped.partition(" ")
                ver = ver_part.strip().strip("\"'")
            ver = clean_version(ver)
            for pkg in current_pkgs:
                out.append((pkg, ver))
            current_pkgs = []
    return out
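
# Example pnpm-lock.yaml "packages:" entries handled by the parser below
# (pnpm v9 omits the leading slash; a "(peer@x.y.z)" suffix may follow):
#   packages:
#     /lodash@4.17.21:
#       resolution: {integrity: sha512-...}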

def parse_pnpm_lock(text: str):
    out = []
    in_packages = False
    for line in text.splitlines():
        stripped = line.strip()
        if stripped == "packages:":
            in_packages = True
            continue
        if in_packages:
            # Exit packages section if we hit a non-indented line
            if stripped and not line.startswith(" "):
                in_packages = False
                continue
            # Skip empty lines or sub-properties (resolution:, etc.)
            if not stripped or not stripped.endswith(":"):
                continue
            # Skip lines that are clearly sub-properties (contain spaces)
            if " " in stripped.split(":", 1)[0]:
                continue
            key = stripped.split(":", 1)[0].strip("\"'")
            key = key.lstrip("/")
            # Drop the peer-dependency suffix, e.g. "foo@1.0.0(bar@2.0.0)" -> "foo@1.0.0"
            key = key.split("(", 1)[0]
            if "@" not in key:
                continue
            name, version = key.rsplit("@", 1)
            out.append((name, clean_version(version)))
    return out
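
# clean_version examples:
#   "npm:lodash@4.17.21" -> "4.17.21"   (yarn alias)
#   "v1.0.0"             -> "1.0.0"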

def clean_version(version: str) -> str:
    version = version.strip()
    if version.startswith("npm:"):
        # yarn alias format npm:package@version
        alias = version[4:]
        if "@" in alias:
            _, _, ver = alias.rpartition("@")
            return ver
        return alias
    # Strip v prefix (e.g., v1.0.0 -> 1.0.0)
    if version.startswith("v") and len(version) > 1 and version[1].isdigit():
        version = version[1:]
    return version

def parse_lockfile(path: str, text: str):
    if path.endswith("package-lock.json") or path.endswith("npm-shrinkwrap.json"):
        return parse_package_lock(text)
    if path.endswith("yarn.lock"):
        return parse_yarn_lock(text)
    if path.endswith("pnpm-lock.yaml") or path.endswith("pnpm-lock.yml"):
        return parse_pnpm_lock(text)
    return []

def find_matches(deps, iocs):
    matches = []
    for name, version in deps:
        pkg_key = name.lower()
        ver = version.strip()
        if pkg_key in iocs and ver in iocs[pkg_key]:
            matches.append((name, ver))
    return matches

def scan_org(org: str, token: str | None, ioc_source: str, limit: int | None = None):
    iocs = load_iocs(ioc_source, token=None)
    report = []
    for filename in LOCK_FILENAMES:
        for item in github_search_code(org, filename, token, limit):
            path = item.get("path")
            repo = item.get("repository", {}).get("full_name")
            content = fetch_file_content(item, token)
            if content is None:
                continue
            deps = parse_lockfile(path, content)
            matches = find_matches(deps, iocs)
            if matches:
                report.append(
                    {
                        "repository": repo,
                        "path": path,
                        "matches": [{"package": n, "version": v} for n, v in matches],
                    }
                )
    return report

def scan_local_directory(directory: str, iocs: dict[str, set[str]]) -> list[dict]:
    """Scan a local directory for lockfiles and check against IOCs.

    Args:
        directory: Path to the directory to scan
        iocs: Dictionary mapping package names to sets of compromised versions

    Returns:
        List of findings with path and matches
    """
    report = []
    lock_filenames_set = set(LOCK_FILENAMES)
    skip_dirs = {"node_modules", ".git", ".hg", ".svn", "__pycache__"}
    for root, dirs, files in os.walk(directory):
        # Skip directories we don't want to traverse
        dirs[:] = [d for d in dirs if d not in skip_dirs]
        for filename in files:
            if filename not in lock_filenames_set:
                continue
            filepath = os.path.join(root, filename)
            try:
                with open(filepath, encoding="utf-8") as f:
                    content = f.read()
            except (OSError, UnicodeDecodeError):
                continue
            deps = parse_lockfile(filename, content)
            matches = find_matches(deps, iocs)
            if matches:
                report.append(
                    {
                        "path": filepath,
                        "matches": [{"package": n, "version": v} for n, v in matches],
                    }
                )
    return report

def main():
    parser = argparse.ArgumentParser(
        description="Scan GitHub org or local directory for Shai-Hulud 2.0 infected npm packages"
    )
    parser.add_argument("--org", help="GitHub organization name")
    parser.add_argument("--dir", help="Local directory path to scan for lockfiles")
    parser.add_argument(
        "--token", help="GitHub token (or set GITHUB_TOKEN)", default=None
    )
    parser.add_argument(
        "--iocs",
        default=CSV_URL,
        help="Path or URL to IOC CSV (default: official Wiz CSV)",
    )
    parser.add_argument("--output", help="Write JSON report to this path")
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit processed lockfiles for quick runs (GitHub mode only)",
    )
    args = parser.parse_args()
    if not args.org and not args.dir:
        parser.error("Either --org or --dir must be specified")
    if args.org and args.dir:
        parser.error("Cannot specify both --org and --dir")
    # Fall back to the GITHUB_TOKEN environment variable, as the --token help promises
    token = args.token or os.environ.get("GITHUB_TOKEN")
    ioc_source = args.iocs
    try:
        if args.dir:
            iocs = load_iocs(ioc_source, token=None)
            report = scan_local_directory(args.dir, iocs)
        else:
            report = scan_org(args.org, token, ioc_source, args.limit)
    except KeyboardInterrupt:
        print("Interrupted", file=sys.stderr)
        sys.exit(1)
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Wrote {len(report)} findings to {args.output}")
    else:
        print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
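
# Example report entry (all values illustrative):
# [
#   {
#     "repository": "my-org/some-repo",
#     "path": "frontend/package-lock.json",
#     "matches": [{"package": "compromised-pkg", "version": "1.2.3"}]
#   }
# ]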