Created
February 26, 2020 01:20
-
-
Save afsalthaj/50f5f93e9fd960a0f2ac1c193a06129c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import boto3 | |
import logging | |
# Module-level logger configured once at import time; INFO level so the
# handler's progress messages below are actually emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class ZeroClustersAvailable(Exception):
    """Raised when no EMR cluster with the expected name is in WAITING or RUNNING state."""
class MoreThanOneClusterAvailable(Exception):
    """Raised when more than one EMR cluster matches the expected name, so the target is ambiguous."""
def lambda_handler(event, context): | |
client = boto3.client('emr') | |
logger.info("Trying to list the clusters that are in either waiting or running state") | |
clusters = client.list_clusters( | |
ClusterStates = ['WAITING', 'RUNNING'] | |
).get('Clusters') | |
basePlusClusters = list(filter(lambda x: x.get('Name') == ${cluster_name}, list(clusters))) | |
if len(basePlusClusters) == 0: | |
raise ZeroClustersAvailable(f'Cannot run the engine (spark job). There are no clusters available with the name baseplus-{env} in waiting state or running state') | |
emrCluster, *tail = basePlusClusters | |
if len(tail) > 0: | |
raise MoreThanOneClusterAvailable('More than one cluster is available with the name baseplus-dev and in states:[WAITING, RUNNING]') | |
extraArgs = event.get('extra_args') | |
className = event.get('class_name') | |
configFile = event.get('config_file') | |
jobName = event.get('job_name') | |
jarName = event.get('jar_name') | |
logger.info(f'Running spark job with extra arguments: {extraArgs}') | |
logger.info(f'Running spark job with class name: {className}') | |
logger.info(f'Running spark job with config file in: {configFile}') | |
logger.info('Running spark job: {jobName}') | |
job_map = { | |
'Name': '{jobName}', | |
'ActionOnFailure' : 'CONTINUE', | |
'HadoopJarStep' : { | |
'Properties' : [], | |
'Jar': 'command-runner.jar', | |
'Args': [ | |
'/bin/sh', | |
'-c', | |
'spark-submit --packages org.postgresql:postgresql:42.2.5 --conf spark.driver.extraJavaOptions=\"-Ddance-card.config.bucket-name=s3://${account_id}-baseplus-artifacts/{configFile}" --class {className} --deploy-mode cluster s3://${account_id}-baseplus-artifacts/artifacts/{jarName} {extraArgs}' | |
] | |
} | |
} | |
response = client.add_job_flow_steps( | |
JobFlowId = emrCluster.get('Id'), | |
Steps = [job_map for i in range(event.get('count'))] | |
) | |
logger.info(response) | |
return response |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment