Last active
August 29, 2015 14:03
-
-
Save zitooon/e299e7db442b49cc0449 to your computer and use it in GitHub Desktop.
Amazon AWS EMR rake tasks for automated cluster creation (using Hadoop, Hive and Sqoop), easily editable
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace :aws do | |
namespace :emr do | |
# Build the base option hash for an EMR cluster launch (AMI, logging,
# instance groups, bootstrap actions). Sizing and pricing come from the
# instance variables populated by emr_general_config
# (@cluster_instances_conf, @master_instance_type, @instance_type,
# @master_bid_price, @bid_price, @on_demand).
#
# opts - optional overrides; only :keep_job_flow_alive_when_no_steps is
#        honored (false by default, i.e. the cluster terminates once all
#        steps complete).
#
# Returns the options Hash expected by AWS::EMR job_flows.create.
def emr_cluster_base_options(opts={})
  opts = { keep_job_flow_alive_when_no_steps: false }.merge(opts)
  use_spot = !@on_demand
  # MASTER and CORE groups run on-demand unless spot pricing is requested.
  master_group = { instance_role: 'MASTER', instance_type: @master_instance_type, instance_count: @cluster_instances_conf[0], market: 'ON_DEMAND' }
  master_group.merge!(market: 'SPOT', bid_price: @master_bid_price) if use_spot
  core_group = { instance_role: 'CORE', instance_type: @instance_type, instance_count: @cluster_instances_conf[1], market: 'ON_DEMAND' }
  core_group.merge!(market: 'SPOT', bid_price: @bid_price) if use_spot
  groups = [master_group, core_group]
  # Optional TASK group (third component of the size spec) always bids on
  # the spot market.
  if @cluster_instances_conf[2] > 0
    groups << { instance_role: 'TASK', instance_type: @instance_type, instance_count: @cluster_instances_conf[2], market: 'SPOT', bid_price: @bid_price }
  end
  {
    ami_version: 'latest',
    log_uri: 's3n://your/bucket/logs/dir/',
    instances: {
      instance_groups: groups,
      ec2_key_name: 'your-keypair',
      keep_job_flow_alive_when_no_steps: opts[:keep_job_flow_alive_when_no_steps],
      termination_protected: false,
      ec2_subnet_id: 'your-subnet-id'
    },
    bootstrap_actions: [ { name: 'Install Sqoop', script_bootstrap_action: { path: 's3://octoly/aws-emr/scripts/install_sqoop.sh' } } ],
    steps: {},
    visible_to_all_users: true
  }
end
# The first two steps every cluster runs: install Hive via the EMR
# script-runner, then execute the import_sql_tables.sh script step.
#
# opts - :action_on_failure for the Hive setup step
#        (CANCEL_AND_WAIT by default; TERMINATE_CLUSTER is the other option).
#
# Returns an Array of two step Hashes.
def emr_base_cluster_steps(opts={})
  puts "Registering step 'Setup Hive'"
  opts = { action_on_failure: 'CANCEL_AND_WAIT' }.merge(opts)
  hive_setup = {
    name: 'Setup Hive',
    hadoop_jar_step: { jar: 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', args: ['s3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--install-hive', '--hive-versions', 'latest'] },
    action_on_failure: opts[:action_on_failure]
  }
  [hive_setup, emr_script_step(script_name: 'import_sql_tables.sh')]
end
# Build one EMR step that runs a shell script from the octoly S3 bucket
# through the script-runner jar.
#
# opts - :script_name (required) file name under aws-emr/scripts/
#        :action_on_failure       CANCEL_AND_WAIT by default
#                                 (other option : TERMINATE_CLUSTER)
#
# Raises when :script_name is missing. Returns the step Hash; the step
# name is the file name without extension, humanized by ActiveSupport
# (e.g. 'import_sql_tables.sh' -> 'Import sql tables').
def emr_script_step(opts={})
  opts = { action_on_failure: 'CANCEL_AND_WAIT' }.merge(opts)
  raise "Need script_name option in emr_script_step" unless opts[:script_name]
  script = opts[:script_name]
  title = script.gsub(/\..*/, '').humanize
  puts "Registering step '#{title}'"
  step = { name: title }
  step[:hadoop_jar_step] = { jar: 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', args: ["s3://octoly/aws-emr/scripts/#{script}"] }
  step[:action_on_failure] = opts[:action_on_failure]
  step
end
# Build one Hive step per .hql file matching a glob pattern.
#
# opts - :queries_path     (required) glob under Rails.root/db/hive/
#        :action_on_failure CANCEL_AND_WAIT by default
#                           (other option : TERMINATE_CLUSTER)
#        :script_params     list of KEY=VALUE strings forwarded to Hive
#                           with -d (variable substitution)
#
# Raises when :queries_path is missing. Returns an Array of step Hashes,
# one per regular file matched by the glob.
def emr_hive_steps(opts={})
  opts = { action_on_failure: 'CANCEL_AND_WAIT', script_params: [] }.merge(opts)
  raise 'Need queries_path: in opts for emr_hive_steps' unless opts[:queries_path]
  Dir.glob(opts[:queries_path]).select { |path| File.file?(path) }.map do |path|
    file_name = path.gsub("#{Rails.root}/db/hive/", '')
    puts "Registering step 'Hive : #{file_name.gsub('.hql', '')}'"
    script_args = [ 's3://us-east-1.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://us-east-1.elasticmapreduce/libs/hive/', '--hive-versions', 'latest', '--run-hive-script', '--args', '-f', "s3://octoly/aws-emr/hive-queries/#{file_name}"]
    # Each script param becomes a '-d KEY=VALUE' pair.
    script_args.concat(opts[:script_params].flat_map { |param| ['-d', param] })
    {
      name: "Hive : #{file_name.gsub('.hql', '')}",
      hadoop_jar_step: { jar: 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', args: script_args },
      action_on_failure: opts[:action_on_failure]
    }
  end
end
# Add this step to enable hadoop debug (runs the EMR state-pusher fetch
# script so step/job state shows up in the AWS console debugging UI).
#
# opts - :action_on_failure, CANCEL_AND_WAIT by default
#        (other option : TERMINATE_CLUSTER)
#
# Returns the step Hash.
def emr_hadoop_debug_step(opts={})
  puts "Registering step 'Setup Hadoop Debugging'"
  defaults = { action_on_failure: 'CANCEL_AND_WAIT' }
  opts = defaults.merge(opts)
  step = {
    name: 'Setup Hadoop Debugging',
    action_on_failure: opts[:action_on_failure]
  }
  step[:hadoop_jar_step] = { jar: 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar', args: ['s3://us-east-1.elasticmapreduce/libs/state-pusher/0.1/fetch'] }
  step
end
# Read the cluster hardware configuration from ENV into the instance
# variables consumed by the other emr_* helpers, then echo it.
#
#   size=MxCxT            master x core x task instance counts (default 1x10x0)
#   master_instance=TYPE  master instance type          (default m1.xlarge)
#   master_bid_price=P    master spot bid price         (default 0.5)
#   instance=TYPE         core/task instance type       (default m1.large)
#   bid_price=P           core/task spot bid price      (default 0.3)
#   on_demand=ANY         use on-demand market instead of spot bids
#   keep_alive=true|false keep the cluster alive after the last step
def emr_general_config
  @cluster_instances_conf = (ENV['size'] || '1x10x0').split('x').map(&:to_i)
  @master_instance_type = ENV['master_instance'] || 'm1.xlarge'
  @master_bid_price = ENV['master_bid_price'] || '0.5'
  @instance_type = ENV['instance'] || 'm1.large'
  @bid_price = ENV['bid_price'] || '0.3'
  # NOTE: any non-nil value switches to on-demand — even the string 'false'
  # is truthy in Ruby. Unset the variable to get spot pricing.
  @on_demand = ENV['on_demand'] || false
  # Parse the flag explicitly instead of eval-ing the env var (the previous
  # eval executed arbitrary code from the environment). nil when unset so
  # emr_finish_config_and_launch can tell "not specified" from false.
  @keep_alive = ENV['keep_alive'].nil? ? nil : ENV['keep_alive'] == 'true'
  puts "Cluster hardware configuration :"
  puts " - size : #{@cluster_instances_conf.join('x')}"
  puts " - Master instance : #{@master_instance_type} (#{!@on_demand ? "Bid #{@master_bid_price}" : 'on_demand'})"
  puts " - Other instances : #{@instance_type} (#{!@on_demand ? "Bid #{@bid_price}" : 'on_demand'})"
end
# Assemble the final cluster options and create the EMR job flow.
#
# name          - job flow name shown in the AWS console
# cluster_steps - Array of step Hashes to run
# keep_alive    - whether the cluster survives its last step; overridden
#                 by @keep_alive when emr_general_config parsed one
#                 (present? so an explicit false override is NOT applied)
#
# Side effect: creates a real EMR cluster via the AWS SDK and prints its id.
def emr_finish_config_and_launch(name, cluster_steps, keep_alive=true)
  keep_alive = @keep_alive if @keep_alive.present?
  options = emr_cluster_base_options(keep_job_flow_alive_when_no_steps: keep_alive)
  options = options.merge(steps: cluster_steps)
  flow = AWS::EMR.new(OVERWRITE_AWS_CONFIG).job_flows.create(name, options)
  puts "Automated job flow #{name}:#{flow.id} created !"
end
desc "Launch an EMR cluster which will compute audits stats and export it to RDS"
task launch_audits_stats_cluster: :environment do
  emr_general_config
  # Step order matters: Hive setup + SQL import, then the Hive query
  # batches (imports, best-effort backups, numbered then lettered
  # audits_stats queries), and finally the RDS export script.
  steps = emr_base_cluster_steps
  steps.concat(emr_hive_steps(queries_path: "#{Rails.root}/db/hive/import/[v|c]*"))
  steps.concat(emr_hive_steps(queries_path: "#{Rails.root}/db/hive/backup/*", action_on_failure: 'CONTINUE', script_params: ["CURRENT_DATE=#{Date.yesterday.to_s}"]))
  steps.concat(emr_hive_steps(queries_path: "#{Rails.root}/db/hive/audits_stats/[1-9]*"))
  steps.concat(emr_hive_steps(queries_path: "#{Rails.root}/db/hive/audits_stats/[a-z]*"))
  steps.push(emr_script_step(script_name: 'export_audits_stats_to_sql.sh'))
  # false: tear the cluster down once the last step finishes (unless the
  # keep_alive env var overrides it inside emr_finish_config_and_launch).
  emr_finish_config_and_launch('AuditsStatsCluster', steps, false)
end
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment