Skip to content

Instantly share code, notes, and snippets.

@ndavison
Last active July 17, 2024 20:05
Show Gist options
  • Save ndavison/d14dbbd9d015eeeef19b923ab80b1f1a to your computer and use it in GitHub Desktop.
Save ndavison/d14dbbd9d015eeeef19b923ab80b1f1a to your computer and use it in GitHub Desktop.
Detect potentially vulnerable GitHub Actions workflows for orgs
import requests
import yaml
import re
import json
import time
import logging
import sys
from argparse import ArgumentParser
# GitHub Actions expression contexts whose values can be set by whoever opens
# an issue / pull request or pushes a commit. The scanner looks for these
# strings as literal substrings, and reports hits in the order listed here,
# so the order of this list is significant.
UNSAFE_CONTEXT_DATA = [
    # Free-text fields on issues, pull requests, comments and reviews.
    'github.event.issue.title',
    'github.event.issue.body',
    'github.event.pull_request.title',
    'github.event.pull_request.body',
    'github.event.comment.body',
    'github.event.review.body',
    'github.event.review_comment.body',
    # Wiki page names.
    'github.event.pages.*.page_name',
    # Commit messages and author metadata.
    'github.event.commits.*.message',
    'github.event.head_commit.message',
    'github.event.head_commit.author.email',
    'github.event.head_commit.author.name',
    'github.event.commits.*.author.email',
    'github.event.commits.*.author.name',
    # Branch names and labels on the pull request head.
    'github.event.pull_request.head.ref',
    'github.event.pull_request.head.label',
    'github.event.pull_request.head.repo.default_branch',
    'github.head_ref',
]
class GitHub:
    """Small GitHub REST client that scans Actions workflows for risky patterns.

    The public entry points are ``get_org_members``, ``get_org_repos`` and
    ``get_workflows``; the ``_check_for_*`` methods each inspect one parsed
    workflow document and return a list of finding dicts of the form
    ``{'url': ..., 'type': ..., ['job': ...], ['context': ...]}``.
    """

    # Seconds to wait for a single HTTP response before treating the attempt
    # as failed; without a timeout a stalled connection blocks forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, token=None, verbose=False):
        """Create a client.

        Args:
            token: optional GitHub token sent in the ``Authorization`` header.
            verbose: when true, log at DEBUG level instead of ERROR.
        """
        self.session = requests.session()
        self.headers = {}
        if token:
            self.headers['Authorization'] = 'token %s' % (token)
        self.logger = logging.getLogger('ghactions')
        # logging.getLogger returns a shared object: attach the stdout handler
        # only once so creating several clients does not duplicate log lines.
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(
                logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            )
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG if verbose else logging.ERROR)

    @staticmethod
    def _header_int(res, name, default):
        """Return response header *name* as an int, or *default* if absent/invalid."""
        try:
            return int(res.headers.get(name, default))
        except (TypeError, ValueError):
            return default

    def request(self, url, page=1, attempt=1):
        """GET *url* and return the decoded body.

        Args:
            url: absolute URL to fetch.
            page: value sent as the ``page`` query parameter.
            attempt: number of the first attempt (used for the retry budget).

        Returns:
            The JSON-decoded body when the response is JSON, the raw ``bytes``
            body otherwise, or ``''`` on 404, on a 403 that is not a rate
            limit, or after five failed attempts.
        """
        while True:
            res = None
            try:
                res = self.session.get(
                    url,
                    headers=self.headers,
                    params={'page': page},
                    timeout=self.REQUEST_TIMEOUT,
                )
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                # Treated as a retryable failure below.
                pass

            if res is not None:
                if res.status_code == 404:
                    return ''
                if res.ok:
                    try:
                        return res.json()
                    except ValueError:
                        # ValueError is the common base of the stdlib and
                        # simplejson decode errors that requests may raise.
                        return res.content
                # Only wait for the rate-limit window when the request was
                # actually rejected; a successful response that merely used
                # the last remaining call is returned above.
                if self._header_int(res, 'x-ratelimit-remaining', 1) == 0:
                    now = int(time.time())
                    reset_time = self._header_int(res, 'x-ratelimit-reset', now)
                    sleep_time = max((reset_time - now) + 1, 1)
                    self.logger.debug('Rate limiting in effect, sleeping for %s seconds...', sleep_time)
                    time.sleep(sleep_time)
                    continue
                if res.status_code == 403:
                    self.logger.debug('HTTP 403 unrelated to rate limitng, skipping this request')
                    return ''

            self.logger.debug('Request errored - retrying attempt %s', attempt)
            attempt += 1
            if attempt > 5:
                self.logger.error('Request failed after 5 attempts - aborting this request.')
                return ''

    def get_org_members(self, org):
        """Return the login names of the public members listing for *org*."""
        page = 1
        members = []
        while True:
            res = self.request('https://api.github.com/orgs/%s/members' % (org), page)
            # An empty page, a failed request ('') or an unexpected payload
            # shape all end the pagination.
            if not res or not isinstance(res, list):
                self.logger.debug('Finished finding members for %s', org)
                break
            for member in res:
                if isinstance(member, dict) and 'login' in member:
                    self.logger.debug('Found member %s', member['login'])
                    members.append(member['login'])
            page += 1
        return members

    def get_org_repos(self, org, is_user=False):
        """Return ``[{'name': ..., 'is_fork': ...}, ...]`` for *org*'s repos.

        Args:
            org: organisation name, or user login when *is_user* is true.
            is_user: query the ``/users/`` endpoint instead of ``/orgs/``.

        Archived repositories are skipped.
        """
        page = 1
        repos = []
        org_type = 'users' if is_user else 'orgs'
        while True:
            res = self.request('https://api.github.com/%s/%s/repos' % (org_type, org), page)
            if not res or not isinstance(res, list):
                if repos:
                    self.logger.debug('Finished finding repos for %s', org)
                else:
                    self.logger.error('Failed to get repos for "%s"', org)
                break
            for repo in res:
                if not isinstance(repo, dict):
                    continue
                if repo.get('archived'):
                    self.logger.debug('Skipping archived repo %s', repo.get('name'))
                    continue
                if 'name' in repo:
                    self.logger.debug('Found repo %s', repo['name'])
                    repos.append({'name': repo['name'], 'is_fork': repo.get('fork', False)})
            page += 1
        return repos

    @staticmethod
    def _iter_jobs(yaml_parsed):
        """Yield ``(job_name, job)`` for every mapping-shaped job in the workflow."""
        jobs = yaml_parsed.get('jobs') if isinstance(yaml_parsed, dict) else None
        if not isinstance(jobs, dict):
            return
        for job_name, job in jobs.items():
            # A job written as `name:` with no body parses to None; skip it.
            if isinstance(job, dict):
                yield job_name, job

    @classmethod
    def _iter_steps(cls, yaml_parsed):
        """Yield ``(job_name, step)`` for every mapping-shaped step of every job."""
        for job_name, job in cls._iter_jobs(yaml_parsed):
            steps = job.get('steps')
            if not isinstance(steps, list):
                continue
            for step in steps:
                if isinstance(step, dict):
                    yield job_name, step

    def _check_for_oidc_role_usage(self, yaml_html_url, yaml_parsed):
        """Flag steps whose ``with.role-to-assume`` is set and not read from ``secrets.``."""
        vulnerable_workflows = []
        for job_name, step in self._iter_steps(yaml_parsed):
            with_args = step.get('with')
            if not isinstance(with_args, dict):
                continue
            role = with_args.get('role-to-assume')
            if role and 'secrets.' not in str(role):
                vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'oidc-role-usage'})
        return vulnerable_workflows

    def _check_for_unsafe_event_usage(self, yaml_html_url, yaml_parsed):
        """Flag ``pull_request_target`` workflows that check out pull-request code.

        A step is reported when it uses ``actions/checkout`` and the string
        form of its ``with`` arguments contains ``pull``.
        """
        # YAML 1.1 loaders turn the bare key `on` into boolean True, while a
        # quoted "on" stays a string; accept both spellings.
        events = yaml_parsed.get(True) or yaml_parsed.get('on') or []
        if not isinstance(events, (str, list, dict)) or 'pull_request_target' not in events:
            return []
        vulnerable_workflows = []
        for job_name, step in self._iter_steps(yaml_parsed):
            if 'actions/checkout' not in str(step.get('uses') or ''):
                continue
            if 'pull' in str(step.get('with', '')):
                vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'unsafe-pr-event-usage'})
        return vulnerable_workflows

    def _check_for_unsafe_run_context_usage(self, yaml_html_url, yaml_parsed):
        """Flag ``run:`` scripts that mention an entry of ``UNSAFE_CONTEXT_DATA``."""
        vulnerable_workflows = []
        for job_name, step in self._iter_steps(yaml_parsed):
            run_script = str(step.get('run', ''))
            for unsafe_context in UNSAFE_CONTEXT_DATA:
                if unsafe_context in run_script:
                    vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'unsafe-run-context-usage', 'context': unsafe_context})
        return vulnerable_workflows

    def _check_for_self_hosted_runner(self, yaml_html_url, yaml_parsed):
        """Flag jobs whose ``runs-on`` value mentions ``self-hosted``."""
        vulnerable_workflows = []
        for job_name, job in self._iter_jobs(yaml_parsed):
            if 'self-hosted' in str(job.get('runs-on', '')):
                vulnerable_workflows.append({'url': yaml_html_url, 'job': job_name, 'type': 'self-hosted-usage'})
        return vulnerable_workflows

    def _check_for_potential_unsafe_context_usage(self, yaml_html_url, yaml_parsed):
        """Flag any mention of an unsafe context anywhere in the workflow.

        ``concurrency``, job/step ``if`` and ``env`` entries and ``with.ref``
        are ignored because hits there are treated as false positives. The
        pruning is done on a private copy, so *yaml_parsed* is left untouched.
        """
        import copy

        pruned = copy.deepcopy(yaml_parsed)
        if isinstance(pruned, dict):
            pruned.pop('concurrency', None)
        for _job_name, job in self._iter_jobs(pruned):
            job.pop('if', None)
            job.pop('env', None)
            steps = job.get('steps')
            if not isinstance(steps, list):
                continue
            for step in steps:
                if not isinstance(step, dict):
                    continue
                step.pop('if', None)
                step.pop('env', None)
                with_args = step.get('with')
                if isinstance(with_args, dict):
                    with_args.pop('ref', None)
        raw_yaml = str(pruned)
        return [
            {'url': yaml_html_url, 'type': 'potential-unsafe-context-usage', 'context': unsafe_context}
            for unsafe_context in UNSAFE_CONTEXT_DATA
            if unsafe_context in raw_yaml
        ]

    def check_vulnerable_workflows(self, yaml_html_url, yaml_raw):
        """Parse one workflow file and run every check against it.

        Args:
            yaml_html_url: URL recorded in each finding.
            yaml_raw: workflow source as ``bytes`` or ``str``.

        Returns:
            The concatenated findings of all checks; empty when the document
            is not a mapping with a ``jobs`` key.

        Raises:
            yaml.YAMLError: when the document cannot be parsed.
        """
        if isinstance(yaml_raw, str):
            yaml_raw = yaml_raw.encode('utf-8')
        # Tabs are replaced before parsing, as tab-indented files fail to load.
        yaml_raw = yaml_raw.replace(b'\t', b' ')
        yaml_parsed = yaml.safe_load(yaml_raw)
        if not isinstance(yaml_parsed, dict) or 'jobs' not in yaml_parsed:
            return []
        checks = (
            self._check_for_oidc_role_usage,
            self._check_for_unsafe_event_usage,
            self._check_for_potential_unsafe_context_usage,
            self._check_for_unsafe_run_context_usage,
            self._check_for_self_hosted_runner,
        )
        vulnerable_workflows = []
        for check in checks:
            vulnerable_workflows.extend(check(yaml_html_url, yaml_parsed))
        return vulnerable_workflows

    def get_workflows(self, repo):
        """Fetch and scan every workflow of *repo* (``owner/name``).

        Returns:
            A list of finding dicts from ``check_vulnerable_workflows``.
            Workflows whose YAML fails to parse are logged and skipped.
        """
        page = 1
        vulnerable_workflows = []
        # The branch is recovered from the workflow's html_url. The repo name
        # is escaped, and matched case-insensitively because GitHub treats
        # owner/repo names case-insensitively while html_url uses its own
        # canonical casing.
        branch_pattern = re.compile(r'%s/blob/([^/]+)' % re.escape(repo), re.IGNORECASE)
        while True:
            res = self.request('https://api.github.com/repos/%s/actions/workflows' % (repo), page)
            if not res or not isinstance(res, dict):
                self.logger.debug('Failed to get https://api.github.com/repos/%s/actions/workflows', repo)
                break
            workflows = res.get('workflows') or []
            if page == 1:
                self.logger.debug('Starting finding workflows for %s', repo)
            if len(workflows) == 0:
                self.logger.debug('Finished finding workflows for %s', repo)
                break
            for workflow in workflows:
                if not isinstance(workflow, dict):
                    continue
                yaml_path = workflow.get('path')
                yaml_html_url = workflow.get('html_url')
                if not (yaml_path and yaml_html_url):
                    continue
                branch_match = branch_pattern.search(yaml_html_url)
                if not branch_match:
                    continue
                workflow_branch = branch_match.group(1)
                yaml_raw_url = 'https://raw.githubusercontent.com/%s/%s/%s' % (repo, workflow_branch, yaml_path)
                self.logger.debug('Checking %s...', yaml_raw_url)
                yaml_raw = self.request(yaml_raw_url)
                # request() returns bytes for non-JSON bodies and '' on
                # failure; anything else (a file that happened to decode as
                # JSON) cannot be fed to the YAML parser.
                if not isinstance(yaml_raw, (str, bytes)):
                    self.logger.debug('Unexpected content for %s, skipping', yaml_raw_url)
                    continue
                try:
                    vulnerable_workflows.extend(self.check_vulnerable_workflows(yaml_html_url, yaml_raw))
                except yaml.YAMLError:
                    self.logger.error(f'Error while parsing yaml for {yaml_html_url}', exc_info=True)
            page += 1
        return vulnerable_workflows
def _format_workflow_hits(url, hits):
    """Return the one-line report for a workflow, or None if nothing is reportable.

    Args:
        url: the workflow's URL.
        hits: the finding dicts recorded for that URL.
    """
    job_types = ('oidc-role-usage', 'unsafe-pr-event-usage', 'self-hosted-usage')
    context_types = ('potential-unsafe-context-usage', 'unsafe-run-context-usage')
    out = f'workflow: {url}'
    types = []
    seen_contexts = []
    for hit in hits:
        hit_type = hit['type']
        if hit_type in job_types and 'job' in hit:
            out += f" job: {hit['job']}"
            types.append(hit_type)
        elif hit_type in context_types and 'context' in hit:
            types.append(hit_type)
            if 'job' in hit:
                out += f" job: {hit['job']}"
            # Each context is printed once even when several checks hit it.
            if hit['context'] not in seen_contexts:
                out += f" context: {hit['context']}"
                seen_contexts.append(hit['context'])
    if not types:
        return None
    # All hits of one workflow carry the same is_fork flag (set from the
    # repo); the last hit is read.
    if 'is_fork' in hits[-1]:
        out += f' is_fork: {hits[-1]["is_fork"]}'
    # Sorted so the report is identical from run to run (set order is not).
    out += f' ({", ".join(sorted(set(types)))})'
    return out


def main(args):
    """Scan the selected repositories and print one line per flagged workflow.

    Args:
        args: parsed command-line namespace with ``org``, ``repo``, ``token``,
            ``members`` and ``verbose`` attributes.
    """
    org = args.org
    gh = GitHub(args.token, args.verbose)

    if args.repo:
        repos = [{'name': args.repo}]
    elif args.members:
        repos = []
        for member in gh.get_org_members(org):
            for repo in gh.get_org_repos(member, is_user=True):
                # A member's repositories live under the member's account,
                # not under the org, so remember who owns each one.
                repo['owner'] = member
                repos.append(repo)
    else:
        repos = gh.get_org_repos(org)

    workflows = {}
    for repo in repos:
        owner = repo.get('owner', org)
        for workflow in gh.get_workflows(f'{owner}/{repo["name"]}'):
            if 'is_fork' in repo:
                workflow['is_fork'] = repo.get('is_fork', False)
            workflows.setdefault(workflow['url'], []).append(workflow)

    for url, hits in workflows.items():
        line = _format_workflow_hits(url, hits)
        if line is not None:
            print(line)
# Script entry point: declare the command-line options, parse them and hand
# the resulting namespace to main().
if __name__ == '__main__':
    parser = ArgumentParser(
        description="Checks whether a Github org has any actions workflows that may be vulnerable to malicious pull requests"
    )
    # (flags, keyword arguments) for each option, in the order shown by --help.
    option_specs = (
        (("-o", "--org"), {"help": "Github org to check", "required": True}),
        (("-m", "--members"), {"action": "store_true", "help": "Check the repos of members of the org"}),
        (("-r", "--repo"), {"help": "Github repo to check"}),
        (("-t", "--token"), {"help": "Github token for authenticated API requests, used in the Authorization header"}),
        (("-v", "--verbose"), {"action": "store_true", "help": "More output"}),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    args = parser.parse_args()
    main(args)
@xflr6
Copy link

xflr6 commented Dec 18, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment