Created
December 7, 2020 21:54
-
-
Save finnigja/a47afd61857b33a33373224f1066c108 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from gevent import monkey, joinall, spawn | |
monkey.patch_all() | |
import base64 | |
import gzip | |
import boto3 | |
import sys | |
# Substrings whose presence in an instance's userdata is reported as a possible secret.
# Add other 'secret' strings here as needed.
POSSIBLE_SECRETS = ['PRIVATE KEY', ]
# helper function to collect data about instances, called concurrently using gevent
def _get_instances(profile_name, account_id, region_name):
    """Collect the userdata of every EC2 instance in one region for one profile.

    Args:
        profile_name: name of the AWS profile used to create the boto3 session.
        account_id: account identifier; copied unchanged into every result tuple.
        region_name: region whose EC2 instances are enumerated.

    Returns:
        A list of ``(profile_name, account_id, region_name, instance_id,
        instance_type, userdata)`` tuples. ``userdata`` is ``bytes``: the
        gzip-decompressed payload when it decompresses cleanly, the
        base64-decoded payload otherwise, and ``b''`` when the instance has
        no userdata.
    """
    # Local import: only this function needs zlib, and it keeps the block
    # independent of the module-level import list.
    import zlib

    print('profile_name: {}\t\tregion: {}'.format(profile_name, region_name))
    session = boto3.Session(profile_name=profile_name)
    ec2 = session.resource('ec2', region_name=region_name)
    instances = []
    for instance in ec2.instances.all():
        response = instance.describe_attribute(Attribute='userData')
        userdata = b''
        if response.get('UserData'):
            raw_userdata = base64.b64decode(response['UserData']['Value'])
            try:
                userdata = gzip.decompress(raw_userdata)
            except (OSError, EOFError, zlib.error):
                # Not (valid) gzip: OSError/BadGzipFile for a wrong header,
                # EOFError for a truncated stream, zlib.error for corrupt
                # deflate data. Fall back to the raw bytes instead of letting
                # one odd payload abort the scan of the whole region.
                userdata = raw_userdata
        instances.append(
            (profile_name, account_id, region_name,
             instance.id, instance.instance_type, userdata)
        )
    return instances
# Profiles to scan are given as command-line arguments; only those that are
# also present in the local AWS configuration are examined.
requested_profile_names = sys.argv[1:]

print('Checking metadata...')
session = boto3.Session()
profiles = session.available_profiles
results = []
userdatas = {}

for profile_name in profiles:
    if profile_name not in requested_profile_names:
        continue
    session = boto3.Session(profile_name=profile_name)
    regions = session.get_available_regions('ec2', partition_name='aws')
    account_id = session.client('sts').get_caller_identity().get('Account')
    # One greenlet per region; joinall blocks until all of them have finished.
    joined = joinall(
        [spawn(_get_instances, profile_name, account_id, region_name) for region_name in regions]
    )
    for j in joined:
        # A greenlet that raised has value None; an empty region yields [].
        if j.value:
            results.extend(j.value)

# Group instances by their userdata text so identical userdata is reported once.
for r in results:
    instance = '{}:{}:{}:{}'.format(r[0], r[1], r[2], r[3])
    # Userdata may be arbitrary bytes; replace undecodable sequences rather
    # than aborting the whole report with UnicodeDecodeError.
    userdata = r[-1].decode('utf-8', errors='replace')
    userdatas.setdefault(userdata, []).append(instance)

print('Instances examined: {}'.format(len(results)))
print('Unique userdata values: {}'.format(len(userdatas)))

possible_secrets = []
for userdata in userdatas:
    for value in POSSIBLE_SECRETS:
        if value in userdata:
            possible_secrets.append(
                '**** found value "{}" in instance userdata ****\r\n'
                '**** instances:\r\n\t{}\r\n'
                '**** userdata:\r\n{}'.format(value, '\r\n\t'.join(userdatas[userdata]), userdata)
            )

print('Possible secrets identified in userdata: {}'.format(len(possible_secrets)))
for ps in possible_secrets:
    print(ps)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment