Skip to content

Instantly share code, notes, and snippets.

@dagrz
Created August 29, 2023 05:54
Show Gist options
  • Save dagrz/80cba578179afc2c9e861122bab45c5e to your computer and use it in GitHub Desktop.
Save dagrz/80cba578179afc2c9e861122bab45c5e to your computer and use it in GitHub Desktop.
Find OIDC roles in the GitHub events firehose
#!/usr/bin/env python3
import json, time, requests, re, argparse, os, boto3
def main(args):
    """Set up optional S3 storage and run the selected event source.

    Args:
        args: Parsed command-line namespace. Reads ``verbose``, ``s3_bucket``
            and ``gharchive_file``; the namespace is handed on unchanged to
            the processing function.
    """
    print("> Started...")
    if args.verbose:
        print(f"^ Args: {args}")

    # Only create the S3 resource when a bucket name was supplied.
    s3_bucket = (
        boto3.resource('s3').Bucket(args.s3_bucket) if args.s3_bucket else None
    )

    # An archive file, when given, is used instead of the live events API.
    process = (
        process_gharchive_file if args.gharchive_file else process_github_events
    )
    process(args, s3_bucket)
def process_event(args, event, s3_bucket=None):
    """Scan the commits of one GitHub event for workflow changes and role ARNs.

    Only ``PushEvent`` events are examined. For every commit listed in the
    payload, the commit's ``.patch`` is fetched from github.com. Patches that
    mention ``.github/workflow`` are reported, optionally stored in S3, and
    searched for AWS IAM role ARNs, which are reported as well.

    Args:
        args: Parsed command-line namespace; passed to ``make_web_request``.
        event: Decoded event dict with ``type``, ``repo`` and ``payload`` keys.
        s3_bucket: Optional bucket object exposing ``put_object``. When given,
            workflow patches are stored under ``commits/`` and patches that
            contain role ARNs under ``roles/``.
    """
    if event['type'] != 'PushEvent':
        return

    repo_name = event['repo']['name']
    for commit in event['payload'].get('commits') or []:
        commit_sha = commit['sha']
        # Build the cached raw commit patch URL external to the API
        url = f"https://github.com/{repo_name}/commit/{commit_sha}.patch"
        r = make_web_request(args, url)
        # make_web_request returns None when the request itself failed, so
        # guard before touching the response.
        if r is None or not 200 <= r.status_code < 300:
            continue

        commit_text = r.text
        if '.github/workflow' not in commit_text:
            continue

        print(f"+ Workflow: {url}")
        # Store all commits with GitHub Actions workflows in S3
        if s3_bucket is not None:
            s3_bucket.put_object(
                Key=f"commits/{repo_name.replace('/', '-')}-{commit_sha}.txt",
                Body=commit_text,
            )

        # Raw string: "/" needs no escaping, and "\/" in a normal string
        # literal is an invalid escape sequence.
        matches = re.findall(
            r"arn:aws:iam::[0-9]{12}:role/[/a-zA-Z0-9_-]+", commit_text
        )
        if not matches:
            continue

        # Separately store all AWS IAM role ARNs found inside workflows in S3
        if s3_bucket is not None:
            s3_bucket.put_object(Key=f"roles/{commit_sha}.txt", Body=commit_text)
        for match in matches:
            print(f"! Role: {match}")
def process_gharchive_file(args, s3_bucket=None):
    """Process every event in an archive file holding one JSON event per line.

    Args:
        args: Parsed command-line namespace. ``args.gharchive_file`` must be an
            open text file object; it is closed before this function returns.
        s3_bucket: Optional bucket object forwarded to ``process_event``.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # Argparse opened the file, so we need to close it. The ``with`` block
    # does that even if parsing or processing an event raises.
    with args.gharchive_file as archive:
        for line in archive:
            # Skip blank lines (e.g. a trailing newline) instead of letting
            # json.loads fail on them.
            if not line.strip():
                continue
            process_event(args, json.loads(line), s3_bucket)
def process_github_events(args, s3_bucket=None):
    """Poll the GitHub public events API forever and process each event.

    The ``ETag`` of the last successful response is sent with the next request
    so the API can answer with a conditional response. Only 2xx responses are
    processed; any other status, or a failed request, is skipped until the
    next poll.

    Args:
        args: Parsed command-line namespace. Reads ``verbose`` and ``sleep``
            (seconds between polls) and is passed to the request helpers.
        s3_bucket: Optional bucket object forwarded to ``process_event``.
    """
    events_url = 'https://api.github.com/events'
    etag = None
    # The value may arrive as a string from the command line; time.sleep
    # needs a number.
    delay = float(args.sleep)

    while True:
        r = make_github_request(args, events_url, etag)
        # make_github_request returns None when the request failed.
        if r is not None and 200 <= r.status_code < 300:
            if 'ETag' in r.headers:
                etag = r.headers['ETag']
                if args.verbose:
                    print(f"^ ETag: {etag}")
            for event in r.json():
                process_event(args, event, s3_bucket)
        time.sleep(delay)
def make_web_request(args, url):
    """Fetch ``url`` with a plain GET request.

    Args:
        args: Parsed command-line namespace; ``args.user_agent`` is sent as the
            ``User-Agent`` header.
        url: Address to fetch.

    Returns:
        The ``requests`` response object, or ``None`` if the request raised a
        ``requests.RequestException`` (the error is printed).
    """
    headers = {
        "User-Agent": args.user_agent,
        "Range": "bytes=0-2000000"  # ~2MB
    }
    try:
        # Without a timeout requests waits indefinitely on a stalled server.
        return requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as e:
        print(e)
        return None
def _github_error_message(response):
    """Return the ``message`` field of a JSON-object response body, else ``""``."""
    try:
        data = response.json()
    except ValueError:
        # Empty or non-JSON body (e.g. a 304 Not Modified response).
        return ""
    if isinstance(data, dict) and isinstance(data.get("message"), str):
        return data["message"]
    return ""


def make_github_request(args, url, etag=None):
    """Send an authenticated GET request to the GitHub REST API.

    Exits the process on bad credentials. On a rate-limit error it sleeps for
    15 minutes and retries once, exiting only if the retry is rate limited
    too.

    Args:
        args: Parsed command-line namespace. Reads ``github_token`` and
            ``user_agent``.
        url: API address to fetch.
        etag: Optional ETag from an earlier response; sent as ``If-None-Match``.

    Returns:
        The ``requests`` response object, or ``None`` if the request raised a
        ``requests.RequestException`` (the error is printed).

    Raises:
        SystemExit: On bad credentials or a repeated rate-limit error.
    """
    headers = {
        "Authorization": f"Bearer {args.github_token}",
        "X-GitHub-Api-Version": "2022-11-28",
        "User-Agent": args.user_agent
    }
    # https://docs.github.com/en/rest/overview/resources-in-the-rest-api?apiVersion=2022-11-28#conditional-requests
    if etag:
        headers["If-None-Match"] = etag
    try:
        # Without a timeout requests waits indefinitely on a stalled server.
        r = requests.get(url, headers=headers, timeout=30)
        message = _github_error_message(r)
        if message.startswith("Bad credentials"):
            print(f"{r.status_code} Bad Github credentials, exiting")
            raise SystemExit(1)
        if message.startswith("API rate limit exceeded"):
            # There is a better way but I'm too lazy to implement it
            print(f"{r.status_code} Github rate limit hit, sleeping for 15 minutes")
            time.sleep(60*15)
            r = requests.get(url=url, headers=headers, timeout=30)
            # Give up only if the retry is still rate limited; otherwise the
            # retried response is returned to the caller.
            if _github_error_message(r).startswith("API rate limit exceeded"):
                print(f"{r.status_code} Rate limit hit again, exiting")
                raise SystemExit(1)
    except requests.RequestException as e:
        print(e)
        return None
    return r
if __name__ == "__main__":
    # This is executed when run from the command line: build the argument
    # parser, parse the arguments and hand them to main().
    parser = argparse.ArgumentParser(
        prog='Github AWS OIDC Firehose',
        description='Reads Github events and finds AWS roles in Github actions workflows'
    )
    parser.add_argument(
        '-v',
        '--verbose',
        action='store_true',
        required=False,
        help='Output verbose logging'
    )
    parser.add_argument(
        '-f',
        '--gharchive-file',
        type=argparse.FileType('r'),
        required=False,
        help='Use this Github archive JSON file instead of Github events API'
    )
    parser.add_argument(
        '-b',
        '--s3-bucket',
        required=False,
        help='Store commit patches in this S3 bucket'
    )
    parser.add_argument(
        '-t',
        '--github-token',
        required=False,
        default=os.getenv('GITHUB_TOKEN'),
        help='Use this Github token to authenticate API requests. Defaults to GITHUB_TOKEN env var'
    )
    parser.add_argument(
        '-a',
        '--user-agent',
        required=False,
        default='github-aws-oidc-firehose',
        help='Use this user agent when making Github API requests. Defaults to github-aws-oidc-firehose'
    )
    parser.add_argument(
        '-s',
        '--sleep',
        required=False,
        # Without a type, a value given on the command line stays a string
        # and time.sleep() rejects it.
        type=float,
        default=300,
        help='Sleep this many seconds between Github API requests. Defaults to 300'
    )
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment