Created
March 20, 2014 06:24
-
-
Save shaon/7007ecfc63d7d023aa04 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from boto.exception import EC2ResponseError | |
from eucaops import Eucaops | |
from eutester.eutestcase import EutesterTestCase | |
import time | |
import os | |
class EC2IamTest(EutesterTestCase):
    """Exercise group-based IAM policies against EC2 actions.

    The suite is made of three steps that are meant to run in order:
    ``CreateResourcesTest`` builds an account with groups, users and
    policies, ``IAMPolicyTest`` checks what members of two of those groups
    can and cannot do, and ``RemoveResourcesTest`` tears everything down.
    ``IAMPolicyTest`` and ``RemoveResourcesTest`` read ``self.groups`` and
    ``self.account_tester``, which only ``CreateResourcesTest`` sets.
    """

    # Policy documents keyed by the group they are attached to. Each policy
    # is attached under the name ``<group>_policy`` (see _policy_name).
    GROUP_POLICIES = {
        'eutester_allusers': """
        {
            "Statement": [{
                "Effect": "Deny",
                "Action": "*",
                "Resource": "*",
                "Condition": {
                    "NotIpAddress": {
                        "aws:SourceIp": ["10.0.0.0/8"]
                    }
                }
            }]
        }
        """,
        # NOTE: the original document had a trailing comma after the last
        # action, which is not valid JSON.
        'eutester_developers': """
        {
            "Statement": [{
                "Action": [
                    "ec2:DescribeInstances",
                    "ec2:CreateKeyPair",
                    "ec2:DeleteKeyPair",
                    "ec2:DescribeKeyPairs",
                    "ec2:DescribeImages",
                    "ec2:RunInstances",
                    "ec2:StartInstances",
                    "ec2:StopInstances",
                    "ec2:TerminateInstances",
                    "ec2:AuthorizeSecurityGroupEgress",
                    "ec2:AuthorizeSecurityGroupIngress"
                ],
                "Effect": "Allow",
                "Resource": "*"
            }]
        }
        """,
        'eutester_managers': """
        {
            "Statement": [{
                "Action": [
                    "ec2:Describe*"
                ],
                "Effect": "Allow",
                "Resource": "*"
            }]
        }
        """,
        'eutester_admins': """
        {
            "Statement": [{
                "Action": "*",
                "Effect": "Allow",
                "Resource": "*"
            }]
        }
        """,
        'eutester_sysadmins': """
        {
            "Statement": [{
                "Action": [
                    "ec2:AuthorizeSecurityGroupEgress",
                    "ec2:AuthorizeSecurityGroupIngress",
                    "ec2:CreateImage",
                    "ec2:CreateSecurityGroup",
                    "ec2:CreateSnapshot",
                    "ec2:CreateVolume",
                    "ec2:DeleteSecurityGroup",
                    "ec2:DeleteSnapshot",
                    "ec2:DeleteVolume",
                    "ec2:DeregisterImage",
                    "ec2:DescribeImages",
                    "ec2:DescribeInstances",
                    "ec2:DescribeSecurityGroups",
                    "ec2:DescribeSnapshots",
                    "ec2:DescribeVolumes",
                    "ec2:RevokeSecurityGroupEgress",
                    "ec2:RevokeSecurityGroupIngress"
                ],
                "Effect": "Allow",
                "Resource": "*"
            }]
        }
        """,
    }

    def __init__(self):
        """Parse command-line arguments and connect as the cloud admin.

        ``--user-account`` defaults to a name suffixed with the current
        epoch second so that repeated runs do not reuse an account name.
        """
        self.setuptestcase()
        self.setup_parser()
        self.parser.add_argument("--user-name", default="admin")
        self.unique_number = str(int(time.time()))
        self.parser.add_argument(
            "--user-account",
            default="eutester-account" + self.unique_number)
        self.get_args()
        self.tester = Eucaops(config_file=self.args.config,
                              password=self.args.password)

    def clean_method(self):
        """No-op: resource removal is done by ``RemoveResourcesTest``."""
        pass

    @staticmethod
    def _policy_name(group):
        """Return the policy name used for ``group`` on attach and detach."""
        return group + '_policy'

    def _tester_from_keys(self, keys, username):
        """Build a ``Eucaops`` connection for ``username`` in the test account.

        ``keys`` is the mapping returned by ``create_access_key``; its
        ``access_key_id`` and ``secret_access_key`` entries are used.
        Endpoints are copied from the admin connection ``self.tester``.
        """
        return Eucaops(aws_access_key_id=keys['access_key_id'],
                       aws_secret_access_key=keys['secret_access_key'],
                       ec2_ip=self.tester.ec2.host,
                       s3_ip=self.tester.s3.host,
                       s3_path=self.tester.get_s3_path(),
                       username=username,
                       account=self.args.user_account)

    def CreateResourcesTest(self):
        """Create the test account, its groups and users, and attach policies.

        Sets ``self.groups`` (group name -> list of user names),
        ``self.account`` and ``self.account_tester`` for the later steps.
        """
        self.groups = {
            'eutester_allusers': [],
            'eutester_admins': ['user_admin01', 'user_admin02',
                                'user_admin03'],
            'eutester_developers': ['user_developer01', 'user_developer02',
                                    'user_developer03', 'user_developer04',
                                    'user_developer05'],
            'eutester_managers': ['user_manager01', 'user_manager02',
                                  'user_manager03'],
            'eutester_sysadmins': ['user_sysadmin01', 'user_sysadmin02'],
        }
        self.account = self.tester.create_account(self.args.user_account)
        keys = self.tester.create_access_key(self.args.user_name,
                                             self.args.user_account)
        self.account_tester = self._tester_from_keys(keys,
                                                     self.args.user_name)

        for group, users in self.groups.items():
            self.account_tester.create_group(group)
            for user in users:
                self.account_tester.create_user(user)
                self.account_tester.add_user_to_group(group, user)

        # Explicit lookup instead of vars(): a group without a policy
        # raises KeyError here rather than depending on local variable names.
        for group in self.groups:
            self.account_tester.attach_policy_group(
                group, self._policy_name(group), self.GROUP_POLICIES[group])

    def IAMPolicyTest(self):
        """Check the developers policy allows, and the managers policy denies.

        Requires ``CreateResourcesTest`` to have run first.
        """
        self._check_developer_permissions()
        self._check_manager_restrictions()

    def _check_developer_permissions(self):
        """Run each EC2 action the developers policy allows; fail on denial."""
        test_user = self.groups['eutester_developers'][0]
        keys = self.account_tester.create_access_key(test_user)
        self.iam_tester = self._tester_from_keys(keys, test_user)

        # "ec2:CreateKeyPair"
        try:
            self.keypair = self.iam_tester.add_keypair(
                "keypair-" + str(time.time()))
            self.keypath = '%s/%s.pem' % (os.curdir, self.keypair.name)
        except EC2ResponseError:
            self.fail("Was unable to create keypair as developer '"
                      + test_user + "'")

        # "ec2:*SecurityGroup*"
        try:
            self.iam_tester.authorize_group_by_name(group_name='default')
            self.iam_tester.authorize_group_by_name(group_name='default',
                                                    port=-1, protocol="icmp")
        except EC2ResponseError:
            self.fail("Was unable to authorize security group rules as "
                      "developer '" + test_user + "'")

        # "ec2:RunInstances"
        try:
            self.image = self.iam_tester.get_emi(root_device_type="ebs")
            self.reservation = self.iam_tester.run_instance(
                self.image, keypair=self.keypair.name)
            instance = self.reservation.instances[0]
            self.debug("Was able to run instance as developer '"
                       + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to run instance as developer '"
                      + test_user + "'")

        # "ec2:StopInstances"
        try:
            self.iam_tester.stop_instances(self.reservation)
            self.debug("Was able to stop instance '" + instance.id
                       + "' as developer '" + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to stop instance as developer '"
                      + test_user + "'")

        # "ec2:StartInstances"
        try:
            self.iam_tester.start_instances(self.reservation)
            self.debug("Was able to start instance '" + instance.id
                       + "' as developer '" + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to start instance as developer '"
                      + test_user + "'")

        # "ec2:TerminateInstances"
        try:
            self.debug("Terminating instance '" + instance.id
                       + "' as developer '" + test_user + "'")
            self.iam_tester.terminate_instances(reservation=self.reservation)
        except EC2ResponseError:
            self.fail("Was unable to terminate instance from developer '"
                      + test_user + "'")

    def _check_manager_restrictions(self):
        """Verify a manager (Describe*-only policy) cannot create a keypair."""
        test_user = self.groups['eutester_managers'][0]
        keys = self.account_tester.create_access_key(test_user)
        self.iam_tester = self._tester_from_keys(keys, test_user)

        # Keep the requested name in a local: when creation is denied there
        # is no new keypair object to read the name from.
        keypair_name = "keypair-" + str(time.time())
        try:
            self.keypair = self.iam_tester.add_keypair(keypair_name)
        except EC2ResponseError as e:
            self.iam_tester.debug("Failed to create '" + keypair_name
                                  + "' as expected. Reason: "
                                  + str(e.error_code))
        else:
            # A successful call means the policy was not enforced.
            self.fail("Failed because user '" + test_user
                      + "' was able to create keypair.")

    def RemoveResourcesTest(self):
        """Detach policies, delete users and groups, then delete the account.

        Requires ``CreateResourcesTest`` to have run first.
        """
        for group, users in self.groups.items():
            # Must match the name used by attach_policy_group.
            self.account_tester.detach_policy_group(
                group, self._policy_name(group))
            for user in users:
                self.account_tester.remove_user_from_group(group, user)
                self.account_tester.delete_user(user)
            # Delete after the user loop so groups with no members
            # (eutester_allusers) are removed as well.
            self.account_tester.delete_group(group)
        self.tester.delete_account(self.args.user_account, True)
# ipython | |
## add resources | |
# for group, users in groups.iteritems(): | |
# account_tester.create_group(group) | |
# for user in users: | |
# account_tester.create_user(user) | |
# account_tester.add_user_to_group(group, user) | |
## remove resources | |
# for group, users in groups.iteritems(): | |
# for i, user in enumerate(users): | |
# account_tester.remove_user_from_group(group, user) | |
# account_tester.delete_user(user) | |
# if (i+1) == len(users): | |
# account_tester.delete_group(group) | |
if __name__ == "__main__":
    testcase = EC2IamTest()
    # Use the list of tests passed from config/command line to determine
    # which subset of tests to run, or fall back to the predefined order.
    # (Named test_names so the builtin ``list`` is not shadowed.)
    test_names = testcase.args.tests or ["CreateResourcesTest",
                                         "IAMPolicyTest",
                                         "RemoveResourcesTest"]
    # Convert test suite method names to EutesterUnitTest objects.
    unit_list = [testcase.create_testunit_by_name(name)
                 for name in test_names]
    # Run the EutesterUnitTest objects and use the result as exit status.
    result = testcase.run_test_case_list(unit_list, clean_on_exit=True)
    exit(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment