Skip to content

Instantly share code, notes, and snippets.

@griggheo
Created January 11, 2018 23:46
Show Gist options
  • Save griggheo/13888f0ea423e33cef79d7df3f5e1eb7 to your computer and use it in GitHub Desktop.
Save griggheo/13888f0ea423e33cef79d7df3f5e1eb7 to your computer and use it in GitHub Desktop.
import boto3
import hashlib
import json
import urllib2
# Ports your application uses that need inbound permissions from the service for
INGRESS_PORTS = {
'web' : [80, 443],
'ssh': [22,]
}
# Tags which identify the security groups you want to update
SECURITY_GROUP_TAG_FOR_WEB = { 'LambdaUpdate': 'web'}
SECURITY_GROUP_TAG_FOR_SSH = { 'LambdaUpdate': 'ssh'}
def lambda_handler(event, context):
cidr_blocks = list(event.values())
result = update_security_groups(cidr_blocks)
return result
def update_security_groups(cidr_blocks):
client = boto3.client('ec2')
web_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_WEB)
ssh_group = get_security_groups_for_update(client, SECURITY_GROUP_TAG_FOR_SSH)
print ('Found ' + str(len(web_group)) + ' WebSecurityGroups to update')
print ('Found ' + str(len(ssh_group)) + ' SshSecurityGroups to update')
result = list()
web_updated = 0
ssh_updated = 0
for group in web_group:
for port in INGRESS_PORTS['web']:
if update_security_group(client, group, cidr_blocks, port):
web_updated += 1
result.append('Updated ' + group['GroupId'])
for group in ssh_group:
for port in INGRESS_PORTS['ssh']:
if update_security_group(client, group, cidr_blocks, port):
ssh_updated += 1
result.append('Updated ' + group['GroupId'])
result.append('Updated ' + str(web_updated) + ' of ' + str(len(web_group)) + ' WebSecurityGroups')
result.append('Updated ' + str(ssh_updated) + ' of ' + str(len(ssh_group)) + ' SshSecurityGroups')
return result
def update_security_group(client, group, cidr_blocks, port):
added = 0
if len(group['IpPermissions']) > 0:
for permission in group['IpPermissions']:
if permission['FromPort'] <= port and permission['ToPort'] >= port:
old_prefixes = list()
to_add = list()
for cidr_block in cidr_blocks:
if old_prefixes.count(cidr_block) == 0:
to_add.append({ 'CidrIp': cidr_block })
print(group['GroupId'] + ": Adding " + cidr_block + ":" + str(permission['ToPort']))
added += add_permissions(client, group, permission, to_add)
else:
to_add = list()
for cidr_block in cidr_blocks:
to_add.append({ 'CidrIp': cidr_block })
print(group['GroupId'] + ": Adding " + cidr_block + ":" + str(port))
permission = { 'ToPort': port, 'FromPort': port, 'IpProtocol': 'tcp'}
added += add_permissions(client, group, permission, to_add)
print (group['GroupId'] + ": Added " + str(added))
return (added > 0)
def add_permissions(client, group, permission, to_add):
if len(to_add) > 0:
add_params = {
'ToPort': permission['ToPort'],
'FromPort': permission['FromPort'],
'IpRanges': to_add,
'IpProtocol': permission['IpProtocol']
}
client.authorize_security_group_ingress(GroupId=group['GroupId'], IpPermissions=[add_params])
return len(to_add)
def get_security_groups_for_update(client, security_group_tag):
filters = list();
for key, value in security_group_tag.iteritems():
filters.extend(
[
{ 'Name': "tag-key", 'Values': [ key ] },
{ 'Name': "tag-value", 'Values': [ value ] }
]
)
response = client.describe_security_groups(Filters=filters)
return response['SecurityGroups']
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment