Skip to content

Instantly share code, notes, and snippets.

@mattrobenolt
Created September 18, 2012 23:48
Show Gist options
  • Save mattrobenolt/3746772 to your computer and use it in GitHub Desktop.
Save mattrobenolt/3746772 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
import urllib2
import boto
try:
import simplejson as json
except ImportError:
import json
import ec2
# Script version; passed to OptionParser as its ``version`` string below.
__version__ = '0.1.0'
# AWS credentials handed to the ``ec2`` helper module. The 'xxx' values are
# placeholders and must be replaced with real keys before the script can
# talk to EC2.
# NOTE(review): hard-coded secrets are easy to commit by accident; consider
# loading them from the environment instead.
ec2.credentials.ACCESS_KEY_ID = 'xxx'
ec2.credentials.SECRET_ACCESS_KEY = 'xxx'
def log(what, where=None):
    """Write *what* to a stream and flush it immediately.

    No newline is appended, so callers can build up a progress line in
    several calls (e.g. "Opening port 22... " followed later by a tick).

    :param what: text to write.
    :param where: file-like object providing ``write()`` and ``flush()``.
        Defaults to whatever ``sys.stdout`` is at call time.
    """
    # Resolve the default here rather than in the signature: a default of
    # ``sys.stdout`` is evaluated once at definition time and would keep
    # pointing at the old stream if sys.stdout is later replaced.
    if where is None:
        where = sys.stdout
    where.write(what)
    where.flush()
def ok(txt):
    """Return *txt* wrapped in the ANSI escape codes for bright green (92)."""
    green = '\033[92m{0}\033[0m'
    return green.format(txt)
def fail(txt):
    """Return *txt* wrapped in the ANSI escape codes for bright red (91)."""
    red = '\033[91m{0}\033[0m'
    return red.format(txt)
class Port(object):
    """A port number plus a flag recording whether this run opened it.

    :param port: anything ``int()`` accepts (typically a string taken from
        the command line). ``int()`` raises ``ValueError`` for non-numeric
        input.
    """

    def __init__(self, port):
        self.port = int(port)
        # Flipped to True by the caller once the port has actually been
        # opened, so that only ports opened by this run get closed again.
        self.added = False

    def __str__(self):
        return str(self.port)

    def __unicode__(self):
        # Python 2 hook for ``unicode(port)``. Built from a u'' literal so
        # the method does not depend on the ``unicode`` builtin existing.
        return u'{0}'.format(self.port)

    def __repr__(self):
        # repr() must be a native str; the previous u'' literal produced a
        # unicode object on Python 2.
        return '<Port: {0}>'.format(self.port)
class Firewall(object):
    """Temporarily open TCP ports on an EC2 security group for this machine.

    ``main()`` looks up the machine's public IP, finds the security group by
    name through the ``ec2`` module and authorizes each port for
    ``<ip>/32``. ``cleanup()`` revokes the rules this instance added.

    :param group_name: security group name (matched with ``name__iexact``).
    :param ports: iterable of ``Port`` objects to open.
    """

    # Service queried for the public IP; the JSON reply is read for its
    # 'ip' key.
    IP_LOOKUP_URL = 'http://jsonip.com/'
    # Seconds to wait for the lookup instead of blocking indefinitely.
    IP_LOOKUP_TIMEOUT = 10

    # Filled in by main(). cleanup() does nothing while ``group`` is None.
    group = None
    ip = None
    cidr_ip = None

    def __init__(self, group_name, ports):
        self.group_name = group_name
        # Materialize: the ports are iterated several times (open, check,
        # close), which a one-shot iterator would not survive.
        self.ports = list(ports)

    def main(self):
        """Open the configured ports, reporting progress on stdout.

        Calls ``sys.exit(1)`` when the public IP cannot be determined, the
        security group lookup fails or is ambiguous, or no port could be
        opened.
        """
        self._lookup_ip()
        self._lookup_group()
        self._open_ports()

        # Check if any of the ports succeeded being opened
        if not any(port.added for port in self.ports):
            log(fail('\nNo ports could be opened. :(\n'))
            sys.exit(1)

        log('\n\nPorts open. CONTROL-C to exit.\n')

    def _lookup_ip(self):
        """Set ``self.ip`` and ``self.cidr_ip`` from the IP lookup service."""
        log('Looking up public IP... ')
        try:
            response = urllib2.urlopen(self.IP_LOOKUP_URL,
                                       timeout=self.IP_LOOKUP_TIMEOUT)
            try:
                self.ip = json.load(response)['ip']
            finally:
                # Always release the connection, even if parsing fails.
                response.close()
        except (IOError, ValueError, KeyError) as e:
            # IOError covers urllib2.URLError and socket timeouts;
            # ValueError/KeyError cover a malformed or unexpected reply.
            log(fail('✗\n'))
            log('\nCould not determine public IP: {0}\n'.format(e), where=sys.stderr)
            sys.exit(1)
        self.cidr_ip = '{0}/32'.format(self.ip)
        log(ok('✔ {0}'.format(self.ip)))

    def _lookup_group(self):
        """Set ``self.group`` to the single security group matching the name."""
        log('\nLooking up security group "{0}"... '.format(self.group_name))
        try:
            self.group = ec2.security_groups.get(name__iexact=self.group_name)
        except ec2.security_groups.DoesNotExist:
            log(fail('✗\n'))
            log('\nSecurity group "{0}" not found\n'.format(self.group_name), where=sys.stderr)
            sys.exit(1)
        except ec2.security_groups.MultipleObjectsReturned:
            log(fail('✗\n'))
            log('\nMore than one security group was found for "{0}"\n'.format(self.group_name), where=sys.stderr)
            sys.exit(1)
        log(ok('✔ {0}'.format(self.group.id)))

    def _open_ports(self):
        """Authorize each port for ``self.cidr_ip``; mark successes as added."""
        for port in self.ports:
            log('\nOpening port {0}... '.format(port))
            try:
                self.group.authorize('tcp', port, port, cidr_ip=self.cidr_ip)
            except boto.exception.EC2ResponseError as e:
                for reason, message in e.errors:
                    if reason == 'InvalidPermission.Duplicate':
                        # The rule already existed before this run, so it is
                        # deliberately not marked as added and will not be
                        # revoked by cleanup().
                        log(fail('✗ already open'))
                    else:
                        # Same text the former ``print e`` emitted, routed
                        # through log() so it is flushed with everything else.
                        log('{0}\n'.format(e))
            else:
                port.added = True
                log(ok('✔'))

    def cleanup(self):
        """Revoke every rule added by ``main()``.

        Does nothing if no security group was found. A failure to close one
        port is reported on stderr and does not stop the remaining ports
        from being closed. Ports are un-marked once revoked, so calling this
        a second time does not revoke them again.
        """
        if self.group is None:
            return
        for port in self.ports:
            if not port.added:
                continue
            log('\nClosing port {0}... '.format(port))
            try:
                self.group.revoke('tcp', port, port, cidr_ip=self.cidr_ip)
            except boto.exception.EC2ResponseError as e:
                log(fail('✗'))
                log('\n{0}\n'.format(e), where=sys.stderr)
            else:
                port.added = False
                log(ok('✔'))
        log('\n')
if __name__ == '__main__':
    # Command-line entry point: open the requested ports on the named
    # security group, keep them open until CONTROL-C, then close them again.
    import atexit
    from optparse import OptionParser

    parser = OptionParser(usage="%prog [SECURITY GROUP]", version=__version__)
    parser.add_option('-p', '--ports', dest='ports', help='Comma-separated list of ports to open', default='22')
    opts, args = parser.parse_args()
    if not args:
        parser.error("incorrect number of arguments")

    try:
        ports = [Port(value) for value in opts.ports.split(',')]
    except ValueError:
        # Port() uses int(); report bad input as a usage error rather than
        # a traceback. parser.error() exits the process.
        parser.error("ports must be a comma-separated list of integers")

    fw = Firewall(args[0], ports)

    # Make sure we clean up after ourselves! Registered *before* main() so
    # that an interrupt or error while ports are being opened still closes
    # the ones that were already added.
    atexit.register(fw.cleanup)

    try:
        fw.main()
        # Hang until we exit the script
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment