Last active
July 23, 2021 15:34
-
-
Save amalgjose/a2523b157b02f9cfa4f5 to your computer and use it in GitHub Desktop.
Python code for launching an EMR cluster
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = 'Amal G Jose'

import logging
import time

from boto.emr.bootstrap_action import BootstrapAction
from boto.emr.connection import EmrConnection
from boto.emr.step import InstallHiveStep
from boto.emr.step import InstallPigStep
from boto.regioninfo import RegionInfo
class EmrLauncher(object):
    """Launch an Amazon EMR cluster through boto and wait until it is ready.

    The cluster is started with configure-hadoop, Ganglia, a custom script
    and Impala bootstrap actions plus Hive and Pig install steps. It is
    kept alive after its steps finish (keep_alive=True) and has termination
    protection enabled.
    """

    # State treated as a successful launch.
    _SUCCESS_STATE = u'WAITING'
    # Terminal states that mean the cluster did not come up. TERMINATED is
    # included so that a cluster which passes through SHUTTING_DOWN between
    # two polls cannot make the wait loop spin forever.
    _FAILURE_STATES = (u'COMPLETED', u'SHUTTING_DOWN', u'FAILED', u'TERMINATED')
    # HDFS block size in bytes (256 MB). Hadoop's dfs.block.size is given in
    # bytes, so the former value "256" meant 256 bytes, not 256 MB.
    _HDFS_BLOCK_SIZE = 256 * 1024 * 1024

    def __init__(self, zone_name="ap-southeast-1", access_key="xxxxxx",
                 private_key="xxxxxxx", ec2_keyname="xxxxxxxx",
                 base_bucket="s3://emr-bucket/",
                 cluster_name="MyFirstEmrCluster"):
        """Store the cluster settings and open an EMR connection.

        The defaults are the placeholder values of the original script.
        Pass real values as arguments instead of hard-coding credentials
        in the source.

        Args:
            zone_name: AWS region name; also used to build the EMR endpoint.
            access_key: AWS access key id passed to EmrConnection.
            private_key: AWS secret key passed to EmrConnection.
            ec2_keyname: EC2 key pair name for the cluster nodes.
            base_bucket: S3 prefix (with trailing slash) holding the logs
                directory and the custom bootstrap script.
            cluster_name: Name given to the job flow.
        """
        self.zone_name = zone_name
        self.access_key = access_key
        self.private_key = private_key
        self.ec2_keyname = ec2_keyname
        self.base_bucket = base_bucket
        self.bootstrap_script = "custom-bootstrap.sh"
        self.log_dir = "Logs"
        # Seconds to sleep between two state polls.
        self.emr_status_wait = 20
        self.cluster_name = cluster_name
        self.log_bucket_name = self.base_bucket + self.log_dir
        self.bootstrap_script_name = self.base_bucket + self.bootstrap_script
        # Connect to the regional EMR endpoint derived from the region name.
        self.conn = EmrConnection(
            self.access_key, self.private_key,
            region=RegionInfo(
                name=self.zone_name,
                endpoint=self.zone_name + '.elasticmapreduce.amazonaws.com'))

    @classmethod
    def _hadoop_config_args(cls):
        """Return the configure-hadoop arguments that set the HDFS block size."""
        # Exactly one '-h' per key=value pair; a trailing '-h' without a
        # value is a malformed argument list.
        return ['-h', 'dfs.block.size=%d' % cls._HDFS_BLOCK_SIZE]

    def _bootstrap_actions(self):
        """Build the bootstrap actions in the order they are passed to run_jobflow."""
        hadoop_config = BootstrapAction(
            'hadoop-config',
            's3://elasticmapreduce/bootstrap-actions/configure-hadoop',
            self._hadoop_config_args())
        # None (no arguments) instead of '', which boto would wrap into a
        # single empty-string argument.
        ganglia = BootstrapAction(
            'ganglia-config',
            's3://elasticmapreduce/bootstrap-actions/install-ganglia', None)
        custom = BootstrapAction(
            "CustomBootStrap", self.bootstrap_script_name, None)
        impala = BootstrapAction(
            "ImpalaInstall", "s3://elasticmapreduce/libs/impala/setup-impala",
            ['--install-impala', '--base-path', 's3://elasticmapreduce',
             '--impala-version', 'latest'])
        return [hadoop_config, ganglia, custom, impala]

    def _wait_for_cluster(self, jobid):
        """Poll ``describe_jobflow`` until the job flow reaches a terminal state.

        Returns:
            The last job flow description, whose ``state`` is either the
            success state or one of the failure states.
        """
        jobflow = self.conn.describe_jobflow(jobid)
        while (jobflow.state != self._SUCCESS_STATE
               and jobflow.state not in self._FAILURE_STATES):
            time.sleep(int(self.emr_status_wait))
            jobflow = self.conn.describe_jobflow(jobid)
        return jobflow

    def launch_emr_cluster(self, master_type, slave_type, num_instance, ami_version):
        """Launch the cluster and block until it is ready or has failed.

        Args:
            master_type: EC2 instance type of the master node, e.g. 'm3.xlarge'.
            slave_type: EC2 instance type of the slave nodes.
            num_instance: Passed to run_jobflow as ``num_instances``.
            ami_version: EMR AMI version, e.g. '2.4.8'.

        Returns:
            "SUCCESS" when the cluster reaches WAITING; "ERROR" when it ends
            in any other terminal state (the state and the reported reason
            are logged); "FAILED" when an exception is raised while launching
            or polling (the traceback is logged).
        """
        try:
            jobid = self.conn.run_jobflow(
                self.cluster_name,
                self.log_bucket_name,
                bootstrap_actions=self._bootstrap_actions(),
                ec2_keyname=self.ec2_keyname,
                steps=[InstallHiveStep(), InstallPigStep()],
                keep_alive=True,
                action_on_failure='CANCEL_AND_WAIT',
                master_instance_type=master_type,
                slave_instance_type=slave_type,
                num_instances=num_instance,
                ami_version=ami_version)
            self.conn.set_termination_protection(jobid, True)
            jobflow = self._wait_for_cluster(jobid)
        except Exception:
            # logging.exception records the traceback, so the actual cause is
            # visible instead of being swallowed by a bare except.
            logging.exception("Launching EMR cluster failed")
            return "FAILED"

        if jobflow.state != self._SUCCESS_STATE:
            logging.error("Launching EMR cluster failed: state=%s, reason=%s",
                          jobflow.state,
                          getattr(jobflow, 'laststatechangereason', None))
            return "ERROR"

        logging.info("Launched EMR Cluster Successfully")
        # Lazy %-formatting: a missing DNS name must not turn success into failure.
        logging.info("Master node DNS of EMR %s",
                     getattr(jobflow, 'masterpublicdnsname', None))
        return "SUCCESS"

    def main(self):
        """Launch a 3-instance m3.xlarge cluster on AMI 2.4.8 and log the outcome."""
        master_type = 'm3.xlarge'
        slave_type = 'm3.xlarge'
        num_instance = 3
        ami_version = '2.4.8'
        try:
            emr_status = self.launch_emr_cluster(
                master_type, slave_type, num_instance, ami_version)
        except Exception:
            logging.exception("Emr launching failed")
            return
        if emr_status == 'SUCCESS':
            logging.info("Emr cluster launched successfully")
        else:
            logging.error("Emr launching failed")
if __name__ == '__main__':
    # Without a configured handler the root logger only emits WARNING and
    # above, so the success messages and the master DNS would never appear.
    logging.basicConfig(level=logging.INFO)
    launcher = EmrLauncher()
    launcher.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Great post!
If the launch returns FAILED, how can you print the error message to find out why the process failed?