Created August 18, 2010 18:58.
Save gist michaelmontano/535794 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review it, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters.
# cli.py: every InstanceTemplate built for launch-master, launch-slaves and
# launch-cluster now also receives opt.get('spot_price') as its final
# positional argument, after opt.get('security_group').
# NOTE(review): this patch does not add a spot_price option anywhere; confirm
# it is declared in the option parser or config file, otherwise opt.get()
# presumably yields None and every template launches on-demand instances.
diff -Naur ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/cli.py src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/cli.py | |
--- ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/cli.py 2010-03-18 17:29:55.000000000 -0700 | |
+++ src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/cli.py 2010-05-17 15:00:26.000000000 -0700 | |
@@ -292,7 +292,7 @@ | |
opt.get('public_key'), opt.get('user_data_file'), | |
opt.get('availability_zone'), opt.get('user_packages'), | |
opt.get('auto_shutdown'), opt.get('env'), | |
- opt.get('security_group')) | |
+ opt.get('security_group'), opt.get('spot_price')) | |
service.launch_master(template, config_dir, opt.get('client_cidr')) | |
elif command == 'launch-slaves': | |
@@ -306,7 +306,7 @@ | |
opt.get('public_key'), opt.get('user_data_file'), | |
opt.get('availability_zone'), opt.get('user_packages'), | |
opt.get('auto_shutdown'), opt.get('env'), | |
- opt.get('security_group')) | |
+ opt.get('security_group'), opt.get('spot_price')) | |
service.launch_slaves(template) | |
elif command == 'launch-cluster': | |
@@ -327,14 +327,14 @@ | |
opt.get('public_key'), opt.get('user_data_file'), | |
opt.get('availability_zone'), opt.get('user_packages'), | |
opt.get('auto_shutdown'), opt.get('env'), | |
- opt.get('security_group')), | |
+ opt.get('security_group'), opt.get('spot_price')), | |
InstanceTemplate((DATANODE, TASKTRACKER), number_of_slaves, | |
get_image_id(service.cluster, opt), | |
opt.get('instance_type'), opt.get('key_name'), | |
opt.get('public_key'), opt.get('user_data_file'), | |
opt.get('availability_zone'), opt.get('user_packages'), | |
opt.get('auto_shutdown'), opt.get('env'), | |
- opt.get('security_group')), | |
+ opt.get('security_group'), opt.get('spot_price')), | |
] | |
elif len(args) > 2 and len(args) % 2 == 0: | |
print_usage(sys.argv[0]) | |
@@ -350,7 +350,7 @@ | |
opt.get('availability_zone'), | |
opt.get('user_packages'), | |
opt.get('auto_shutdown'), opt.get('env'), | |
- opt.get('security_group'))) | |
+ opt.get('security_group'), opt.get('spot_price'))) | |
service.launch_cluster(instance_templates, config_dir, | |
opt.get('client_cidr')) | |
diff -Naur ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py | |
--- ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py 2010-03-18 17:29:55.000000000 -0700 | |
+++ src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/providers/ec2.py 2010-05-17 14:47:21.000000000 -0700 | |
@@ -243,6 +243,63 @@ | |
sys.stdout.write(".") | |
sys.stdout.flush() | |
time.sleep(1) | |
+ | |
+ def launch_spot_instances(self, roles, price, number, image_id, size_id, | |
+ instance_user_data, **kwargs): | |
+ """ | |
+ Request `number` spot instances bidding `price` for the given roles, then | |
+ block until they are fulfilled and started; returns their instance ids. | |
+ """ | |
+ for role in roles: | |
+ self._check_role_name(role) | |
+ self._create_groups(role) | |
+ | |
+ user_data = instance_user_data.read_as_gzip_stream() | |
+ security_groups = self._get_group_names(roles) + kwargs.get('security_groups', []) | |
+ spot_request = self.ec2Connection.request_spot_instances(price=price, image_id=image_id, | |
+ count=number, type=None, valid_from=None, valid_until=None, | |
+ launch_group=kwargs.get('launch_group', None), | |
+ availability_zone_group=kwargs.get('availability_zone_group', None), | |
+ key_name=kwargs.get('key_name', None), | |
+ security_groups=security_groups, user_data=user_data, | |
+ instance_type=size_id, placement=kwargs.get('placement', None)) | |
+ spot_instance_request_ids = [request.id for request in spot_request] | |
+ instance_ids = self.wait_for_spot_instances(spot_instance_request_ids) | |
+ return instance_ids | |
+ | |
+ def wait_for_spot_instances(self, request_ids, timeout=1200): | |
+ """ | |
+ Poll the spot requests in request_ids until every one is fulfilled and | |
+ all resulting instances have started, then return their instance ids. | |
+ On timeout the requests are cancelled, so they cannot launch untracked | |
+ instances later, and TimeoutException is raised. | |
+ """ | |
+ if not request_ids: | |
+ # an empty id list would make EC2 describe every request in the account | |
+ return [] | |
+ start_time = time.time() | |
+ while True: | |
+ if (time.time() - start_time >= timeout): | |
+ try: | |
+ self.ec2Connection.cancel_spot_instance_requests(request_ids) | |
+ except EC2ResponseError: | |
+ pass # best effort; the timeout is the error worth reporting | |
+ raise TimeoutException() | |
+ try: | |
+ requests = self.ec2Connection.get_all_spot_instance_requests(request_ids) | |
+ # boto releases differ in exposing the fulfilled id as instance_id or | |
+ # instanceId (TODO confirm against the pinned boto); unset means pending. | |
+ instance_ids = [getattr(r, 'instance_id', None) or getattr(r, 'instanceId', None) | |
+ for r in requests] | |
+ if instance_ids and None not in instance_ids and \ | |
+ self._all_started(self.ec2Connection.get_all_instances(instance_ids)): | |
+ return instance_ids | |
+ # don't timeout for race condition where instance is not yet registered | |
+ except EC2ResponseError: | |
+ pass | |
+ sys.stdout.write(".") | |
+ sys.stdout.flush() | |
+ time.sleep(1) | |
def _all_started(self, reservations): | |
for res in reservations: | |
@@ -255,6 +312,9 @@ | |
instances = self._get_instances(self._get_cluster_group_name(), "running") | |
if instances: | |
self.ec2Connection.terminate_instances([i.id for i in instances]) | |
+ spot_instance_request_ids = [i.spot_instance_request_id for i in instances if i.spot_instance_request_id is not None] | |
+ if spot_instance_request_ids: | |
+ self.ec2Connection.cancel_spot_instance_requests(spot_instance_request_ids) | |
def delete(self): | |
""" | |
# service.py: InstanceTemplate gains an optional spot_price argument (default
# None, stored as self.spot_price). When launching, a template whose
# spot_price is None still goes through cluster.launch_instances; otherwise
# cluster.launch_spot_instances is called with the bid as its second argument.
# NOTE(review): public_key is passed to launch_spot_instances, but that method
# never reads it. Neither branch forwards it.security_groups, so the ec2.py
# kwargs.get('security_groups', []) lookup is always empty for spot launches;
# confirm that is intended.
diff -Naur ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/service.py src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/service.py | |
--- ../tmp/hadoop-0.20.1+169.68/src/contrib/cloud/src/py/hadoop/cloud/service.py 2010-03-18 17:29:53.000000000 -0700 | |
+++ src/hadoop/src/contrib/cloud/src/py/hadoop/cloud/service.py 2010-05-16 17:08:53.000000000 -0700 | |
@@ -52,7 +52,7 @@ | |
key_name, public_key, | |
user_data_file_template=None, placement=None, | |
user_packages=None, auto_shutdown=None, env_strings=[], | |
- security_groups=[]): | |
+ security_groups=[], spot_price=None): | |
self.roles = roles | |
self.number = number | |
self.image_id = image_id | |
@@ -65,6 +65,7 @@ | |
self.auto_shutdown = auto_shutdown | |
self.env_strings = env_strings | |
self.security_groups = security_groups | |
+ self.spot_price = spot_price | |
def add_env_strings(self, env_strings): | |
new_env_strings = list(self.env_strings or []) | |
@@ -209,12 +210,20 @@ | |
"EBS_MAPPINGS": ebs_mappings, | |
}) } | |
instance_user_data = InstanceUserData(user_data_file_template, replacements) | |
- instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id, | |
- it.size_id, | |
- instance_user_data, | |
- key_name=it.key_name, | |
- public_key=it.public_key, | |
- placement=it.placement) | |
+ if it.spot_price is None: | |
+ instance_ids = self.cluster.launch_instances(it.roles, it.number, it.image_id, | |
+ it.size_id, | |
+ instance_user_data, | |
+ key_name=it.key_name, | |
+ public_key=it.public_key, | |
+ placement=it.placement) | |
+ else: | |
+ instance_ids = self.cluster.launch_spot_instances(it.roles, it.spot_price, it.number, it.image_id, | |
+ it.size_id, | |
+ instance_user_data, | |
+ key_name=it.key_name, | |
+ public_key=it.public_key, | |
+ placement=it.placement) | |
print "Waiting for %s instances in role %s to start" % \ | |
(it.number, ",".join(it.roles)) | |
try: |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.