|
import boto3 |
|
import logging |
|
import json |
|
import os |
|
from botocore.vendored import requests |
|
from datetime import datetime, timedelta, timezone |
|
|
|
# Setup logging |
|
logger = logging.getLogger() |
|
logger.setLevel(logging.INFO) |
|
NOTIFICATION_URL = os.environ['NOTIFICATION_URL'] |
|
NOTIFICATION_JSON = os.environ['NOTIFICATION_JSON'] |
|
DAYS = os.environ['DAYS'] |
|
PURGE = os.environ['PURGE'] |
|
DRYRUN = os.environ['DRYRUN'] |
|
|
|
|
|
def handler(event, context): |
|
threshold = datetime.now(timezone.utc) + timedelta(-int(DAYS)) |
|
|
|
# Get all users with keys and check if they've expired |
|
keyList = check_user_keys(threshold) |
|
|
|
should_purge = (PURGE == "True") |
|
|
|
if should_purge: |
|
userRemoved = remove_old_keys(keyList, DRYRUN) |
|
if NOTIFICATION_URL != '': |
|
notify(userRemoved) |
|
else: |
|
logger.info('NOTIFICATION_URL not set, no notification sent.') |
|
|
|
return(json.dumps(userRemoved)) |
|
else: |
|
return(json.dumps(keyList)) |
|
|
|
|
|
def check_user_keys(threshold): |
|
logger.info('Checking user keys...') |
|
resource = boto3.resource('iam') |
|
client = boto3.client("iam") |
|
keyList = [] |
|
|
|
# Get all users |
|
for user in resource.users.all(): |
|
UserData = client.get_user(UserName=user.user_name)['User'] |
|
KeyMetadata = client.list_access_keys(UserName=user.user_name) |
|
|
|
tempDict = {} |
|
|
|
# Set a username for the tempDict |
|
tempDict['Username'] = user.user_name |
|
|
|
try: |
|
# Check if user has a password last used |
|
if UserData['PasswordLastUsed'] < threshold: |
|
logger.info('Password for user ' + user.user_name + |
|
' has expired') |
|
|
|
tempDict['PasswordTimedOut'] = True |
|
tempDict['PasswordLastUsed'] = UserData['PasswordLastUsed'] |
|
else: |
|
tempDict['PasswordTimedOut'] = False |
|
logger.info('Password for user ' + user.user_name + |
|
' has not expired') |
|
except KeyError: |
|
tempDict['PasswordTimedOut'] = False |
|
logger.info('User ' + user.user_name + |
|
' has no login profile') |
|
|
|
# Check if user has a key |
|
if KeyMetadata['AccessKeyMetadata']: |
|
for key in KeyMetadata['AccessKeyMetadata']: |
|
tempDict['AccessKeyId'] = key['AccessKeyId'] |
|
tempDict['Status'] = key['Status'] |
|
|
|
response = client.get_access_key_last_used( |
|
AccessKeyId=key['AccessKeyId']) |
|
lastUsed = response['AccessKeyLastUsed'] |
|
|
|
# If never used, check when created |
|
if lastUsed['ServiceName'] == 'N/A': |
|
tempDict['LastUsed'] = 'never' |
|
if key['CreateDate'] < threshold: |
|
tempDict['KeyTimedOut'] = True |
|
logger.info('Key for user ' + user.user_name + |
|
' has expired') |
|
else: |
|
tempDict['KeyTimedOut'] = False |
|
logger.info('Key for user ' + user.user_name + |
|
' has not expired') |
|
# Else has it been used in the last 30 days |
|
else: |
|
lastUsedDate = lastUsed['LastUsedDate'] |
|
|
|
tempDict['LastUsed'] = lastUsedDate.strftime( |
|
'%Y-%m-%d %H:%M:%S') |
|
if lastUsedDate < threshold: |
|
tempDict['KeyTimedOut'] = True |
|
logger.info('Key for user ' + user.user_name + |
|
' has expired') |
|
else: |
|
tempDict['KeyTimedOut'] = False |
|
logger.info('Key for user ' + user.user_name + |
|
' has not expired') |
|
else: |
|
tempDict['KeyTimedOut'] = False |
|
logger.info('No key found for user ' + user.user_name) |
|
|
|
# Append to key-list if any of them has expired |
|
if tempDict['KeyTimedOut'] or tempDict['PasswordTimedOut']: |
|
keyList.append(tempDict) |
|
|
|
return(keyList) |
|
|
|
|
|
def remove_old_keys(keyList, dryrun=False): |
|
logger.info('Removing expired user keys...') |
|
userKeyRemoved = [] |
|
userPasswordRemoved = [] |
|
client = boto3.client("iam") |
|
|
|
for user in keyList: |
|
try: |
|
if user['KeyTimedOut']: |
|
if dryrun: |
|
logger.info('Would remove expired key for ' + |
|
user['Username'] + ' (dryrun)') |
|
else: |
|
logger.info('Removing expired key for ' + user['Username']) |
|
response = client.delete_access_key( |
|
UserName=user['Username'], |
|
AccessKeyId=user['AccessKeyId']) |
|
userKeyRemoved.append(user['Username']) |
|
except KeyError: |
|
logger.info('No keys to remove for ' + user['Username']) |
|
|
|
try: |
|
if user['PasswordTimedOut']: |
|
if dryrun: |
|
logger.info('Would remove expired password for ' + |
|
user['Username'] + ' (dryrun)') |
|
else: |
|
logger.info( |
|
'Removing expired password for ' + user['Username'] |
|
) |
|
response = client.delete_login_profile( |
|
UserName=user['Username']) |
|
userPasswordRemoved.append(user['Username']) |
|
except KeyError: |
|
logger.info('No password to remove for ' + user['Username']) |
|
|
|
msg = 'Removed ' + str(len(userKeyRemoved)) + ' keys' |
|
logger.info(msg) |
|
msg = 'Removed ' + str(len(userPasswordRemoved)) + ' passwords' |
|
logger.info(msg) |
|
return({ |
|
'userKeyRemoved': userKeyRemoved, |
|
'userPasswordRemoved': userPasswordRemoved |
|
}) |
|
|
|
|
|
def notify(removed): |
|
logger.info('Preparing to send notification: {}'.format(NOTIFICATION_URL)) |
|
|
|
# Convert list of users to string |
|
pwds = ', '.join(removed['userPasswordRemoved']) |
|
keys = ', '.join(removed['userKeyRemoved']) |
|
|
|
payloadString = NOTIFICATION_JSON |
|
|
|
if pwds.strip() != '': |
|
payloadString = payloadString.replace("%%users_password%%", pwds) |
|
else: |
|
payloadString = payloadString.replace("%%users_password%%", '-') |
|
|
|
if keys.strip() != '': |
|
payloadString = payloadString.replace("%%users_key%%", keys) |
|
else: |
|
payloadString = payloadString.replace("%%users_key%%", '-') |
|
|
|
logger.info('Payload:') |
|
logger.info(payloadString) |
|
|
|
# Send alert request |
|
request = requests.post( |
|
NOTIFICATION_URL, |
|
data=payloadString, |
|
headers={'Content-Type': 'application/json'} |
|
) |
|
|
|
if request.status_code == requests.codes.ok: |
|
return('Notification sent!') |
|
else: |
|
return( |
|
'Notification failed with: "' + |
|
str(request.status_code) + |
|
',' + |
|
str(request.data) + |
|
'"' |
|
) |