Created
September 18, 2012 23:48
-
-
Save mattrobenolt/3746772 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import os
import sys
import time
import urllib2

import boto
import ec2

try:
    import simplejson as json
except ImportError:
    import json
__version__ = '0.1.0'

# AWS credentials handed to the ``ec2`` library. They are read from the
# AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables so that real
# keys never have to be written into this file; the 'xxx' placeholders are
# still used when a variable is unset, which matches the previous behaviour.
ec2.credentials.ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID', 'xxx')
ec2.credentials.SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY', 'xxx')
def log(what, where=None):
    """Write ``what`` to a stream and flush it immediately.

    Nothing is appended to ``what`` (no newline), so callers can build up a
    status line in several calls and still see each piece right away.

    :param what: text to write.
    :param where: file-like object providing ``write`` and ``flush``. When
        omitted, the stream currently bound to ``sys.stdout`` is used.
    """
    # Resolve the default at call time. A ``where=sys.stdout`` default is
    # evaluated once, when the function is defined, and would keep pointing at
    # the old stream after ``sys.stdout`` has been redirected or replaced.
    if where is None:
        where = sys.stdout
    where.write(what)
    where.flush()
def ok(txt):
    """Return ``txt`` wrapped in the ANSI escape ``ESC[92m`` ... ``ESC[0m``.

    Used by this script to mark success output.
    """
    success_template = '\033[92m{0}\033[0m'
    return success_template.format(txt)
def fail(txt):
    """Return ``txt`` wrapped in the ANSI escape ``ESC[91m`` ... ``ESC[0m``.

    Used by this script to mark failure output.
    """
    failure_template = '\033[91m{0}\033[0m'
    return failure_template.format(txt)
class Port(object):
    """A single TCP port number plus a flag recording whether we opened it.

    ``port`` is the integer port number. ``added`` starts out ``False`` and is
    meant to be set to ``True`` by whoever successfully opens the port, so the
    same object can later be used to decide what has to be closed again.
    """

    def __init__(self, port):
        """Store ``port`` converted with ``int()``.

        :param port: anything ``int()`` accepts, e.g. ``22`` or ``'22'``.
        :raises ValueError: if ``port`` cannot be converted to an integer.
        """
        self.port = int(port)
        self.added = False

    def __str__(self):
        """Return the port number as a string, e.g. ``'22'``."""
        return str(self.port)

    def __unicode__(self):
        """Return the port number as a text (unicode) string."""
        # A ``u''`` literal yields text on both Python 2 and Python 3, so the
        # Python-2-only ``unicode`` builtin is not needed here.
        return u'{0}'.format(self.port)

    def __repr__(self):
        """Return a debugging representation such as ``<Port: 22>``."""
        # ``__repr__`` has to return the interpreter's native ``str`` type;
        # formatting into a plain literal guarantees that.
        return '<Port: {0}>'.format(self.port)
class Firewall(object):
    """Temporarily open TCP ports on an EC2 security group for this machine.

    ``main`` looks up the caller's public IP address, fetches the security
    group by name and authorizes every requested port for that single address
    (``<ip>/32``). ``cleanup`` revokes the rules that ``main`` added.
    """

    # JSON service queried for the public IP; the response must have an 'ip' key.
    # NOTE(review): this is plain HTTP, so the answer that ends up in the
    # firewall rule is not authenticated -- consider an HTTPS endpoint.
    IP_LOOKUP_URL = 'http://jsonip.com/'
    # Seconds to wait for the IP lookup instead of blocking indefinitely.
    IP_LOOKUP_TIMEOUT = 10

    # Populated by main(); class-level defaults keep cleanup() safe to call
    # even when main() never ran or stopped early.
    group = None
    ip = None
    cidr_ip = None

    def __init__(self, group_name, ports):
        """Remember what to open; no network access happens here.

        :param group_name: name of the security group to modify.
        :param ports: iterable of ``Port``-like objects (each needs an
            ``added`` attribute). It is iterated more than once, so it should
            be a list rather than a one-shot iterator.
        """
        self.group_name = group_name
        self.ports = ports

    def main(self):
        """Open every requested port for the current public IP.

        Progress is written through ``log``. Calls ``sys.exit(1)`` when the
        public IP cannot be determined, when the security group is missing or
        ambiguous, or when not a single port could be opened.
        """
        log('Looking up public IP... ')
        self.ip = self._lookup_ip()
        self.cidr_ip = '{0}/32'.format(self.ip)
        log(ok('✔ {0}'.format(self.ip)))

        log('\nLooking up security group "{0}"... '.format(self.group_name))
        self.group = self._lookup_group()
        log(ok('✔ {0}'.format(self.group.id)))

        self._open_ports()

        # Check if any of the ports succeeded being opened
        if not any(port.added for port in self.ports):
            log(fail('\nNo ports could be opened. :(\n'))
            sys.exit(1)
        log('\n\nPorts open. CONTROL-C to exit.\n')

    def _lookup_ip(self):
        """Return the 'ip' field of the lookup service's JSON reply, or exit(1)."""
        try:
            response = urllib2.urlopen(self.IP_LOOKUP_URL,
                                       timeout=self.IP_LOOKUP_TIMEOUT)
            try:
                return json.load(response)['ip']
            finally:
                # Always release the connection, even if parsing fails.
                response.close()
        except (IOError, ValueError, KeyError) as e:
            # IOError: network failure or timeout; ValueError: reply is not
            # valid JSON; KeyError: reply has no 'ip' field.
            log(fail('✗\n'))
            log('\nCould not determine public IP: {0}\n'.format(e),
                where=sys.stderr)
            sys.exit(1)

    def _lookup_group(self):
        """Return the security group named ``self.group_name``, or exit(1)."""
        try:
            return ec2.security_groups.get(name__iexact=self.group_name)
        except ec2.security_groups.DoesNotExist:
            log(fail('✗\n'))
            log('\nSecurity group "{0}" not found\n'.format(self.group_name),
                where=sys.stderr)
            sys.exit(1)
        except ec2.security_groups.MultipleObjectsReturned:
            log(fail('✗\n'))
            log('\nMore than one security group was found for "{0}"\n'.format(self.group_name),
                where=sys.stderr)
            sys.exit(1)

    def _open_ports(self):
        """Authorize each port for ``self.cidr_ip`` and flag the ones added."""
        for port in self.ports:
            log('\nOpening port {0}... '.format(port))
            try:
                self.group.authorize('tcp', port, port, cidr_ip=self.cidr_ip)
            except boto.exception.EC2ResponseError as e:
                self._report_authorize_error(e)
            else:
                # Only ports we added ourselves get revoked in cleanup().
                port.added = True
                log(ok('✔'))

    def _report_authorize_error(self, e):
        """Log why an authorize call failed; duplicates are reported briefly."""
        # ``errors`` may be empty or None; still report the failure then.
        errors = e.errors or []
        if not errors:
            log(fail('✗\n'))
            log('{0}\n'.format(e), where=sys.stderr)
            return
        for reason, _message in errors:
            if reason == 'InvalidPermission.Duplicate':
                # The rule already exists, so it is not ours to remove later.
                log(fail('✗ already open'))
            else:
                log('{0}\n'.format(e), where=sys.stderr)

    def cleanup(self):
        """Revoke every rule added by ``main``; safe to call more than once.

        Does nothing when no security group was ever looked up. A failure to
        close one port is reported and the remaining ports are still attempted.
        """
        if self.group is None:
            return
        for port in self.ports:
            if not port.added:
                continue
            log('\nClosing port {0}... '.format(port))
            try:
                self.group.revoke('tcp', port, port, cidr_ip=self.cidr_ip)
            except boto.exception.EC2ResponseError as e:
                # Keep going: one failed revoke must not leave the other
                # ports open.
                log(fail('✗'))
                log('\n{0}\n'.format(e), where=sys.stderr)
            else:
                # Clear the flag so a second cleanup() does not revoke twice.
                port.added = False
                log(ok('✔'))
        log('\n')
if __name__ == '__main__':
    # Command-line entry point: open the requested ports on the given security
    # group, keep them open until CONTROL-C, then close them again.
    import atexit
    from optparse import OptionParser

    parser = OptionParser(usage="%prog [SECURITY GROUP]", version=__version__)
    parser.add_option('-p', '--ports', dest='ports',
                      help='Comma-separated list of ports to open',
                      default='22')
    opts, args = parser.parse_args()
    if len(args) == 0:
        parser.error("incorrect number of arguments")

    # Build a real list: the ports are iterated several times (opening, the
    # success check, cleanup), which a one-shot map() iterator cannot support.
    try:
        ports = [Port(value) for value in opts.ports.split(',')]
    except ValueError:
        # parser.error() prints the usage plus this message and exits.
        parser.error('invalid port list: {0!r}'.format(opts.ports))

    fw = Firewall(args[0], ports)

    # Make sure we clean up after ourselves! Register before main() runs so
    # that ports already opened are closed even if main() is interrupted or
    # fails part-way through.
    atexit.register(fw.cleanup)

    try:
        fw.main()
        # Hang until we exit the script
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment