Skip to content

Instantly share code, notes, and snippets.

@gergo-dryrun
Created May 11, 2021 18:32
Show Gist options
  • Save gergo-dryrun/0fdf433cbffe8b1688dbbbeaaae86545 to your computer and use it in GitHub Desktop.
Save gergo-dryrun/0fdf433cbffe8b1688dbbbeaaae86545 to your computer and use it in GitHub Desktop.
Dump policies/OU/accounts from a root
from collections import defaultdict
import datetime
import json
import boto3
class DateTimeEncoder(json.JSONEncoder):
def default(self, z):
if isinstance(z, datetime.datetime):
return (str(z))
else:
return super().default(z)
my_dict = {'date': datetime.datetime.now()}
def dump_accts(org_client):
paginator = org_client.get_paginator('list_accounts')
page_iterator = paginator.paginate()
accounts = []
for page in page_iterator:
for acct in page['Accounts']:
accounts.append(acct)
print(acct) # print the account
with open('accounts.json', 'w') as fs:
json.dump(accounts, fs, cls=DateTimeEncoder)
def dump_ous(client):
root_id = client.list_roots()['Roots'][0]['Id']
ou_id_list = get_ou_ids(client, root_id)
with open('ous.json', 'w') as fs:
json.dump(ou_id_list, fs, cls=DateTimeEncoder)
print(ou_id_list)
def get_ou_ids(client, parent_id):
full_result = []
paginator = client.get_paginator('list_children')
iterator = paginator.paginate(
ParentId=parent_id,
ChildType='ORGANIZATIONAL_UNIT'
)
for page in iterator:
for ou in page['Children']:
# 1. Add entry
# 2. Fetch children recursively
full_result.append(ou['Id'])
full_result.extend(get_ou_ids(client, ou['Id']))
return full_result
def dump_traverse_accounts(client):
"""
:param client:
:return:
{ "accounts": ["acc1", "acc2"]},
{"child_ou": {"accounts": [],
...
}
}
"""
root_id = client.list_roots()['Roots'][0]['Id']
ou_id_list = get_ou_content(client, root_id)
with open('tree.json', 'w') as fs:
json.dump(ou_id_list, fs, cls=DateTimeEncoder)
print(ou_id_list)
def get_ou_content(client, parent_id):
full_result = defaultdict()
full_result['accounts'] = []
print(f'Processing {parent_id}')
paginator = client.get_paginator('list_children')
account_iterator = paginator.paginate(
ParentId=parent_id,
ChildType='ACCOUNT'
)
ou_iterator = paginator.paginate(
ParentId=parent_id,
ChildType='ORGANIZATIONAL_UNIT'
)
for page in account_iterator:
full_result['accounts'] += [item['Id'] for item in page['Children']]
print(f'Processed the accounts of {parent_id}')
for page in ou_iterator:
for ou in page['Children']:
print(f'Going into recursion from parent {parent_id} into child {ou["Id"]}')
full_result[ou['Id']] = get_ou_content(client, ou['Id'])
return full_result
def describe_ou(client, ou_id):
return client.describe_organizational_unit(
OrganizationalUnitId=ou_id
)['OrganizationalUnit']
def dump_named_ous(client):
named_ou = []
with open('ous.json', 'r') as fs:
ous = json.load(fs)
for ou_id in ous:
named_ou.append(describe_ou(client, ou_id))
with open('named_ou.json', 'w') as fs:
json.dump(named_ou, fs, cls=DateTimeEncoder)
def parse_dumps():
with open('accounts.json', 'r') as fs:
accounts = json.load(fs)
with open('tree.json', 'r') as fs:
tree = json.load(fs)
with open('named_ou.json', 'r') as fs:
named_ous = json.load(fs)
parsed_tree = recursive_replace(tree, accounts, named_ous)
with open('parsed_tree_counted.json', 'w') as fs:
json.dump(parsed_tree, fs, cls=DateTimeEncoder)
def get_ou_name_from_key(named_ous, ou_id):
"""
:param named_ous: list of dict {"Id", "Name"}
:param ou_id:
:return:
"""
for ou in named_ous:
if ou['Id'] == ou_id:
print(f'Found {ou["Name"]} for {ou_id}')
return ou['Name']
print(f'Did not find name for {ou_id}')
def recursive_replace(tree, detailed_accounts, named_ous):
parsed_tree = defaultdict()
if isinstance(tree, list):
return tree
for key, value in tree.items():
if key == 'accounts':
parsed_tree['accounts'] = len(value)
# parsed_tree['accounts'] = extend_accounts(value, detailed_accounts)
else:
replaced_key = get_ou_name_from_key(named_ous, key)
parsed_tree[replaced_key] = recursive_replace(value, detailed_accounts, named_ous)
return parsed_tree
def dump_policies(client):
policies = []
policy_targets = defaultdict()
paginator = client.get_paginator('list_policies')
policies_iterator = paginator.paginate(
Filter='SERVICE_CONTROL_POLICY'
)
for policy_response in policies_iterator:
for policy in policy_response['Policies']:
response = client.list_targets_for_policy(
PolicyId=policy['Id'],
)
policy_targets[policy['Name']] = response['Targets']
policies += policy_response['Policies']
with open('policies.json', 'w') as fs:
json.dump(policies, fs, cls=DateTimeEncoder)
with open('policy_targets.json', 'w') as fs:
json.dump(policy_targets, fs, cls=DateTimeEncoder)
def main():
session = boto3.session.Session(profile_name='')
org_client = session.client('organizations')
# dump_accts(org_client)
# dump_ous(org_client)
# dump_traverse_accounts(org_client)
# dump_named_ous(org_client)
dump_policies(org_client)
# parse_dumps()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment