Created
January 20, 2021 19:44
-
-
Save alisade/a087384d9a62ad0726b4596e61123aef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import boto3 | |
import json | |
import uuid | |
import sys | |
from time import sleep | |
import backoff | |
from botocore import exceptions | |
# The source role name is mandatory; the AWS profile name is optional.
if len(sys.argv) < 2:
    # sys.exit() with a string writes it to stderr and exits with status 1.
    # Unlike the bare exit() helper, it does not depend on the site module
    # being loaded.
    sys.exit("Usage: " + sys.argv[0] + " SOURCE_ROLE_NAME [AWS_PROFILE]")
source_role = sys.argv[1]
# Derive a deterministic name for the copy from the source role name and the
# hardware address reported by uuid.getnode(). The same machine therefore
# always maps a given source role to the same copy, so repeated runs reuse one
# role instead of piling up new ones while earlier assumed sessions time out.
_name_seed = "{}{}".format(source_role, uuid.getnode())
destination_role = str(uuid.uuid5(uuid.NAMESPACE_OID, _name_seed))
# Build the STS and IAM clients. When an AWS profile name is given as the
# second argument it is used for the session; otherwise profile_name=None lets
# boto3 fall back to its default credential resolution.
#
# The profile is selected with an explicit argument check rather than a
# catch-all "except": a mistyped profile name now fails loudly instead of
# silently falling back to the default credentials, which could otherwise
# create and modify roles in an unintended account.
profile = sys.argv[2] if len(sys.argv) > 2 else None
session = boto3.Session(profile_name=profile)
sts = session.client('sts')
iam = session.client('iam')
# Resolve the ARN of the identity running this script; it is later added to
# the trust policy of the copied role.
try:
    sts_identity = sts.get_caller_identity()['Arn']
except exceptions.ClientError as err:
    if err.response['Error']['Code'] == 'ExpiredToken':
        print('You need to re-authenticate with Okta and try again')
        sys.exit(1)
    # Any other failure would leave sts_identity undefined, so surface the
    # real error here rather than a confusing NameError later in the script.
    raise
def _delete_role_with_policies(iam_client, role_name):
    """Remove all policies from *role_name*, then delete the role itself.

    Managed policies are detached and inline policies are deleted first,
    mirroring the order IAM expects before a role can be removed.
    """
    attached = iam_client.list_attached_role_policies(RoleName=role_name)
    for policy in attached.get('AttachedPolicies'):
        iam_client.detach_role_policy(
            RoleName=role_name,
            PolicyArn=policy.get('PolicyArn'),
        )
    inline = iam_client.list_role_policies(RoleName=role_name)
    for policy_name in inline.get('PolicyNames'):
        iam_client.delete_role_policy(
            RoleName=role_name,
            PolicyName=policy_name,
        )
    iam_client.delete_role(RoleName=role_name)


# Fetch the source role; its trust policy seeds the copy.
role = iam.get_role(RoleName=source_role)
assumed_role_policy = role['Role']['AssumeRolePolicyDocument']

# Try up to 30 times, one second apart, showing a countdown on one line.
for remaining in range(30, 0, -1):
    sys.stdout.write("\r{:2d} Creating a new copy of the role...".format(remaining))
    sys.stdout.flush()
    try:
        iam.create_role(
            RoleName=destination_role,
            MaxSessionDuration=8 * 3600,  # 8 hours max session
            AssumeRolePolicyDocument=json.dumps(assumed_role_policy),
            Tags=[
                {'Key': 'DUPLICATE-OF', 'Value': source_role},
                {'Key': 'TEMPORARY_WILL_BE_DELETED', 'Value': 'True'},
            ],
        )
        break
    except iam.exceptions.EntityAlreadyExistsException:
        # A copy from an earlier run exists: delete it and retry so we always
        # end up with a fresh, untouched copy of the source role.
        _delete_role_with_policies(iam, destination_role)
    sleep(1)
else:
    # Every attempt hit an existing role; do not claim success or continue
    # configuring a role that was never (re)created.
    sys.exit("\nGave up creating role {} after 30 attempts".format(destination_role))
sys.stdout.write("\rRole copy created sucessfully. \n")
# Replicate each inline policy of the source role onto the copy, keeping the
# policy name and re-serialising the policy document as JSON.
for _policy_name in iam.list_role_policies(RoleName=source_role).get('PolicyNames'):
    _source_policy = iam.get_role_policy(RoleName=source_role, PolicyName=_policy_name)
    _document = json.dumps(_source_policy.get('PolicyDocument'))
    iam.put_role_policy(
        RoleName=destination_role,
        PolicyName=_policy_name,
        PolicyDocument=_document,
    )
# Attach to the copy every managed policy that is attached to the source role.
_attached = iam.list_attached_role_policies(RoleName=source_role)
for _policy_arn in (entry.get('PolicyArn') for entry in _attached.get('AttachedPolicies')):
    iam.attach_role_policy(RoleName=destination_role, PolicyArn=_policy_arn)
def _trusted_principal_arn(caller_arn):
    """Return the principal ARN to trust for the identity *caller_arn*.

    An STS assumed-role ARN (``arn:P:sts::ACCOUNT:assumed-role/NAME/SESSION``)
    is mapped to the IAM role ARN (``arn:P:iam::ACCOUNT:role/NAME``). Any other
    ARN is returned unchanged.

    Only the service and resource fields are rewritten, so role names that
    happen to contain "sts" or "assumed-" are left intact.
    """
    fields = caller_arn.split(':', 5)
    is_assumed_role = (
        len(fields) == 6
        and fields[2] == 'sts'
        and fields[5].startswith('assumed-role/')
    )
    if not is_assumed_role:
        return caller_arn
    # NOTE(review): the assumed-role ARN carries only the role name; if the
    # caller's role lives under an IAM path the rebuilt ARN will lack it -
    # confirm whether that case needs to be supported.
    role_name = fields[5].split('/')[1]
    fields[2] = 'iam'
    fields[5] = 'role/' + role_name
    return ':'.join(fields)


# Work out which principal is running this script.
role_principal_to_add = _trusted_principal_arn(sts_identity)

# Start from the source role's trust policy statements.
assumed_role_policy = role['Role']['AssumeRolePolicyDocument']
assumed_role_policy_statements = assumed_role_policy['Statement']
if isinstance(assumed_role_policy_statements, dict):
    # A policy may carry a single statement object instead of a list.
    assumed_role_policy_statements = [assumed_role_policy_statements]
    assumed_role_policy['Statement'] = assumed_role_policy_statements

# Additionally trust the current caller so it can assume the copy.
assumed_role_policy_statements.append({
    'Effect': 'Allow',
    'Principal': {'AWS': role_principal_to_add},
    'Action': 'sts:AssumeRole',
})

# Apply the extended trust policy to the copy, then re-read the copy so that
# `role` describes the destination role from here on.
iam.update_assume_role_policy(
    RoleName=destination_role,
    PolicyDocument=json.dumps(assumed_role_policy),
)
role = iam.get_role(RoleName=destination_role)
# Retry with exponential backoff while the freshly created role is not yet
# assumable. max_time bounds the total retry time so that a permanent failure
# (for example a denied request) ends with the underlying ClientError instead
# of retrying forever.
@backoff.on_exception(backoff.expo, sts.exceptions.ClientError, max_time=120)
def assume_role(sts, role):
    """Assume *role* through the *sts* client and return the API response.

    Args:
        sts: STS client used to issue the AssumeRole call.
        role: ``get_role`` response; its ``['Role']['Arn']`` is assumed.

    Returns:
        The ``assume_role`` response, which includes the temporary
        ``Credentials``.

    A random UUID is used as the session name on every attempt.
    """
    print('Waiting for the role to become available to assume...')
    return sts.assume_role(
        RoleArn=role['Role']['Arn'],
        RoleSessionName=str(uuid.uuid4()),
    )


response = assume_role(sts, role)
def aws_envs():
    """Print the role ARN and a ``docker run`` prefix with the credentials.

    Reads the module-level ``role`` (for the ARN) and ``response`` (for the
    temporary credentials) and writes four lines to stdout; the last line
    passes the access key, secret key and session token as ``-e`` variables.
    """
    print('RoleArn:', role['Role']['Arn'])
    print('Run your docker command as below:')
    print('=================================')
    credentials = response['Credentials']
    command_parts = (
        'docker run -e AWS_ACCESS_KEY_ID="{}"'.format(credentials['AccessKeyId']),
        '-e AWS_SECRET_ACCESS_KEY="{}"'.format(credentials['SecretAccessKey']),
        '-e AWS_SESSION_TOKEN="{}"'.format(credentials['SessionToken']),
    )
    # print() joins its arguments with a single space, matching the original
    # space-terminated partial prints.
    print(*command_parts)


if __name__ == '__main__':
    aws_envs()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment