Skip to content

Instantly share code, notes, and snippets.

@hussfelt
Created October 15, 2018 19:53
Show Gist options
  • Save hussfelt/a056d9ee990b95e4e3ba95e8b2a05003 to your computer and use it in GitHub Desktop.
Save hussfelt/a056d9ee990b95e4e3ba95e8b2a05003 to your computer and use it in GitHub Desktop.
Use Terraform and Lambda to automatically purge old IAM user passwords and access keys

Purge IAM secrets

This terraform module and lambda will remove passwords and secrets that have been unused for too long.

Usage

Put the files in a module folder, for example modules/purge-iam-secrets. Then use the module like this in your Terraform configuration:

module "remove_iam_secrets" {
  source = "modules/purge-iam-secrets"

  notification_url  = "https://hooks.slack.com/services/*/*/*"
  notification_json = "{\"text\":\"Removed passwords for users: %%users_password%% - removed access keys for users: %%users_key%%\"}"
  days              = "30"
  purge             = "False"
  dryrun            = "True"
}
  • You can pass notifications to any endpoint - not only slack.
  • The notification JSON has two placeholders, %%users_password%% and %%users_key%%. Each is replaced with a comma-separated list of user names, or with "-" when the list is empty.

Inspired by some work I did together with @abjorshammar once.

# Permissions policy document for the Lambda role: list/read IAM users and
# their access keys, delete access keys and login profiles, and create/write
# CloudWatch log groups, streams and events.
data "aws_iam_policy_document" "iam_security" {
statement {
sid = ""
effect = "Allow"
actions = [
"iam:ListUsers",
"iam:GetUser",
"iam:ListAccessKeys",
"iam:GetAccessKeyLastUsed",
"iam:DeleteAccessKey",
"iam:DeleteLoginProfile",
"logs:PutLogEvents",
"logs:CreateLogStream",
"logs:CreateLogGroup",
]
# The actions above are granted on every resource.
resources = [
"*",
]
}
}
# Managed IAM policy created from the iam_security policy document.
resource "aws_iam_policy" "iam_security" {
name = "TerraformLambdaIAMSecurityPolicy"
policy = "${data.aws_iam_policy_document.iam_security.json}"
}
# Trust policy document: allows the Lambda service principal to assume the role.
data "aws_iam_policy_document" "iam_security_assume" {
statement {
sid = "LambdaIAMSecurityAssumePolicy"
effect = "Allow"
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = [
"lambda.amazonaws.com",
]
}
}
}
# Execution role for the Lambda function, using the trust policy above.
resource "aws_iam_role" "iam_security" {
name = "TerraformLambdaIAMSecurityRole"
assume_role_policy = "${data.aws_iam_policy_document.iam_security_assume.json}"
}
# Instance profile wrapping the Lambda role.
# NOTE(review): no other resource in this configuration references this
# instance profile - confirm whether it is actually needed.
resource "aws_iam_instance_profile" "iam_security" {
name = "lambda-iam_security-iam-profile"
role = "${aws_iam_role.iam_security.name}"
}
# Attaches the custom IAM security policy to the Lambda role.
# NOTE(review): aws_iam_policy_attachment manages a policy's attachments
# exclusively; aws_iam_role_policy_attachment (as used for the execution
# policy in this file) may be the safer choice - confirm before changing.
resource "aws_iam_policy_attachment" "iam_security" {
name = "lambda-iam_security-iam-policy-attachment"
roles = ["${aws_iam_role.iam_security.name}"]
policy_arn = "${aws_iam_policy.iam_security.arn}"
}
# Attaches the AWS-managed AWSLambdaBasicExecutionRole policy to the Lambda role.
resource "aws_iam_role_policy_attachment" "iam_security_execution" {
role = "${aws_iam_role.iam_security.name}"
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
}
# Zips main.py from the module directory into main.zip in the same directory;
# this archive is the Lambda deployment package.
data "archive_file" "init" {
type = "zip"
source_file = "${path.module}/main.py"
output_path = "${path.module}/main.zip"
}
# Lambda function that runs main.handler from the zipped main.py and receives
# its settings through environment variables.
resource "aws_lambda_function" "lambda" {
  # Path and hash come from the archive data source rather than from file() on
  # main.zip: file() is evaluated before the archive exists on a fresh checkout
  # and is not meant for binary content.
  filename         = "${data.archive_file.init.output_path}"
  function_name    = "purge-iam-secrets"
  role             = "${aws_iam_role.iam_security.arn}"
  handler          = "main.handler"
  source_code_hash = "${data.archive_file.init.output_base64sha256}"
  runtime          = "python3.6"

  # Maximum run time in seconds.
  timeout = 300

  environment {
    variables = {
      NOTIFICATION_URL  = "${var.notification_url}"
      NOTIFICATION_JSON = "${var.notification_json}"
      DAYS              = "${var.days}"
      PURGE             = "${var.purge}"
      DRYRUN            = "${var.dryrun}"
    }
  }

  depends_on = ["data.archive_file.init"]
}
import boto3
import logging
import json
import os
from botocore.vendored import requests
from datetime import datetime, timedelta, timezone
# Setup logging: the root logger emits INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Runtime configuration, read once at import time. Every value is a string and
# a missing variable raises KeyError here.
# Endpoint notify() posts to; handler() skips the notification when it is ''.
NOTIFICATION_URL = os.environ['NOTIFICATION_URL']
# Payload template containing the %%users_password%% / %%users_key%% placeholders.
NOTIFICATION_JSON = os.environ['NOTIFICATION_JSON']
# Age limit in days; handler() converts it with int().
DAYS = os.environ['DAYS']
# handler() only purges when this is the string "True".
PURGE = os.environ['PURGE']
# Passed by handler() to remove_old_keys() as its dryrun argument.
DRYRUN = os.environ['DRYRUN']
def handler(event, context):
    """Lambda entry point: find stale IAM credentials and optionally purge them.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda context object (unused).

    Returns:
        A JSON string. When PURGE is enabled it holds the two lists returned
        by remove_old_keys(); otherwise it holds the list of stale
        credentials found by check_user_keys().
    """
    threshold = datetime.now(timezone.utc) - timedelta(days=int(DAYS))

    # Get all users with keys and check if they've expired
    keyList = check_user_keys(threshold)

    should_purge = PURGE.strip().lower() == "true"
    if not should_purge:
        # default=str keeps the report serialisable even if an entry carries
        # a datetime (e.g. PasswordLastUsed).
        return json.dumps(keyList, default=str)

    # The environment value is a string, and any non-empty string (including
    # "False") is truthy, so convert it explicitly. Unrecognised values fall
    # back to dry-run, the non-destructive choice.
    dry_run = DRYRUN.strip().lower() not in ("false", "0", "no", "")

    userRemoved = remove_old_keys(keyList, dry_run)
    if NOTIFICATION_URL != '':
        # notify() reports its outcome as a string; surface it in the logs.
        logger.info(notify(userRemoved))
    else:
        logger.info('NOTIFICATION_URL not set, no notification sent.')
    return json.dumps(userRemoved)
def _check_password(client, username, threshold):
    """Return the password part of a report entry for one user."""
    entry = {'Username': username, 'PasswordTimedOut': False}
    last_login = client.get_user(UserName=username)['User'].get(
        'PasswordLastUsed')
    if last_login is None:
        logger.info('User ' + username + ' has no login profile')
    elif last_login < threshold:
        logger.info('Password for user ' + username + ' has expired')
        entry['PasswordTimedOut'] = True
        # Stored as text so the report stays JSON serialisable.
        entry['PasswordLastUsed'] = last_login.strftime('%Y-%m-%d %H:%M:%S')
    else:
        logger.info('Password for user ' + username + ' has not expired')
    return entry


def _find_expired_keys(client, username, threshold):
    """Return one dict per access key of the user that is older than threshold."""
    expired = []
    key_metadata = client.list_access_keys(
        UserName=username)['AccessKeyMetadata']
    if not key_metadata:
        logger.info('No key found for user ' + username)
    for key in key_metadata:
        response = client.get_access_key_last_used(
            AccessKeyId=key['AccessKeyId'])
        lastUsed = response['AccessKeyLastUsed']
        last_used_date = lastUsed.get('LastUsedDate')
        if lastUsed.get('ServiceName') == 'N/A' or last_used_date is None:
            # If never used, check when created
            last_used_text = 'never'
            reference_date = key['CreateDate']
        else:
            last_used_text = last_used_date.strftime('%Y-%m-%d %H:%M:%S')
            reference_date = last_used_date
        if reference_date < threshold:
            logger.info('Key for user ' + username + ' has expired')
            expired.append({
                'AccessKeyId': key['AccessKeyId'],
                'Status': key['Status'],
                'LastUsed': last_used_text,
                'KeyTimedOut': True,
            })
        else:
            logger.info('Key for user ' + username + ' has not expired')
    return expired


def check_user_keys(threshold):
    """Collect IAM passwords and access keys not used since *threshold*.

    Every access key of a user is evaluated on its own, so an expired key is
    reported even when the user's other key is still in use.

    Args:
        threshold: timezone-aware datetime; credentials last used (or, for
            never-used keys, created) before it count as expired.

    Returns:
        A list of dicts. Each has 'Username', 'PasswordTimedOut' and
        'KeyTimedOut'. Entries for an expired key also carry 'AccessKeyId',
        'Status' and 'LastUsed'; an entry flagging an expired password also
        carries 'PasswordLastUsed' as text. A user with several expired keys
        gets one entry per key, and only the first of them carries the
        password flag so the password is handled once.
    """
    logger.info('Checking user keys...')
    resource = boto3.resource('iam')
    client = boto3.client("iam")
    keyList = []
    # Get all users
    for user in resource.users.all():
        username = user.user_name
        password_entry = _check_password(client, username, threshold)
        expired_keys = _find_expired_keys(client, username, threshold)

        if expired_keys:
            for position, key_entry in enumerate(expired_keys):
                entry = {'Username': username, 'PasswordTimedOut': False}
                if position == 0:
                    entry.update(password_entry)
                entry.update(key_entry)
                keyList.append(entry)
        elif password_entry['PasswordTimedOut']:
            password_entry['KeyTimedOut'] = False
            keyList.append(password_entry)
    return keyList
def remove_old_keys(keyList, dryrun=False):
    """Delete the expired access keys and passwords listed in *keyList*.

    Args:
        keyList: list of dicts with 'Username', and optionally 'KeyTimedOut',
            'AccessKeyId' and 'PasswordTimedOut'.
        dryrun: when true, only log what would be removed. A string is
            accepted as well: 'false', '0', 'no' and '' (any case) mean
            False, anything else means True.

    Returns:
        A dict with the lists 'userKeyRemoved' and 'userPasswordRemoved'
        holding the affected user names. In dry-run mode they name the users
        whose credentials would have been removed.
    """
    if isinstance(dryrun, str):
        # A value taken straight from the environment is a string, and the
        # string "False" is truthy; interpret it explicitly.
        dryrun = dryrun.strip().lower() not in ('false', '0', 'no', '')

    logger.info('Removing expired user keys...')
    userKeyRemoved = []
    userPasswordRemoved = []
    client = boto3.client("iam")
    for user in keyList:
        username = user['Username']

        if user.get('KeyTimedOut'):
            if 'AccessKeyId' not in user:
                logger.info('No keys to remove for ' + username)
            else:
                if dryrun:
                    logger.info('Would remove expired key for ' +
                                username + ' (dryrun)')
                else:
                    logger.info('Removing expired key for ' + username)
                    client.delete_access_key(
                        UserName=username,
                        AccessKeyId=user['AccessKeyId'])
                userKeyRemoved.append(username)

        if user.get('PasswordTimedOut'):
            if dryrun:
                logger.info('Would remove expired password for ' +
                            username + ' (dryrun)')
            else:
                logger.info('Removing expired password for ' + username)
                try:
                    client.delete_login_profile(UserName=username)
                except client.exceptions.NoSuchEntityException:
                    # The last-used timestamp can outlive the login profile,
                    # so a password removed on an earlier run is reported
                    # again; skip it instead of failing the whole run.
                    logger.info('No password to remove for ' + username)
                    continue
            userPasswordRemoved.append(username)

    msg = 'Removed ' + str(len(userKeyRemoved)) + ' keys'
    logger.info(msg)
    msg = 'Removed ' + str(len(userPasswordRemoved)) + ' passwords'
    logger.info(msg)
    return {
        'userKeyRemoved': userKeyRemoved,
        'userPasswordRemoved': userPasswordRemoved
    }
def notify(removed):
    """POST a notification about the removed credentials to NOTIFICATION_URL.

    The payload is NOTIFICATION_JSON with %%users_password%% and
    %%users_key%% replaced by comma-separated user names, or by '-' when the
    corresponding list is empty.

    Args:
        removed: dict with the lists 'userPasswordRemoved' and
            'userKeyRemoved'.

    Returns:
        'Notification sent!' on HTTP 200, otherwise a string describing the
        failure. Failures are also logged; they are not raised.
    """
    # Imported here so the function relies only on the standard library and
    # not on the `requests` copy vendored inside botocore.
    import urllib.error
    import urllib.request

    # The URL itself is not logged: webhook URLs act as credentials.
    logger.info('Preparing to send notification')

    # Convert list of users to string
    pwds = ', '.join(removed['userPasswordRemoved'])
    keys = ', '.join(removed['userKeyRemoved'])
    payloadString = NOTIFICATION_JSON.replace(
        "%%users_password%%", pwds if pwds.strip() else '-')
    payloadString = payloadString.replace(
        "%%users_key%%", keys if keys.strip() else '-')
    logger.info('Payload:')
    logger.info(payloadString)

    # Send alert request
    request = urllib.request.Request(
        NOTIFICATION_URL,
        data=payloadString.encode('utf-8'),
        headers={'Content-Type': 'application/json'},
        method='POST'
    )
    try:
        # The timeout keeps an unresponsive endpoint from consuming the
        # remaining Lambda run time.
        with urllib.request.urlopen(request, timeout=10) as response:
            status = response.getcode()
            body = response.read().decode('utf-8', errors='replace')
    except urllib.error.HTTPError as err:
        # Non-2xx answers arrive as exceptions that still carry a body.
        status = err.code
        body = err.read().decode('utf-8', errors='replace')
    except urllib.error.URLError as err:
        result = 'Notification failed with: "' + str(err.reason) + '"'
        logger.error(result)
        return result

    if status == 200:
        return 'Notification sent!'
    result = 'Notification failed with: "' + str(status) + ',' + body + '"'
    logger.error(result)
    return result
# AWS provider with no explicit settings in this file.
# NOTE(review): presumably region and credentials come from the environment or
# the calling configuration - confirm.
provider "aws" {}
# Scheduled CloudWatch Events rule that triggers the IAM credential check.
resource "aws_cloudwatch_event_rule" "iam_security_rule" {
name = "iam-security"
description = "Checking that IAM Passwords and Keys have not expired"
# Run once a day: minute 0 of hour 0, every day of every month.
schedule_expression = "cron(0 0 * * ? *)"
}
# Makes the Lambda function the target of the scheduled rule.
resource "aws_cloudwatch_event_target" "iam_security" {
target_id = "iam-security-lambda-target"
rule = "${aws_cloudwatch_event_rule.iam_security_rule.name}"
arn = "${aws_lambda_function.lambda.arn}"
}
# Allows the events service to invoke the Lambda function, restricted to
# invocations originating from the scheduled rule.
resource "aws_lambda_permission" "iam_security" {
statement_id = "AllowExecutionFromCloudWatch"
action = "lambda:InvokeFunction"
function_name = "${aws_lambda_function.lambda.function_name}"
principal = "events.amazonaws.com"
source_arn = "${aws_cloudwatch_event_rule.iam_security_rule.arn}"
}
# Required inputs (no defaults); each is passed to the Lambda as an
# environment variable.
# Endpoint the Lambda posts its notification to (NOTIFICATION_URL).
variable "notification_url" {}
# Notification payload template (NOTIFICATION_JSON).
variable "notification_json" {}
# Age limit in days for unused passwords and keys (DAYS).
variable "days" {}
# Passed to the Lambda as PURGE; the handler runs the removal step only when
# the value is "True", otherwise it just reports.
variable "purge" {
default = "True"
}
# Passed to the Lambda as DRYRUN, which the handler forwards to the removal
# step as its dry-run flag.
variable "dryrun" {
default = "True"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment