Last active
July 17, 2024 20:05
-
-
Save ndavison/d14dbbd9d015eeeef19b923ab80b1f1a to your computer and use it in GitHub Desktop.
Detect potentially vulnerable GitHub Actions workflows for orgs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import copy
import json
import logging
import re
import sys
import time
from argparse import ArgumentParser

import requests
import yaml
# Workflow expression contexts whose values are attacker-controllable.
# The checks in this file match these as plain substrings, and report hits in
# this order, so the order of entries is significant for the output.
UNSAFE_CONTEXT_DATA = [
    # Issue / pull request / comment / review text
    'github.event.issue.title',
    'github.event.issue.body',
    'github.event.pull_request.title',
    'github.event.pull_request.body',
    'github.event.comment.body',
    'github.event.review.body',
    'github.event.review_comment.body',
    # Wiki page names
    'github.event.pages.*.page_name',
    # Commit messages and author identity
    'github.event.commits.*.message',
    'github.event.head_commit.message',
    'github.event.head_commit.author.email',
    'github.event.head_commit.author.name',
    'github.event.commits.*.author.email',
    'github.event.commits.*.author.name',
    # Branch names and labels of the pull request head
    'github.event.pull_request.head.ref',
    'github.event.pull_request.head.label',
    'github.event.pull_request.head.repo.default_branch',
    'github.head_ref',
]
class GitHub:
    """Small GitHub API client that flags risky GitHub Actions workflows.

    The client lists an org's (or user's) repositories, downloads each
    workflow file and runs a set of heuristic checks over the parsed YAML.
    Every check returns a list of "hit" dicts with at least the keys
    ``url`` and ``type``.
    """

    API_URL = 'https://api.github.com'
    # Official raw-content host. The Authorization header is sent with every
    # request, so this must only ever point at a GitHub-owned domain.
    RAW_URL = 'https://raw.githubusercontent.com'
    # Seconds to wait for a response before treating the request as failed.
    REQUEST_TIMEOUT = 30
    # Total number of tries for a request that keeps erroring.
    MAX_ATTEMPTS = 5

    def __init__(self, token=None, verbose=False):
        """Create the HTTP session and configure the ``ghactions`` logger.

        Args:
            token: Optional GitHub token, sent as ``Authorization: token ...``.
            verbose: When true, log at DEBUG level instead of ERROR.
        """
        self.session = requests.session()
        self.headers = {}
        if token:
            self.headers['Authorization'] = 'token %s' % (token)
        self.logger = logging.getLogger('ghactions')
        # The logger is shared by name, so only attach a handler once;
        # otherwise every additional instance would duplicate each log line.
        if not self.logger.handlers:
            logger_handler = logging.StreamHandler(sys.stdout)
            logger_handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(logger_handler)
        self.logger.setLevel(logging.DEBUG if verbose else logging.ERROR)

    @staticmethod
    def _header_int(res, name, default):
        """Return response header ``name`` as an int, or ``default`` if absent/invalid."""
        try:
            return int(res.headers.get(name, default))
        except (TypeError, ValueError):
            return default

    def request(self, url, page=1, attempt=1, raw=False):
        """GET ``url`` with rate-limit handling and a bounded number of retries.

        Args:
            url: Absolute URL to fetch.
            page: Value sent as the ``page`` query parameter.
            attempt: Current attempt number (used by the internal retries).
            raw: When true, return the response body as bytes without trying
                to decode it as JSON.

        Returns:
            The decoded JSON payload, the raw body bytes (when ``raw`` is true
            or the body is not JSON), or ``''`` on 404, on a 403 that is not
            a rate limit, or once all attempts have failed.
        """
        res = None
        try:
            res = self.session.get(
                url,
                headers=self.headers,
                params={'page': page},
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException:
            # Connection errors and timeouts are both retried below.
            res = None

        if res is not None:
            if res.status_code == 404:
                return ''
            if self._header_int(res, 'x-ratelimit-remaining', 1) == 0:
                now = int(time.time())
                reset_time = self._header_int(res, 'x-ratelimit-reset', now)
                # Always wait at least one second, even if the reset time
                # is missing or already in the past.
                sleep_time = max((reset_time - now) + 1, 1)
                self.logger.debug('Rate limiting in effect, sleeping for %s seconds...' % (sleep_time))
                time.sleep(sleep_time)
                return self.request(url, page, attempt, raw)
            if res.status_code == 403:
                self.logger.debug('HTTP 403 unrelated to rate limitng, skipping this request')
                return ''

        if res is None or not res.ok:
            self.logger.debug('Request errored - retrying attempt %s' % (attempt))
            attempt += 1
            if attempt > self.MAX_ATTEMPTS:
                self.logger.error(
                    'Request failed after %s attempts - aborting this request.' % (self.MAX_ATTEMPTS)
                )
                return ''
            return self.request(url, page, attempt, raw)

        if raw:
            return res.content
        try:
            return res.json()
        except ValueError:
            # Every JSON decode error requests can raise is a ValueError.
            return res.content

    def get_org_members(self, org):
        """Return the logins of all members of ``org`` visible to this client."""
        page = 1
        members = []
        while True:
            res = self.request('%s/orgs/%s/members' % (self.API_URL, org), page)
            # Anything other than a non-empty list means there is nothing
            # more to page through (or the request failed).
            if not isinstance(res, list) or not res:
                self.logger.debug('Finished finding members for %s' % (org))
                break
            for member in res:
                if isinstance(member, dict) and 'login' in member:
                    self.logger.debug('Found member %s' % (member['login']))
                    members.append(member['login'])
            page += 1
        return members

    def get_org_repos(self, org, is_user=False):
        """Return the non-archived repositories owned by ``org``.

        Args:
            org: Organisation name, or a user login when ``is_user`` is true.
            is_user: Query the ``users`` endpoint instead of ``orgs``.

        Returns:
            A list of ``{'name': ..., 'is_fork': ...}`` dicts.
        """
        page = 1
        repos = []
        org_type = 'users' if is_user else 'orgs'
        while True:
            res = self.request('%s/%s/%s/repos' % (self.API_URL, org_type, org), page)
            if not isinstance(res, list) or not res:
                if not repos:
                    self.logger.error('Failed to get repos for "%s"' % (org))
                else:
                    self.logger.debug('Finished finding repos for %s' % (org))
                break
            for repo in res:
                if not isinstance(repo, dict):
                    continue
                if repo.get('archived'):
                    self.logger.debug('Skipping archived repo %s' % (repo.get('name')))
                    continue
                if 'name' in repo:
                    self.logger.debug('Found repo %s' % (repo['name']))
                    repos.append({'name': repo['name'], 'is_fork': repo.get('fork', False)})
            page += 1
        return repos

    @staticmethod
    def _iter_jobs(yaml_parsed):
        """Yield ``(job_name, job)`` for every job that is a mapping.

        Malformed entries (null jobs, a non-mapping ``jobs`` value) are
        skipped so that one odd workflow cannot abort the whole scan.
        """
        jobs = yaml_parsed.get('jobs') if isinstance(yaml_parsed, dict) else None
        if not isinstance(jobs, dict):
            return
        for job_name, job in jobs.items():
            if isinstance(job, dict):
                yield job_name, job

    @staticmethod
    def _iter_steps(job):
        """Yield every step of ``job`` that is a mapping."""
        steps = job.get('steps')
        if not isinstance(steps, list):
            return
        for step in steps:
            if isinstance(step, dict):
                yield step

    @staticmethod
    def _workflow_branch(yaml_html_url, repo, yaml_path):
        """Extract the branch name from a workflow's ``html_url``.

        The URL is expected to look like ``.../<repo>/blob/<branch>/<path>``.
        The known ``yaml_path`` suffix is stripped so that branch names which
        contain ``/`` are kept whole; if the URL does not end with the path,
        the first path segment after ``blob/`` is used instead.

        Returns:
            The branch name, or ``None`` when it cannot be determined.
        """
        _, found, tail = yaml_html_url.partition('%s/blob/' % (repo))
        if not found:
            return None
        suffix = '/%s' % (yaml_path)
        if tail.endswith(suffix) and len(tail) > len(suffix):
            return tail[:-len(suffix)]
        return tail.split('/', 1)[0] or None

    def _check_for_oidc_role_usage(self, yaml_html_url, yaml_parsed):
        """Flag steps whose ``with.role-to-assume`` is set and does not reference ``secrets.``."""
        vulnerable_workflows = []
        for job_name, job in self._iter_jobs(yaml_parsed):
            for step in self._iter_steps(job):
                with_args = step.get('with')
                if not isinstance(with_args, dict):
                    continue
                role = with_args.get('role-to-assume')
                if role and 'secrets.' not in str(role):
                    vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'oidc-role-usage'})
        return vulnerable_workflows

    def _check_for_unsafe_event_usage(self, yaml_html_url, yaml_parsed):
        """Flag ``pull_request_target`` workflows whose checkout step mentions ``pull`` in ``with``."""
        vulnerable_workflows = []
        # PyYAML loads an unquoted `on:` key as the boolean True; a quoted
        # "on" key stays a string, so both spellings are looked up.
        events = yaml_parsed.get(True) or yaml_parsed.get('on') or []
        if not isinstance(events, (str, list, dict)) or 'pull_request_target' not in events:
            return vulnerable_workflows
        for job_name, job in self._iter_jobs(yaml_parsed):
            for step in self._iter_steps(job):
                if 'actions/checkout' not in str(step.get('uses') or ''):
                    continue
                with_string = str(step.get('with', ''))
                if 'pull' in with_string:
                    vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'unsafe-pr-event-usage'})
        return vulnerable_workflows

    def _check_for_unsafe_run_context_usage(self, yaml_html_url, yaml_parsed):
        """Flag ``run`` scripts that contain any entry of ``UNSAFE_CONTEXT_DATA``."""
        vulnerable_workflows = []
        for job_name, job in self._iter_jobs(yaml_parsed):
            for step in self._iter_steps(job):
                script = str(step.get('run', ''))
                for unsafe_context in UNSAFE_CONTEXT_DATA:
                    if unsafe_context in script:
                        vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'unsafe-run-context-usage', 'context': unsafe_context})
        return vulnerable_workflows

    def _check_for_self_hosted_runner(self, yaml_html_url, yaml_parsed):
        """Flag jobs whose ``runs-on`` value mentions ``self-hosted``."""
        vulnerable_workflows = []
        for job_name, job in self._iter_jobs(yaml_parsed):
            if 'runs-on' in job and 'self-hosted' in str(job['runs-on']):
                vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'self-hosted-usage'})
        return vulnerable_workflows

    def _check_for_potential_unsafe_context_usage(self, yaml_html_url, yaml_parsed):
        """Flag unsafe contexts that appear anywhere in the workflow.

        ``concurrency``, every ``if``/``env`` and each step's ``with.ref`` are
        ignored because hits there are false positives. The pruning is done on
        a deep copy so the caller's parsed workflow is left untouched for the
        other checks.
        """
        vulnerable_workflows = []
        scrubbed = copy.deepcopy(yaml_parsed)
        if isinstance(scrubbed, dict):
            # remove the concurrency details as this often causes false positives
            scrubbed.pop('concurrency', None)
        # remove all if:, env: and with > ref: as potential hits in there are false positives
        for _, job in self._iter_jobs(scrubbed):
            job.pop('if', None)
            job.pop('env', None)
            for step in self._iter_steps(job):
                step.pop('if', None)
                step.pop('env', None)
                with_args = step.get('with')
                if isinstance(with_args, dict):
                    with_args.pop('ref', None)
        raw_yaml = str(scrubbed)
        for unsafe_context in UNSAFE_CONTEXT_DATA:
            if unsafe_context in raw_yaml:
                vulnerable_workflows.append({'url': yaml_html_url, 'type': 'potential-unsafe-context-usage', 'context': unsafe_context})
        return vulnerable_workflows

    def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
        """Parse one workflow file and run every check against it.

        Args:
            yaml_html_url: URL recorded in each hit.
            yaml_raw: Workflow source as bytes (a ``str`` is also accepted).

        Returns:
            The combined list of hits from all checks; empty when the
            document is not a mapping with a ``jobs`` key.

        Raises:
            yaml.YAMLError: If the document cannot be parsed.
        """
        vulnerable_workflows = []
        if isinstance(yaml_raw, str):
            yaml_raw = yaml_raw.encode('utf-8')
        yaml_raw = yaml_raw.replace(b'\t', b' ')
        yaml_parsed = yaml.safe_load(yaml_raw)
        # A YAML document can be a scalar or a list; only mappings are workflows.
        if isinstance(yaml_parsed, dict) and 'jobs' in yaml_parsed:
            vulnerable_workflows.extend(self._check_for_oidc_role_usage(yaml_html_url, yaml_parsed))
            vulnerable_workflows.extend(self._check_for_unsafe_event_usage(yaml_html_url, yaml_parsed))
            vulnerable_workflows.extend(self._check_for_potential_unsafe_context_usage(yaml_html_url, yaml_parsed))
            vulnerable_workflows.extend(self._check_for_unsafe_run_context_usage(yaml_html_url, yaml_parsed))
            vulnerable_workflows.extend(self._check_for_self_hosted_runner(yaml_html_url, yaml_parsed))
        return vulnerable_workflows

    def get_workflows(self, repo):
        """Download and check every workflow of ``repo`` (``owner/name``).

        Returns:
            The list of hits found across all of the repository's workflows.
        """
        page = 1
        vulnerable_workflows = []
        workflows_url = '%s/repos/%s/actions/workflows' % (self.API_URL, repo)
        while True:
            res = self.request(workflows_url, page)
            if not res or not isinstance(res, dict):
                self.logger.debug('Failed to get %s' % (workflows_url))
                break
            workflows = res.get('workflows') or []
            if page == 1:
                self.logger.debug('Starting finding workflows for %s' % (repo))
            if not workflows:
                self.logger.debug('Finished finding workflows for %s' % (repo))
                break
            for workflow in workflows:
                if not isinstance(workflow, dict):
                    continue
                yaml_path = workflow.get('path')
                yaml_html_url = workflow.get('html_url')
                if not (yaml_path and yaml_html_url):
                    continue
                workflow_branch = self._workflow_branch(yaml_html_url, repo, yaml_path)
                if not workflow_branch:
                    continue
                yaml_raw_url = '%s/%s/%s/%s' % (self.RAW_URL, repo, workflow_branch, yaml_path)
                self.logger.debug('Checking %s...' % (yaml_raw_url))
                # raw=True keeps the body as bytes even when the file happens
                # to be valid JSON, which the YAML parser below expects.
                yaml_raw = self.request(yaml_raw_url, raw=True)
                if not yaml_raw:
                    self.logger.debug('No content returned for %s' % (yaml_raw_url))
                    continue
                try:
                    vulnerable_workflows.extend(self.check_vulnerable_workflows(yaml_html_url, yaml_raw))
                except yaml.YAMLError:
                    self.logger.error(f'Error while parsing yaml for {yaml_html_url}', exc_info=True)
                    continue
            page += 1
        return vulnerable_workflows
def _format_workflow_hits(url, hits):
    """Build the one-line report for a workflow, or return None if nothing to report.

    The line starts with ``workflow: <url>``, then lists the job and/or
    context of each hit in order, an ``is_fork`` flag when known, and finally
    the distinct hit types in sorted order.
    """
    out = f'workflow: {url}'
    types = []
    seen_contexts = set()
    for hit in hits:
        hit_type = hit['type']
        if hit_type in ('oidc-role-usage', 'unsafe-pr-event-usage', 'self-hosted-usage') and 'job' in hit:
            out += f" job: {hit['job']}"
            types.append(hit_type)
        if hit_type in ('potential-unsafe-context-usage', 'unsafe-run-context-usage') and 'context' in hit:
            types.append(hit_type)
            if 'job' in hit:
                out += f" job: {hit['job']}"
            # Each context is only printed once per workflow.
            if hit['context'] in seen_contexts:
                continue
            out += f" context: {hit['context']}"
            seen_contexts.add(hit['context'])
    if not types:
        return None
    # All hits of one workflow come from the same repo, so any hit carries
    # the right is_fork value.
    if 'is_fork' in hits[-1]:
        out += f' is_fork: {hits[-1]["is_fork"]}'
    # Sorted so the output is stable from run to run.
    out += f' ({", ".join(sorted(set(types)))})'
    return out


def main(args):
    """Scan the selected repositories and print one line per flagged workflow.

    Args:
        args: Parsed command-line namespace with the attributes ``org``,
            ``repo``, ``token``, ``members`` and ``verbose``.
    """
    org = args.org
    gh = GitHub(args.token, args.verbose)

    if args.repo:
        repos = [{'name': args.repo}]
    elif args.members:
        repos = []
        for member in gh.get_org_members(org):
            # Member repos live under the member's own account, not the org,
            # so remember the owner for building the repo path below.
            for repo in gh.get_org_repos(member, is_user=True):
                repos.append(dict(repo, owner=member))
    else:
        repos = gh.get_org_repos(org)

    workflows = {}
    for repo in repos:
        owner = repo.get('owner', org)
        for workflow in gh.get_workflows(f'{owner}/{repo["name"]}'):
            if 'is_fork' in repo:
                workflow['is_fork'] = repo.get('is_fork', False)
            workflows.setdefault(workflow['url'], []).append(workflow)

    for url, hits in workflows.items():
        line = _format_workflow_hits(url, hits)
        if line:
            print(line)
if __name__ == '__main__':
    # Command-line entry point: declare the options, parse argv, run the scan.
    cli = ArgumentParser(
        description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests",
    )
    option_specs = (
        (("-o", "--org"), {"help": "Github org to check", "required": True}),
        (("-m", "--members"), {"action": "store_true", "help": "Check the repos of members of the org"}),
        (("-r", "--repo"), {"help": "Github repo to check"}),
        (("-t", "--token"), {"help": "Github token for authenticated API requests, used in the Authorization header"}),
        (("-v", "--verbose"), {"action": "store_true", "help": "More output"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    main(cli.parse_args())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
See also https://securitylab.github.com/research/github-actions-preventing-pwn-requests/