Created
September 5, 2021 16:42
-
-
Save kjenney/590af8d1d045c3aae2fbd2c3bbc4e248 to your computer and use it in GitHub Desktop.
Python Boto3 Create and Delete Resources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from botocore.exceptions import ClientError | |
import boto3 | |
from cryptography.fernet import Fernet | |
import json | |
import logging | |
from pprint import pprint | |
import uuid | |
'''
This script provisions the following resources:
1. An S3 Bucket to be used for keeping Pulumi state that one or more IAM users have access to read/write to
2. A KMS key for encrypting secrets in state that one or more IAM users have access to
3. An encryption key for encrypting secrets in Git
'''
# Command-line interface. Only --bucket-name is mandatory; everything else
# falls back to a default (or to "not configured").
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--aws-region', required=False)
parser.add_argument('-b', '--bucket-name', required=True)
parser.add_argument('-u', '--iam-users', nargs='+', help='A list of IAM user ARNs', required=False)
# --iam-name is a single name shared by the IAM role and the IAM policy.
# It must NOT use nargs='+': that would turn a user-supplied value into a
# list, while the default below is a plain string, and the IAM API calls
# that consume it need a string.
parser.add_argument('-i', '--iam-name', help='The name of the IAM Role and Policy',
                    default="AccessPulumiStateBucket", required=False)
parser.add_argument('-r', '--replace', help='replace resources if they already exist',
                    action='store_true')
args = parser.parse_args()
class Bootstrap:
    """Create resources to use Pulumi securely on AWS.

    Instantiating the class performs the provisioning immediately:

    1. Ensure the S3 state bucket exists (create it, or delete and re-create
       it when ``args.replace`` is set).
    2. When IAM user ARNs were supplied, ensure an IAM role (assumable by
       those users) and an IAM policy granting access to the bucket exist,
       and attach the policy to the role. Role and policy share one name.
    3. Call the KMS and Fernet provisioning hooks.

    If ``aws_region`` is not specified, the bucket is created without a
    location constraint (S3 default region, us-east-1).
    If ``iam_users`` is empty it is assumed that the identity used to
    authenticate already has access to the S3 bucket.
    Existing resources are only replaced when ``args.replace`` is true.

    :param args: parsed ``argparse`` namespace providing ``bucket_name``,
        ``aws_region``, ``iam_users``, ``iam_name`` and ``replace``.
    """

    def __init__(self, args):
        # The logger is created first so the validation helpers can use it.
        self.logger = logging.getLogger(__name__)
        self.bucket_name = args.bucket_name
        self.aws_region = args.aws_region
        self.iam_users = self.validate_iam_users(args.iam_users)
        self.s3_resource = self.get_s3()
        self.iam_client = boto3.client('iam')
        self.kms_client = boto3.client('kms')
        self.sts_client = boto3.client('sts')

        bucket = self._ensure_bucket(args.replace)
        if self.iam_users:
            iam_name = self._normalize_iam_name(args.iam_name)
            self._ensure_iam(bucket, iam_name, args.replace)
        self.provision_kms_key()
        self.provision_fernet_key()

    @staticmethod
    def _normalize_iam_name(iam_name):
        """Return ``iam_name`` as a single string.

        A parser declared with ``nargs='+'`` delivers a list; IAM needs one
        string. A one-element list is unwrapped, anything longer is rejected.

        :raises ValueError: when more than one (or zero) names are supplied.
        """
        if isinstance(iam_name, (list, tuple)):
            if len(iam_name) != 1:
                raise ValueError('Exactly one IAM name must be supplied.')
            return iam_name[0]
        return iam_name

    def _ensure_bucket(self, replace):
        """Create, replace or look up the state bucket and return it.

        Always returns a bucket object, including when the bucket already
        exists and is not replaced, so the IAM policy can reference it.
        """
        if not self.bucket_exists():
            bucket = self.create_bucket()
            print('S3 Bucket created.')
            return bucket
        if replace:
            print('S3 Bucket already exists. Replacing the bucket.')
            return self.replace_bucket()
        print('S3 Bucket already exists. Not replacing the bucket.')
        return self.s3_resource.Bucket(self.bucket_name)

    def _ensure_iam(self, bucket, iam_name, replace):
        """Create or replace the IAM role and policy named ``iam_name``."""
        assume_policy = self.create_iam_assume_role_policy()
        if not self.iam_role_exists(iam_name):
            self.create_iam_role(assume_policy, iam_name)
        elif replace:
            print('IAM role already exists. Replacing the IAM role.')
            self.replace_iam_role(assume_policy, iam_name)
        else:
            print('IAM role already exists. Not replacing the IAM role.')

        if not self.iam_policy_exists(iam_name):
            self.create_iam_policy(bucket, iam_name)
        elif replace:
            print('IAM policy already exists. Replacing the IAM policy.')
            self.replace_iam_policy(bucket, iam_name)
        else:
            print('IAM policy already exists. Not replacing the IAM policy.')

    def delete_bucket(self):
        """
        Delete the bucket named ``self.bucket_name`` and wait until it is gone.

        Only ``bucket.delete()`` is issued, so the call fails if S3 refuses
        the deletion (for example because the bucket is not empty).

        :raises ClientError: re-raised after logging when deletion fails.
        """
        bucket = self.s3_resource.Bucket(self.bucket_name)
        try:
            bucket.delete()
            bucket.wait_until_not_exists()
            self.logger.info("Bucket %s successfully deleted.", bucket.name)
        except ClientError:
            self.logger.exception("Couldn't delete bucket %s.", bucket.name)
            raise

    def replace_bucket(self):
        """Delete the existing S3 bucket and create it again; return the new bucket."""
        self.delete_bucket()
        return self.create_bucket()

    def replace_iam_role(self, assume_policy, role_name):
        """Delete the existing IAM role and create it again with ``assume_policy``."""
        self.delete_iam_role(role_name)
        return self.create_iam_role(assume_policy, role_name)

    def replace_iam_policy(self, bucket, policy_name):
        """Delete the existing IAM policy and create it again for ``bucket``."""
        self.delete_iam_policy(policy_name)
        return self.create_iam_policy(bucket, policy_name)

    def bucket_exists(self):
        """
        Determine whether a bucket with the configured name exists.

        Any ``ClientError`` from ``head_bucket`` is treated as "does not
        exist", which includes the case of lacking access to the bucket.

        :return: True when the bucket exists; otherwise, False.
        """
        try:
            self.s3_resource.meta.client.head_bucket(Bucket=self.bucket_name)
        except ClientError:
            self.logger.warning("Bucket %s doesn't exist or you don't have access to it.",
                                self.bucket_name)
            return False
        self.logger.info("Bucket %s exists.", self.bucket_name)
        return True

    def validate_iam_users(self, iam_users):
        """
        Ensure that iam_users is a list of ARN-looking strings.

        :param iam_users: list of IAM user ARNs that should get access to
            the state bucket, or None.
        :return: the list unchanged when every element starts with ``arn:``;
            None when any element does not; the input itself when it is not
            a list (e.g. None).
        """
        if isinstance(iam_users, list):
            invalid = [user for user in iam_users if not str(user).startswith('arn:')]
            if invalid:
                # Dropping the whole list disables IAM provisioning, so make
                # that visible instead of failing silently.
                self.logger.warning("Ignoring --iam-users: not valid ARNs: %s", invalid)
                return None
        return iam_users

    def get_s3(self):
        """Get a Boto 3 S3 resource with a specific Region or with your default Region."""
        s3_resource = boto3.resource('s3')
        if not self.aws_region or s3_resource.meta.client.meta.region_name == self.aws_region:
            return s3_resource
        return boto3.resource('s3', region_name=self.aws_region)

    def iam_role_exists(self, role_name):
        """
        Determine whether an IAM role with the specified name exists.

        Any ``ClientError`` from ``get_role`` is treated as "does not exist".

        :return: True when the IAM role exists; otherwise, False.
        """
        try:
            self.iam_client.get_role(RoleName=role_name)
        except ClientError:
            self.logger.warning("IAM Role %s doesn't exist or you don't have access to it.",
                                role_name)
            return False
        self.logger.info("Role %s exists.", role_name)
        return True

    def get_iam_policy_arn(self, policy_name):
        """Return the ARN of the customer-managed policy ``policy_name`` in the caller's account."""
        account_id = self.sts_client.get_caller_identity()['Account']
        return f'arn:aws:iam::{account_id}:policy/{policy_name}'

    def iam_policy_exists(self, policy_name):
        """
        Determine whether an IAM policy with the specified name exists.

        Any ``ClientError`` raised while resolving or fetching the policy is
        treated as "does not exist".

        :return: True when the IAM policy exists; otherwise, False.
        """
        try:
            policy_arn = self.get_iam_policy_arn(policy_name)
            self.iam_client.get_policy(PolicyArn=policy_arn)
        except ClientError:
            self.logger.warning("IAM Policy %s doesn't exist or you don't have access to it.",
                                policy_name)
            return False
        self.logger.info("Policy %s exists.", policy_name)
        return True

    def iam_detach_policy_from_role(self, iam_name):
        """
        Detach the IAM policy named ``iam_name`` from the role of the same name.

        A ``NoSuchEntity`` error (policy missing or not attached) is logged
        and ignored so that replacing a half-provisioned role still works;
        every other ``ClientError`` propagates.
        """
        policy_arn = self.get_iam_policy_arn(iam_name)
        try:
            self.iam_client.detach_role_policy(
                RoleName=iam_name,
                PolicyArn=policy_arn
            )
        except ClientError as error:
            if error.response['Error']['Code'] != 'NoSuchEntity':
                raise
            self.logger.info("Policy %s is not attached to role %s; nothing to detach.",
                             policy_arn, iam_name)

    def delete_iam_role(self, role_name):
        """Delete an existing IAM role after detaching the same-named policy from it."""
        self.iam_detach_policy_from_role(role_name)
        self.iam_client.delete_role(
            RoleName=role_name
        )

    def delete_iam_policy(self, policy_name):
        """Delete the existing IAM policy named ``policy_name`` from the caller's account."""
        policy_arn = self.get_iam_policy_arn(policy_name)
        self.iam_client.delete_policy(
            PolicyArn=policy_arn
        )

    def create_bucket(self):
        """
        Create an Amazon S3 bucket with the configured name and Region.

        A ``LocationConstraint`` is sent whenever an explicit region other
        than us-east-1 was requested; us-east-1 and "no region" create the
        bucket without a constraint.

        :return: The newly created bucket.
        :raises ClientError: re-raised after logging when creation fails.
        """
        create_kwargs = {'Bucket': self.bucket_name}
        if self.aws_region and self.aws_region != 'us-east-1':
            create_kwargs['CreateBucketConfiguration'] = {
                'LocationConstraint': self.aws_region
            }
        try:
            bucket = self.s3_resource.create_bucket(**create_kwargs)
            bucket.wait_until_exists()
            self.logger.info("Created bucket '%s' in region=%s", bucket.name,
                             self.s3_resource.meta.client.meta.region_name)
        except ClientError as error:
            self.logger.exception("Couldn't create bucket named '%s' in region=%s.",
                                  self.bucket_name, self.aws_region)
            if error.response['Error']['Code'] == 'IllegalLocationConstraintException':
                self.logger.error("When the session Region is anything other than us-east-1, "
                                  "you must specify a LocationConstraint that matches the "
                                  "session Region. The current session Region is %s and the "
                                  "LocationConstraint Region is %s.",
                                  self.s3_resource.meta.client.meta.region_name, self.aws_region)
            raise
        return bucket

    def create_iam_policy(self, bucket, iam_name):
        """Create the IAM policy ``iam_name`` for ``bucket`` and attach it to the role ``iam_name``."""
        policy_document = self.create_iam_role_policy_document(bucket)
        policy = self.iam_client.create_policy(
            PolicyName=iam_name,
            PolicyDocument=policy_document
        )
        self.iam_client.attach_role_policy(
            RoleName=iam_name,
            PolicyArn=policy["Policy"]["Arn"]
        )

    def create_iam_role_policy_document(self, bucket):
        """
        Build the IAM policy that is attached to the IAM role granting
        access to the state bucket.

        :param bucket: object exposing the bucket name as ``.name``.
        :return: the policy document as a JSON string.
        """
        document = {
            "Version": "2012-10-17",
            "Statement": [
                {"Action": ["s3:ListAllMyBuckets"], "Effect": "Allow",
                 "Resource": ["arn:aws:s3:::*"]},
                {"Action": ["s3:ListBucket", "s3:GetBucketLocation"], "Effect": "Allow",
                 "Resource": [f"arn:aws:s3:::{bucket.name}"]},
                {"Action": ["s3:GetObject", "s3:PutObject"], "Effect": "Allow",
                 "Resource": [f"arn:aws:s3:::{bucket.name}/*"]},
            ],
        }
        return json.dumps(document)

    def create_iam_role(self, assume_policy, iam_name):
        """Create an IAM role in the current account to be assumed by IAM users."""
        return self.iam_client.create_role(
            RoleName=iam_name,
            AssumeRolePolicyDocument=assume_policy
        )

    def create_iam_assume_role_policy(self):
        """
        Build the AssumeRolePolicyDocument to be attached to the IAM role
        granting access to the state bucket.

        One ``sts:AssumeRole`` statement is emitted per entry in
        ``self.iam_users``.

        :return: the trust policy as a JSON string.
        """
        document = {
            "Version": "2012-10-17",
            "Statement": [
                {"Effect": "Allow", "Principal": {"AWS": f"{user}"}, "Action": "sts:AssumeRole"}
                for user in self.iam_users
            ],
        }
        return json.dumps(document)

    def provision_kms_key(self):
        """Placeholder: currently only announces the KMS key step."""
        print('Provisioning the KMS Key to secure Pulumi state')

    def provision_fernet_key(self):
        """Placeholder: currently only announces the Fernet key step."""
        print('Provisioning the Fernet key for encrypting secrets in Git')

    def verify_policy_attachment(self):
        """Verifying IAM Policy Attachment to IAM Role"""

    def verify_iam_assume_role_policy_attachment(self):
        """Verifying IAM Assume Role Policy Attachment to IAM Role"""
# Provision the resources only when the file is executed as a script, so that
# merely importing it does not create or replace AWS resources.
if __name__ == "__main__":
    bootstrap = Bootstrap(args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment