Created May 11, 2021 18:32
-
-
Save gergo-dryrun/0fdf433cbffe8b1688dbbbeaaae86545 to your computer and use it in GitHub Desktop.
Dump the policies, organizational units (OUs), and accounts of an AWS Organizations root
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
import datetime | |
import json | |
import boto3 | |
class DateTimeEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``datetime.datetime`` values as strings.

    Datetimes are rendered with ``str()``; every other object the stock
    encoder cannot handle is passed to the base class, which raises
    ``TypeError``.
    """

    def default(self, z):
        """Return a JSON-serializable replacement for *z*.

        :param z: object the standard encoder could not serialize
        :return: ``str(z)`` when *z* is a ``datetime.datetime``
        :raises TypeError: (from the base class) for any other type
        """
        if not isinstance(z, datetime.datetime):
            # Not ours to handle: let the base implementation reject it.
            return super().default(z)
        return str(z)
# Sample mapping holding the timestamp taken at import time.
# NOTE(review): no function visible in this file references it — confirm
# whether it is still needed.
my_dict = dict(date=datetime.datetime.now())
def dump_accts(org_client):
    """Write every account returned by ``list_accounts`` to ``accounts.json``.

    Each account record is also printed as it is collected.

    :param org_client: client object exposing ``get_paginator('list_accounts')``
    :return: None
    """
    pages = org_client.get_paginator('list_accounts').paginate()
    collected = []
    # Flatten the pages lazily so records are printed in arrival order.
    for record in (acct for page in pages for acct in page['Accounts']):
        collected.append(record)
        print(record)  # print the account

    with open('accounts.json', 'w') as out_file:
        json.dump(collected, out_file, cls=DateTimeEncoder)
def dump_ous(client):
    """Write the ids of all OUs below the first root to ``ous.json``.

    The id list is gathered with ``get_ou_ids`` and printed after it has
    been written.

    :param client: client object exposing ``list_roots`` and the paginator
        used by ``get_ou_ids``
    :return: None
    """
    first_root = client.list_roots()['Roots'][0]
    all_ou_ids = get_ou_ids(client, first_root['Id'])

    with open('ous.json', 'w') as out_file:
        json.dump(all_ou_ids, out_file, cls=DateTimeEncoder)
    print(all_ou_ids)
def get_ou_ids(client, parent_id):
    """Return the ids of every OU nested under *parent_id*.

    The walk is depth-first: each OU id is immediately followed by the ids
    of its own descendants.

    :param client: client object exposing ``get_paginator('list_children')``
    :param parent_id: id of the root or OU whose subtree is listed
    :return: flat list of OU ids (empty when there are no child OUs)
    """
    pages = client.get_paginator('list_children').paginate(
        ParentId=parent_id,
        ChildType='ORGANIZATIONAL_UNIT',
    )
    collected = []
    for page in pages:
        for child in page['Children']:
            child_id = child['Id']
            # The OU itself first, then everything found beneath it.
            collected += [child_id, *get_ou_ids(client, child_id)]
    return collected
def dump_traverse_accounts(client):
    """Write the OU/account tree below the first root to ``tree.json``.

    The tree is built by ``get_ou_content`` and has the nested shape::

        {"accounts": ["acc1", "acc2"],
         "child_ou": {"accounts": [],
                      ...}}

    The tree is printed after it has been written.

    :param client: client object exposing ``list_roots`` and the paginator
        used by ``get_ou_content``
    :return: None
    """
    first_root = client.list_roots()['Roots'][0]
    tree = get_ou_content(client, first_root['Id'])

    with open('tree.json', 'w') as out_file:
        json.dump(tree, out_file, cls=DateTimeEncoder)
    print(tree)
def get_ou_content(client, parent_id):
    """Build the nested account/OU tree rooted at *parent_id*.

    The result maps ``'accounts'`` to the list of account ids directly under
    *parent_id*, and each child OU id to that OU's own tree (built
    recursively). Progress messages are printed along the way.

    :param client: client object exposing ``get_paginator('list_children')``
    :param parent_id: id of the root or OU to describe
    :return: ``defaultdict`` (created without a default factory) holding the tree
    """
    node = defaultdict()
    node['accounts'] = []
    print(f'Processing {parent_id}')

    list_children = client.get_paginator('list_children')
    # Set up both listings up front: direct accounts, then child OUs.
    account_pages, ou_pages = [
        list_children.paginate(ParentId=parent_id, ChildType=child_type)
        for child_type in ('ACCOUNT', 'ORGANIZATIONAL_UNIT')
    ]

    for page in account_pages:
        node['accounts'].extend(entry['Id'] for entry in page['Children'])
    print(f'Processed the accounts of {parent_id}')

    for page in ou_pages:
        for ou in page['Children']:
            print(f'Going into recursion from parent {parent_id} into child {ou["Id"]}')
            node[ou['Id']] = get_ou_content(client, ou['Id'])
    return node
def describe_ou(client, ou_id):
    """Return the ``'OrganizationalUnit'`` record for *ou_id*.

    :param client: client object exposing ``describe_organizational_unit``
    :param ou_id: id of the organizational unit to look up
    :return: the ``'OrganizationalUnit'`` entry of the response
    """
    response = client.describe_organizational_unit(OrganizationalUnitId=ou_id)
    return response['OrganizationalUnit']
def dump_named_ous(client):
    """Describe every OU id listed in ``ous.json`` and write ``named_ou.json``.

    Reads the id list from ``ous.json``, looks each id up with
    ``describe_ou``, and stores the resulting records in the same order.

    :param client: client object passed through to ``describe_ou``
    :return: None
    """
    with open('ous.json', 'r') as in_file:
        ou_ids = json.load(in_file)

    described = [describe_ou(client, ou_id) for ou_id in ou_ids]

    with open('named_ou.json', 'w') as out_file:
        json.dump(described, out_file, cls=DateTimeEncoder)
def parse_dumps():
    """Combine the earlier dumps into ``parsed_tree_counted.json``.

    Loads ``accounts.json``, ``tree.json`` and ``named_ou.json`` (in that
    order), runs ``recursive_replace`` over the tree, and writes the result.

    :return: None
    """
    loaded = []
    for path in ('accounts.json', 'tree.json', 'named_ou.json'):
        with open(path, 'r') as in_file:
            loaded.append(json.load(in_file))
    accounts, tree, named_ous = loaded

    result = recursive_replace(tree, accounts, named_ous)

    with open('parsed_tree_counted.json', 'w') as out_file:
        json.dump(result, out_file, cls=DateTimeEncoder)
def get_ou_name_from_key(named_ous, ou_id):
    """Look up the name of the OU whose id is *ou_id*.

    The first entry with a matching ``"Id"`` wins. The outcome of the lookup
    is printed either way.

    :param named_ous: list of dict {"Id", "Name"}
    :param ou_id: id of the OU to find
    :return: the matching entry's ``"Name"``, or ``None`` when no entry matches
    """
    ou = next((entry for entry in named_ous if entry['Id'] == ou_id), None)
    if ou is None:
        print(f'Did not find name for {ou_id}')
        return None
    print(f'Found {ou["Name"]} for {ou_id}')
    return ou['Name']
def recursive_replace(tree, detailed_accounts, named_ous):
    """Return *tree* with OU ids replaced by names and accounts by a count.

    For every mapping level, the ``'accounts'`` list is replaced by its
    length and every other key (an OU id) is replaced by the name found via
    ``get_ou_name_from_key``; the value under it is processed recursively.

    When no name is known for an OU id, the id itself is kept as the key.
    Using the ``None`` lookup result as the key would make all unnamed OUs
    at one level overwrite each other (and serialize as ``"null"``).

    :param tree: nested mapping shaped like the output of ``get_ou_content``;
        a list is returned unchanged
    :param detailed_accounts: currently unused; passed down the recursion
        unchanged to keep the signature stable
    :param named_ous: list of dict {"Id", "Name"}
    :return: new dict mirroring *tree* (or *tree* itself when it is a list)
    """
    if isinstance(tree, list):
        return tree

    parsed_tree = {}
    for key, value in tree.items():
        if key == 'accounts':
            # NOTE(review): an OU literally named "accounts" would collide
            # with this key — confirm whether that can occur.
            parsed_tree['accounts'] = len(value)
            continue
        name = get_ou_name_from_key(named_ous, key)
        # Keep the raw id when the lookup fails so the subtree is not lost.
        replaced_key = key if name is None else name
        parsed_tree[replaced_key] = recursive_replace(value, detailed_accounts, named_ous)
    return parsed_tree
def dump_policies(client):
    """Dump all service control policies and their targets to JSON files.

    Writes two files:

    * ``policies.json`` — the policy records returned by ``list_policies``
      with the ``SERVICE_CONTROL_POLICY`` filter.
    * ``policy_targets.json`` — a mapping of policy name to the list of
      targets the policy is attached to.

    Targets are collected through the ``list_targets_for_policy`` paginator
    so that policies with more than one page of targets are dumped
    completely; a single direct call returns only the first page.

    :param client: client object exposing ``get_paginator`` for
        ``list_policies`` and ``list_targets_for_policy``
    :return: None
    """
    policies = []
    policy_targets = {}

    policy_pages = client.get_paginator('list_policies').paginate(
        Filter='SERVICE_CONTROL_POLICY',
    )
    targets_paginator = client.get_paginator('list_targets_for_policy')

    for policy_page in policy_pages:
        for policy in policy_page['Policies']:
            # Flatten every page of targets for this policy into one list.
            policy_targets[policy['Name']] = [
                target
                for target_page in targets_paginator.paginate(PolicyId=policy['Id'])
                for target in target_page['Targets']
            ]
        policies += policy_page['Policies']

    with open('policies.json', 'w') as fs:
        json.dump(policies, fs, cls=DateTimeEncoder)
    with open('policy_targets.json', 'w') as fs:
        json.dump(policy_targets, fs, cls=DateTimeEncoder)
def main():
    """Create an ``organizations`` client and run the selected dump step.

    Only ``dump_policies`` is active; the other steps are left commented
    out so they can be enabled one at a time.
    """
    # NOTE(review): the profile name is an empty string here — presumably a
    # placeholder to be filled in before running; confirm.
    org_client = boto3.session.Session(profile_name='').client('organizations')

    # dump_accts(org_client)
    # dump_ous(org_client)
    # dump_traverse_accounts(org_client)
    # dump_named_ous(org_client)
    dump_policies(org_client)
    # parse_dumps()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment