Skip to content

Instantly share code, notes, and snippets.

@amalgjose
Last active July 23, 2021 15:34
Show Gist options
  • Save amalgjose/a2523b157b02f9cfa4f5 to your computer and use it in GitHub Desktop.
Save amalgjose/a2523b157b02f9cfa4f5 to your computer and use it in GitHub Desktop.
Python code for launching an EMR cluster
__author__ = 'Amal G Jose'
import time
import logging
from boto.emr.connection import EmrConnection
from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.step import InstallHiveStep
from boto.emr.step import InstallPigStep
from boto.regioninfo import RegionInfo
#Program for launching an EMR cluster
class EmrLauncher(object):
    """Launch a keep-alive Amazon EMR cluster through boto and wait for it.

    The cluster is started with custom, Hadoop-config, Ganglia and Impala
    bootstrap actions plus Hive and Pig install steps. All settings
    (region, credentials, bucket names, poll interval) are instance
    attributes assigned in ``__init__``.
    """

    # Job flow state in which the cluster is up and idle, ready for work.
    _READY_STATE = 'WAITING'
    # States meaning the cluster is going away / gone without being usable.
    # TERMINATED is included so polling cannot spin forever when the
    # SHUTTING_DOWN phase falls between two polls.
    _FAILURE_STATES = ('SHUTTING_DOWN', 'TERMINATED', 'FAILED')
    # Every state at which polling stops.
    _TERMINAL_STATES = (_READY_STATE, 'COMPLETED') + _FAILURE_STATES

    def __init__(self):
        """Set the launcher configuration and create the EMR connection.

        Any exception raised while building the connection propagates to
        the caller.
        """
        self.zone_name = "ap-southeast-1"
        # Placeholder values: replace before use and never commit real
        # credentials to source control.
        self.access_key = "xxxxxx"
        self.private_key = "xxxxxxx"
        self.ec2_keyname = "xxxxxxxx"
        self.base_bucket = "s3://emr-bucket/"
        self.bootstrap_script = "custom-bootstrap.sh"
        self.log_dir = "Logs"
        # Seconds to sleep between two cluster state polls.
        self.emr_status_wait = 20
        self.cluster_name = "MyFirstEmrCluster"
        self.log_bucket_name = self.base_bucket + self.log_dir
        self.bootstrap_script_name = self.base_bucket + self.bootstrap_script
        # Establishing EmrConnection
        self.conn = EmrConnection(
            self.access_key,
            self.private_key,
            region=RegionInfo(
                name=self.zone_name,
                endpoint=self.zone_name + '.elasticmapreduce.amazonaws.com'))

    def _build_bootstrap_actions(self):
        """Return the bootstrap actions in the order they are submitted."""
        # Custom bootstrap script stored in the base bucket.
        custom_bootstrap = BootstrapAction(
            "CustomBootStrap", self.bootstrap_script_name, None)
        # HDFS block size of 256 MB; dfs.block.size is expressed in bytes.
        block_size_conf = 'dfs.block.size=%d' % (256 * 1024 * 1024)
        hadoop_config = BootstrapAction(
            'hadoop-config',
            's3://elasticmapreduce/bootstrap-actions/configure-hadoop',
            ['-h', block_size_conf])
        # Ganglia monitoring.
        ganglia = BootstrapAction(
            'ganglia-config',
            's3://elasticmapreduce/bootstrap-actions/install-ganglia', '')
        # Impala.
        impala = BootstrapAction(
            "ImpalaInstall",
            "s3://elasticmapreduce/libs/impala/setup-impala",
            ['--install-impala', '--base-path', 's3://elasticmapreduce',
             '--impala-version', 'latest'])
        return [hadoop_config, ganglia, custom_bootstrap, impala]

    def _wait_for_terminal_state(self, jobid):
        """Poll the job flow until it reaches a terminal state; return it."""
        state = self.conn.describe_jobflow(jobid).state
        while state not in self._TERMINAL_STATES:
            # Sleep before rechecking the status.
            time.sleep(int(self.emr_status_wait))
            state = self.conn.describe_jobflow(jobid).state
        return state

    def launch_emr_cluster(self, master_type, slave_type, num_instance, ami_version):
        """Start the cluster and block until it is ready or has failed.

        :param master_type: instance type passed as ``master_instance_type``.
        :param slave_type: instance type passed as ``slave_instance_type``.
        :param num_instance: value passed as ``num_instances``.
        :param ami_version: value passed as ``ami_version``.
        :return: ``"SUCCESS"`` when the cluster reaches ``WAITING``,
            ``"ERROR"`` when it stops in any other terminal state, and
            ``"FAILED"`` when an exception is raised along the way (the
            traceback is logged).
        """
        try:
            jobid = self.conn.run_jobflow(
                self.cluster_name,
                self.log_bucket_name,
                bootstrap_actions=self._build_bootstrap_actions(),
                ec2_keyname=self.ec2_keyname,
                steps=[InstallHiveStep(), InstallPigStep()],
                keep_alive=True,
                action_on_failure='CANCEL_AND_WAIT',
                master_instance_type=master_type,
                slave_instance_type=slave_type,
                num_instances=num_instance,
                ami_version=ami_version)
            # Enabling the termination protection
            self.conn.set_termination_protection(jobid, True)

            state = self._wait_for_terminal_state(jobid)
            if state != self._READY_STATE:
                logging.error(
                    "Launching EMR cluster failed (final state: %s)", state)
                return "ERROR"

            # Finding the master node DNS of the EMR cluster
            master_dns = self.conn.describe_jobflow(jobid).masterpublicdnsname
            logging.info("Launched EMR Cluster Successfully")
            # Lazy formatting: a missing DNS name must not raise here.
            logging.info("Master node DNS of EMR %s", master_dns)
            return "SUCCESS"
        except Exception:
            # logging.exception records the traceback, so the reason for
            # the failure is visible in the log.
            logging.exception("Launching EMR cluster failed")
            return "FAILED"

    def main(self):
        """Launch a 3-node m3.xlarge cluster on AMI 2.4.8 and log the outcome."""
        master_type = 'm3.xlarge'
        slave_type = 'm3.xlarge'
        num_instance = 3
        ami_version = '2.4.8'
        try:
            emr_status = self.launch_emr_cluster(
                master_type, slave_type, num_instance, ami_version)
        except Exception:
            logging.exception("Emr launching failed")
            return
        if emr_status == 'SUCCESS':
            logging.info("Emr cluster launched successfully")
        else:
            logging.error("Emr launching failed")
if __name__ == '__main__':
    # The root logger defaults to WARNING, which would drop every INFO
    # progress message the launcher emits; enable INFO for script runs.
    logging.basicConfig(level=logging.INFO)
    launcher = EmrLauncher()
    launcher.main()
@avivnoy
Copy link

avivnoy commented May 21, 2015

great post!

In case you are getting FAILED, how can you print the error message in order to know the reason for the failed process?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment