Last active
November 12, 2018 14:35
-
-
Save smstec/28d77db596f037a792b6d909652d2607 to your computer and use it in GitHub Desktop.
Creates an EC2 security group in every VPC of the given regions to allow SSH access.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division, print_function | |
import click | |
import boto3 | |
import os | |
import socket | |
from botocore.exceptions import ClientError | |
def create_security_groups(region, ipv4, vpcid, group_name):
    """Create or reuse a security group in one VPC and allow SSH from ``ipv4``.

    The group is created with ``group_name`` as both its name and its
    description. If EC2 reports that a group with that name already exists
    in the VPC, the existing group is looked up and reused instead.

    Args:
        region: AWS region name passed to the boto3 EC2 client/resource.
        ipv4: IPv4 address (no mask); it is authorized as ``<ipv4>/32``.
        vpcid: ID of the VPC that owns (or will own) the group.
        group_name: Name, description and ``Name`` tag of the group.

    Returns:
        A ``(group_id, new_group)`` tuple; ``new_group`` is True only when
        the group was created by this call.

    Raises:
        ClientError: If creating, describing or tagging the group fails for
            any reason other than the group already existing.
        RuntimeError: If EC2 reports a duplicate group but the follow-up
            lookup cannot find it.
    """
    client = boto3.client('ec2', region_name=region)
    resource = boto3.resource('ec2', region_name=region)

    try:
        response = client.create_security_group(Description=group_name, GroupName=group_name, VpcId=vpcid)
    except ClientError as err:
        # Only "already exists" means we should fall back to a lookup;
        # permission or limit errors must not be mistaken for an update.
        if err.response.get('Error', {}).get('Code') != 'InvalidGroup.Duplicate':
            raise
        new_group = False
        response = client.describe_security_groups(Filters=[{'Name': 'group-name', 'Values': [group_name]},
                                                            {'Name': 'vpc-id', 'Values': [vpcid]}])
        matches = [group['GroupId'] for group in response['SecurityGroups']
                   if group.get('VpcId') == vpcid]
        if not matches:
            print('Trouble creating/finding the security group to write.')
            raise RuntimeError('Security group {!r} not found in VPC {}'.format(group_name, vpcid))
        gid = matches[0]
    else:
        gid = response['GroupId']
        new_group = True

    sec_group = resource.SecurityGroup(gid)
    sec_group.create_tags(Tags=[{'Key': 'Name', 'Value': group_name}])

    # Configure update: a single TCP/22 rule restricted to the caller's address.
    cidr = "{}/32".format(ipv4)
    desc = group_name
    args = {'IpPermissions': [{'FromPort': 22, 'ToPort': 22, 'IpProtocol': 'tcp',
                               'IpRanges': [{'CidrIp': cidr, 'Description': desc}]}]}
    try:
        sec_group.authorize_ingress(**args)
    except ClientError as err:
        # An identical rule already being present is the expected case on
        # re-runs. Anything else stays best-effort but is no longer silent.
        if err.response.get('Error', {}).get('Code') != 'InvalidPermission.Duplicate':
            print('Could not authorize {} on {}: {}'.format(cidr, gid, err))
    return gid, new_group
def list_regions():
    """Return the ``RegionName`` of every region listed by ``describe_regions``.

    The EC2 client is created without an explicit region.
    """
    client = boto3.client('ec2')
    names = []
    for entry in client.describe_regions()['Regions']:
        names.append(entry['RegionName'])
    return names
def list_availability_zones(region):
    """Return the ``ZoneName`` of every availability zone reported for ``region``."""
    client = boto3.client('ec2', region_name=region)
    zones = client.describe_availability_zones()['AvailabilityZones']
    names = []
    for entry in zones:
        names.append(entry['ZoneName'])
    return names
def list_vpcs(region):
    """Return the IDs of the VPCs yielded by an unfiltered query in ``region``."""
    service = boto3.resource('ec2', region_name=region)
    vpc_ids = []
    for network in service.vpcs.filter():
        vpc_ids.append(network.id)
    return vpc_ids
def get_ssh_address():
    """Return the first field of ``SSH_CONNECTION``, or None if unavailable.

    The first whitespace-separated field is used as the address of the
    machine connected over SSH (per sshd's documented variable layout).

    Returns:
        The first field as a string, or None when the variable is unset,
        empty or contains only whitespace.
    """
    # Default to '' so a missing variable is handled explicitly rather than
    # through a bare ``except`` that would also hide unrelated errors.
    fields = os.environ.get('SSH_CONNECTION', '').split()
    if not fields:
        return None
    return fields[0]
def is_valid_ipv4_address(address):
    """Return True when ``address`` is a dotted-quad IPv4 address string.

    Parsing is delegated to ``socket.inet_pton``; when that lookup raises
    AttributeError, ``socket.inet_aton`` is tried instead. Any parse failure
    yields False.
    """
    # Adapted from: https://stackoverflow.com/a/4017219/1713013
    try:
        socket.inet_pton(socket.AF_INET, address)
    except socket.error:
        # Rejected by the parser: not a valid address.
        return False
    except AttributeError:
        # inet_pton is not available here; use the older parser.
        try:
            socket.inet_aton(address)
        except socket.error:
            return False
    # Require exactly four dot-separated parts in addition to parsing.
    dot_total = address.count('.')
    return dot_total == 3
@click.command(context_settings=dict(max_content_width=128))
@click.argument('group_name')
@click.argument('regions', nargs=-1)
@click.option('--ip', default=get_ssh_address(), show_default=True, help="IP Address to add to security group")
def main(group_name, regions, ip):
    """
    Create a security group across regions and VPCs that enables SSH access from the computer accessing this
    EC2 instance.
    """
    # group_name: name given to the group in every VPC.
    # regions: zero or more region names; unknown ones are reported and skipped.
    # ip: IPv4 address to allow; defaults to the SSH client address, if any.
    if ip is None or not is_valid_ipv4_address(ip):
        print('Invalid IP address: {}'.format(ip))
        # Click discards a command's return value when run as a script, so a
        # plain ``return -1`` would still exit 0; exit non-zero explicitly.
        raise SystemExit(1)

    aws_regions = set(list_regions())
    for region in regions:
        if region not in aws_regions:
            print('Invalid region: {}'.format(region))
            continue
        for vpc in list_vpcs(region):
            group, new_group = create_security_groups(region, ip, vpc, group_name)
            verb = 'Created' if new_group else 'Updated'
            print('{} Group {}:{} in AWS:{} in {}'.format(verb, group_name, group, region, vpc))
if __name__ == '__main__':
    # Run the CLI only when executed as a script; click fills in the
    # parameters, hence the pylint suppression on the call.
    main()  # pylint: disable=no-value-for-parameter
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Running this as-is requires the EC2 instance to have an assigned IAM role with the right permissions.