-
-
Save andymotta/cb64ebd71c4703726501fe9a3776ce3d to your computer and use it in GitHub Desktop.
| ## Meant to be scheduled on a cron/timer of 90 days (CIS Benchmark) | |
| ## The target keys need permissions to rotate themselves | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
| import os | |
| from datetime import datetime | |
| import shutil | |
| from ConfigParser import SafeConfigParser | |
# Locate the shared AWS credentials file under $HOME and load its profiles.
key_file = os.path.join(os.environ['HOME'], '.aws', 'credentials')
parser = SafeConfigParser()
parser.read(key_file)

# The backup file name embeds the credentials file's last-modification time.
_modified_at = datetime.fromtimestamp(os.path.getmtime(key_file))
timeStamp = _modified_at.strftime("%b-%d-%y-%H:%M:%S")
key_bak = "%s_%s.bak" % (key_file, timeStamp)
def generate_list_from_parser(parser):
    """Return the names of all profiles (sections) held by ``parser``.

    Args:
        parser: A config parser object exposing ``sections()``.

    Returns:
        A new list with every section name, in the order
        ``parser.sections()`` reports them. Empty when there are no sections.
    """
    return list(parser.sections())
def get_aws_access_key_id(profile):
    """Look up the access key id stored for ``profile``.

    Reads the ``aws_access_key_id`` option of the ``profile`` section from
    the module-level ``parser``.
    """
    option_name = 'aws_access_key_id'
    return parser.get(profile, option_name)
def find_user(key):
    """Return the IAM user name that owns access key ``key``.

    Uses the module-level ``iam`` client's ``get_access_key_last_used`` call.

    Args:
        key: Access key id to look up.

    Returns:
        The ``UserName`` from the response, or ``False`` when the lookup
        fails with a ``ClientError``.
    """
    try:
        key_info = iam.get_access_key_last_used(AccessKeyId=key)
    except ClientError as e:
        code = e.response['Error']['Code']
        if code == 'AccessDenied':
            print("%s does not exist in target account" % key)
        else:
            # Other error codes used to be swallowed without any output,
            # leaving no hint about why a profile was skipped.
            print("Could not look up %s: %s" % (key, code))
        return False
    return key_info['UserName']
def num_keys(username=None):
    """Count the access keys that belong to an IAM user.

    Args:
        username: IAM user name to inspect. When omitted, the module-level
            ``user`` assigned by the rotation loop is used, so existing
            ``num_keys()`` calls keep working.

    Returns:
        The number of ``AccessKeyMetadata`` entries summed over every result
        page, or ``None`` when listing fails with a ``ClientError``.
    """
    if username is None:
        username = user
    paginator = iam.get_paginator('list_access_keys')
    try:
        # Sum over all pages; returning from inside the page loop would only
        # ever count the first page.
        return sum(
            len(page['AccessKeyMetadata'])
            for page in paginator.paginate(UserName=username)
        )
    except ClientError as e:
        # Parameter validation failures are raised by botocore as their own
        # exception type rather than as a ClientError code, so there is no
        # such code to test for here; report the failure instead of hiding it.
        print("Could not list access keys for %s: %s"
              % (username, e.response['Error']['Code']))
        return None
def delete_inactive_access_key(user):
    """Delete every access key of ``user`` whose status is ``Inactive``.

    Uses the module-level ``iam`` client.

    Args:
        user: IAM user name whose keys are inspected.

    Raises:
        ClientError: Re-raised from the list or delete call. For an
            ``InvalidClientTokenId`` error an explanatory message is printed
            first.
    """
    try:
        key_listing = iam.list_access_keys(UserName=user)
        for access_key in key_listing['AccessKeyMetadata']:
            if access_key['Status'] != 'Inactive':
                continue
            key_id = access_key['AccessKeyId']
            print('Deleting access key {0}.'.format(key_id))
            iam.delete_access_key(UserName=user, AccessKeyId=key_id)
    except ClientError as e:
        # Print the hint before re-raising; placed after the raise it could
        # never run.
        if e.response['Error']['Code'] == 'InvalidClientTokenId':
            print("Not authorized to perform iam maintainence")
        raise
# Create an access key
def create_access_key(user):
    """Create a new access key for ``user`` with the module-level ``iam`` client.

    Args:
        user: IAM user name that receives the new key.

    Returns:
        A ``(AccessKeyId, SecretAccessKey)`` tuple for the new key.

    Raises:
        ClientError: Re-raised when the key cannot be created. Returning
            nothing here would only defer the failure to the caller, which
            indexes the result immediately.
    """
    try:
        response = iam.create_access_key(UserName=user)
    except ClientError as e:
        # The key quota is reported with the code "LimitExceeded"; the
        # longer spelling is still accepted in case it is ever returned.
        if e.response['Error']['Code'] in ('LimitExceeded', 'LimitExceededException'):
            print("User already has two keys, cannot add more")
        raise
    new_key = response['AccessKey']
    return new_key['AccessKeyId'], new_key['SecretAccessKey']
# Change the state of the first access key to inactive before deleting.
# This should only try to update users with one key.
def update_access_key(key, user):
    """Set access key ``key`` of ``user`` to ``Inactive`` via the ``iam`` client."""
    request = dict(UserName=user, AccessKeyId=key, Status='Inactive')
    iam.update_access_key(**request)
def write_creds(profile, keyid, secret, keyfile):
    """Store a new key pair for ``profile`` and rewrite the credentials file.

    Updates the module-level ``parser`` and then writes all of its sections
    to ``keyfile``.

    Args:
        profile: Section name in the credentials file to update.
        keyid: New value for ``aws_access_key_id``.
        secret: New value for ``aws_secret_access_key``.
        keyfile: Path of the credentials file to overwrite.
    """
    parser.set(profile, 'aws_access_key_id', keyid)
    parser.set(profile, 'aws_secret_access_key', secret)
    # Open in text mode: the parser writes str, so a binary handle fails on
    # Python 3 with "a bytes-like object is required, not 'str'".
    with open(keyfile, 'w') as configfile:
        parser.write(configfile)
#### This should be the start of the main function ####
# NOTE: kept as flat module-level code because the helper functions read the
# module-level names ``iam`` and ``user`` that the loop below assigns.

# first create backup
shutil.copy(key_file, key_bak)

target_profile = os.getenv("TARGET_PROFILE")
if target_profile:
    # targeting only one account
    profiles = [target_profile]
else:
    # targeting all accounts
    profiles = generate_list_from_parser(parser)

keys = []
for p in profiles:
    # you'll need to decide which profile should match the default profile, if applicable
    # if p == 'default':
    #     #skip the default profile because on build servers it matches nonprod
    #     continue
    key = get_aws_access_key_id(p)
    if key in keys:  # Don't do this twice if default is the same key as some other profile
        print("Will not rotate %s, list duplicate." % p)
        continue
    keys.append(key)

    # create a custom session to target account based on profile
    os.environ["AWS_PROFILE"] = p
    session = boto3.session.Session()
    iam = session.client('iam')

    # 1. Get the user from the key on the host (one lookup, result reused).
    user = find_user(key)
    if not user:
        print("Not rotating %s. Moving on..." % key)
        continue

    # 2. Can we add a key for this user? If not, delete the inactive one
    key_count = num_keys()
    if key_count == 2:
        print("User %s in %s account has this many keys: %s" % (user, p, key_count))  # num_keys debugging
        try:
            delete_inactive_access_key(user)
        except Exception:
            # Best effort: skip this profile, but let Ctrl-C / SystemExit
            # through instead of trapping them with a bare except.
            print("Cannot delete inactive access key for " + user)
            continue

    # 3. Add a secondary key for the users we can add keys to
    try:
        creds = create_access_key(user)
    except ClientError:
        creds = None
    if not creds:
        # Without a new key pair nothing may be deactivated or written.
        print("Could not create a new access key for %s in %s. Moving on..." % (user, p))
        continue
    new_key_id, new_secret = creds
    print("Created: " + new_key_id + " in: " + p)

    # 4. Persist the new key pair BEFORE touching the old key: if the write
    # fails, the old key is still active and nothing is lost.
    # NOTE(review): assumes the ``iam`` client keeps using the credentials it
    # already loaded for the deactivation call below -- confirm.
    print("Writing creds to " + key_file + "...")
    write_creds(p, new_key_id, new_secret, key_file)

    # 5. deactivate the original keys for each user
    update_access_key(key, user)
    print("Successfully deactivated " + key + " in " + p)

    # if p == 'your_default':
    #     # default is always matched with your_default on build servers
    #     print "Matching default profile with your_default..."
    #     write_creds('default', creds[0], creds[1], key_file)
Hi Andy, I'm getting an error from your script and I don't know how to solve it. I've tried, but I can't figure it out.
It happens in the "write_creds" function.
Can you help me, please?
Created: AKIAUTAXJLLBU35B6FFO in: default
Successfully deactivated AKIAUTAXJLLBTVNVLA7A in default
Writing creds to /home/alejandro/.aws/credentials...
Traceback (most recent call last):
File "v3.py", line 140, in <module>
write_creds(p, creds[0], creds[1], key_file)
File "v3.py", line 89, in write_creds
parser.write(configfile)
File "/usr/lib/python3.6/configparser.py", line 919, in write
self._sections[section].items(), d)
File "/usr/lib/python3.6/configparser.py", line 923, in _write_section
fp.write("[{}]\n".format(section_name))
TypeError: a bytes-like object is required, not 'str'
@aladme, you should be using
with open(keyfile, 'w') as configfile:
instead of
with open(keyfile, 'wb') as configfile:
In def create_access_key I receive ClientError Code "LimitExceeded" rather than "LimitExceededException".
Any reason for the difference?