Skip to content

Instantly share code, notes, and snippets.

@steder
Created December 19, 2011 19:11
Show Gist options
  • Save steder/1498451 to your computer and use it in GitHub Desktop.
Save steder/1498451 to your computer and use it in GitHub Desktop.
Create and update AWS security groups using Python and Boto.
#!/usr/bin/env python
"""
Recipe for creating and updating security groups programmatically.
"""
import collections
import boto
# Follow instruction at http://www.datastax.com/docs/1.0/install/install_ami
# to define the cluster security group rules and client security group rules.
SecurityGroupRule = collections.namedtuple("SecurityGroupRule", ["ip_protocol", "from_port", "to_port", "cidr_ip", "src_group_name"])
CASSANDRA_RULES = [
SecurityGroupRule("tcp", "22", "22", "0.0.0.0/0", None),
SecurityGroupRule("tcp", "1024", "65535", "0.0.0.0/0", "Cassandra Cluster"),
SecurityGroupRule("tcp", "7000", "7000", "0.0.0.0/0", "Cassandra Cluster"),
SecurityGroupRule("tcp", "61620", "61621", "0.0.0.0/0", "Cassandra Cluster"),
SecurityGroupRule("tcp", "7199", "7199", "0.0.0.0/0", None),
SecurityGroupRule("tcp", "8888", "8888", "0.0.0.0/0", None),
SecurityGroupRule("tcp", "8983", "8983", "0.0.0.0/0", None),
SecurityGroupRule("tcp", "9160", "9160", "0.0.0.0/0", None),
]
TEST_RULES = [
# ssh makes life possible
SecurityGroupRule("tcp", "22", "22", "0.0.0.0/0", None),
]
SECURITY_GROUPS = [("Cassandra Cluster", CASSANDRA_RULES),
("Test", TEST_RULES)
]
def get_or_create_security_group(c, group_name, description=""):
"""
"""
groups = [g for g in c.get_all_security_groups() if g.name == group_name]
group = groups[0] if groups else None
if not group:
print "Creating group '%s'..."%(group_name,)
group = c.create_security_group(group_name, "A group for %s"%(group_name,))
return group
def modify_sg(c, group, rule, authorize=False, revoke=False):
src_group = None
if rule.src_group_name:
src_group = c.get_all_security_groups([rule.src_group_name,])[0]
if authorize and not revoke:
print "Authorizing missing rule %s..."%(rule,)
group.authorize(ip_protocol=rule.ip_protocol,
from_port=rule.from_port,
to_port=rule.to_port,
cidr_ip=rule.cidr_ip,
src_group=src_group)
elif not authorize and revoke:
print "Revoking unexpected rule %s..."%(rule,)
group.revoke(ip_protocol=rule.ip_protocol,
from_port=rule.from_port,
to_port=rule.to_port,
cidr_ip=rule.cidr_ip,
src_group=src_group)
def authorize(c, group, rule):
"""Authorize `rule` on `group`."""
return modify_sg(c, group, rule, authorize=True)
def revoke(c, group, rule):
"""Revoke `rule` on `group`."""
return modify_sg(c, group, rule, revoke=True)
def update_security_group(c, group, expected_rules):
"""
"""
print 'Updating group "%s"...'%(group.name,)
import pprint
print "Expected Rules:"
pprint.pprint(expected_rules)
current_rules = []
for rule in group.rules:
if not rule.grants[0].cidr_ip:
current_rule = SecurityGroupRule(rule.ip_protocol,
rule.from_port,
rule.to_port,
"0.0.0.0/0",
rule.grants[0].name)
else:
current_rule = SecurityGroupRule(rule.ip_protocol,
rule.from_port,
rule.to_port,
rule.grants[0].cidr_ip,
None)
if current_rule not in expected_rules:
revoke(c, group, current_rule)
else:
current_rules.append(current_rule)
print "Current Rules:"
pprint.pprint(current_rules)
for rule in expected_rules:
if rule not in current_rules:
authorize(c, group, rule)
def create_security_groups():
"""
attempts to be idempotent:
if the sg does not exist create it,
otherwise just check that the security group contains the rules
we expect it to contain and updates it if it does not.
"""
c = boto.connect_ec2()
for group_name, rules in SECURITY_GROUPS:
group = get_or_create_security_group(c, group_name)
update_security_group(c, group, rules)
if __name__=="__main__":
create_security_groups()
@steder
Copy link
Author

steder commented Feb 14, 2015

@gyoza what are your rules?

@steder
Copy link
Author

steder commented Feb 14, 2015

This seems to be working fine with my account with PowerUser access rights. Double check that you have permissions and that your ~/.boto configuration file is correct.

(aws)~/Projects/aws-snippets /master> python aws_sg_recipe.py
Creating group 'Cassandra Cluster'...
Updating group "Cassandra Cluster"...
Expected Rules:
[SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='1024', to_port='65535', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='7000', to_port='7000', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='61620', to_port='61621', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='7199', to_port='7199', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='8888', to_port='8888', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='8983', to_port='8983', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='9160', to_port='9160', cidr_ip='0.0.0.0/0', src_group_name=None)]
Current Rules:
[]
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None)...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='1024', to_port='65535', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster')...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='7000', to_port='7000', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster')...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='61620', to_port='61621', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster')...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='7199', to_port='7199', cidr_ip='0.0.0.0/0', src_group_name=None)...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='8888', to_port='8888', cidr_ip='0.0.0.0/0', src_group_name=None)...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='8983', to_port='8983', cidr_ip='0.0.0.0/0', src_group_name=None)...
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='9160', to_port='9160', cidr_ip='0.0.0.0/0', src_group_name=None)...
Creating group 'Test'...
Updating group "Test"...
Expected Rules:
[SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None)]
Current Rules:
[]
Authorizing missing rule SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None)...
(aws)~/Projects/aws-snippets /master> python aws_sg_recipe.py
Updating group "Cassandra Cluster"...
Expected Rules:
[SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='1024', to_port='65535', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='7000', to_port='7000', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='61620', to_port='61621', cidr_ip='0.0.0.0/0', src_group_name='Cassandra Cluster'),
 SecurityGroupRule(ip_protocol='tcp', from_port='7199', to_port='7199', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='8888', to_port='8888', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='8983', to_port='8983', cidr_ip='0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol='tcp', from_port='9160', to_port='9160', cidr_ip='0.0.0.0/0', src_group_name=None)]
Current Rules:
[SecurityGroupRule(ip_protocol=u'tcp', from_port=u'1024', to_port=u'65535', cidr_ip='0.0.0.0/0', src_group_name=u'Cassandra Cluster'),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'7000', to_port=u'7000', cidr_ip='0.0.0.0/0', src_group_name=u'Cassandra Cluster'),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'61620', to_port=u'61621', cidr_ip='0.0.0.0/0', src_group_name=u'Cassandra Cluster'),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'22', to_port=u'22', cidr_ip=u'0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'7199', to_port=u'7199', cidr_ip=u'0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'8888', to_port=u'8888', cidr_ip=u'0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'8983', to_port=u'8983', cidr_ip=u'0.0.0.0/0', src_group_name=None),
 SecurityGroupRule(ip_protocol=u'tcp', from_port=u'9160', to_port=u'9160', cidr_ip=u'0.0.0.0/0', src_group_name=None)]
Updating group "Test"...
Expected Rules:
[SecurityGroupRule(ip_protocol='tcp', from_port='22', to_port='22', cidr_ip='0.0.0.0/0', src_group_name=None)]
Current Rules:
[SecurityGroupRule(ip_protocol=u'tcp', from_port=u'22', to_port=u'22', cidr_ip=u'0.0.0.0/0', src_group_name=None)]

@ozbillwang
Copy link

src_group_name format has been changed to sg-XXXXXXXX-owner_id, but in your code, rule.grants[0].name is always None, so I got this error:

InvalidPermission.NotFoundThe specified rule does not exist in this security group.

It is fine to create a new security group, then I see the same error as @gyoza, if I change one rule (for example, change a from_port).

Another issue is, cidr_ip can be list. This line need be fixed.

https://gist.github.com/steder/1498451#file-aws_sg_recipe-py-L100

@jpatallah
Copy link

Lines 89-101 should be:

for rule in group.rules:
    for grant in rule.grants:
        if not grant.cidr_ip:
            current_rule = SecurityGroupRule(rule.ip_protocol,
                                             rule.from_port,
                                             rule.to_port,
                                             "0.0.0.0/0",
                                             grant.name)
        else:
            current_rule = SecurityGroupRule(rule.ip_protocol,
                                             rule.from_port,
                                             rule.to_port,
                                             grant.cidr_ip,
                                             None)

@presho-anna
Copy link

I am trying to run the update method, and I have received "AttributeError: 'tuple' object has no attribute 'name' ".
This is what I have tried:
update_security_group(boto.connect_ec2(), SECURITY_GROUPS[0], ["tcp", "5000", "5000", "0.0.0.0/0", "Test"])

Any reason why it's failing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment