Skip to content

Instantly share code, notes, and snippets.

@epequeno
Created April 14, 2018 18:16
Show Gist options
  • Save epequeno/3497b989c65f64ae90db651bbe653cc4 to your computer and use it in GitHub Desktop.
Save epequeno/3497b989c65f64ae90db651bbe653cc4 to your computer and use it in GitHub Desktop.
py3 creates IAM users, attaches an SES policy, generates SES credentials, and verifies addresses
"""automates the process of adding verified email accts to SES and reporting
SMTP credentials
https://docs.aws.amazon.com/ses/latest/DeveloperGuide/smtp-credentials.html#smtp-credentials-convert
"""
# stdlib
from collections import namedtuple
import logging
import sys
import json
import hmac
import base64
import hashlib
# 3rd party
import boto3
# local
#logging config
logging.basicConfig(level=logging.WARNING,
format='%(asctime)s - %(levelname)s - %(message)s',
stream=sys.stdout)
SES_REGION = 'us-east-1' # N. Virginia
def make_client(service, region='us-east-1'):
return boto3.client(service, region_name=region)
# boto3 clients
iam = make_client('iam')
ses = make_client('ses', region=SES_REGION)
sts = make_client('sts')
AWS_ACCOUNT_ID = sts.get_caller_identity()['Account']
User = namedtuple('User', ['username', 'email'])
users = [
User('SMTP-IAM-user-name', '[email protected]')
]
def create_iam_user(user):
try:
res = iam.create_user(Path='/', UserName=user.username)
except Exception as e:
logging.error(e)
return False
user_info = res.get('User')
try:
logging.info(f'created user: {user_info["Arn"]}')
except Exception as e:
logging.error(f'{e}: {res}')
return False
return True
def create_iam_policy(user):
policy_name = user.username.replace('SMTP', 'SES') + '-policy'
policy_doc = {"Version": "2012-10-17",
"Statement": [{"Action": ["ses:*"],
"Resource": f"arn:aws:ses:{SES_REGION}:{AWS_ACCOUNT_ID}:identity/{user.email}",
"Effect": "Allow"}]}
try:
res = iam.create_policy(PolicyName=policy_name,
Path='/',
PolicyDocument=json.dumps(policy_doc),
Description=f'SES policy for {user.username}')
except Exception as e:
logging.error(e)
return
try:
policy_arn = res['Policy']['Arn']
except Exception as e:
loggging.error(f'{e} - {res}')
return
return policy_arn
def attach_policy(user, policy_arn):
try:
iam.attach_user_policy(UserName=user.username,
PolicyArn=policy_arn)
except Exception as e:
logging.error(e)
return
def create_access_key(user):
try:
res = iam.create_access_key(UserName=user.username)
except Exception as e:
logging.error(e)
return
return res['AccessKey']
def generate_ses_credentials(key):
k = key.get('SecretAccessKey').encode('ascii')
message = 'SendRawEmail'.encode('ascii')
version = b'\x02'
h = hmac.new(k, message, digestmod=hashlib.sha256).digest()
ver_and_sig = version + h
return base64.b64encode(ver_and_sig).decode()
def main():
for u in users:
# if an error occured, move on
if not create_iam_user(u):
logging.warning(f'there was a problem creating: {u}')
continue
policy_arn = create_iam_policy(u)
if policy_arn:
attach_policy(u, policy_arn)
key = create_access_key(u)
ses_credential = generate_ses_credentials(key)
key['ses_credential'] = ses_credential
file_name = f'{u.username}-credentials.json'
with open(file_name, 'w') as fd:
fd.write(json.dumps(key, default=str))
ses.verify_email_identity(EmailAddress=u.email)
print(f'{u.username} - {key.get("AccessKeyId")} - {ses_credential}')
if __name__ == '__main__':
main()
print("the credentials have been written to the current directory.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment