Creates a step in Amazon EMR for a given cluster_id and monitors its progress using a sensor.
A more complex example that involves cluster creation/termination can be found here.
-
-
Save cr3a7ure/3c76a829caa53bdb4d06e4fde2804948 to your computer and use it in GitHub Desktop.
Submit a Spark job to an existing Amazon EMR cluster
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.utils.dates import days_ago
# Arguments passed to the DAG as ``default_args``.
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Start date is computed relative to "now" (two days back) each time this
    # file is parsed. ``days_ago`` is imported explicitly rather than reached
    # through ``airflow.utils.dates`` attribute access, which only works if
    # some other import happened to load that submodule first.
    'start_date': days_ago(2),
    # NOTE(review): this address looks like it was redacted by the page the
    # snippet was copied from -- replace it with a real recipient before
    # turning either e-mail flag below on.
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
}
# EMR step definitions submitted to the cluster by the ``add_steps`` task.
# A single step runs ``spark-submit`` through ``command-runner.jar``.
SPARK_TEST_STEPS = [
    {
        'Name': 'TestSparkJob1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # Flag/value pairs are kept on one line each so the resulting
            # spark-submit command line is easy to read.
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--driver-memory', '1G',
                '--executor-memory', '1G',
                '--num-executors', '1',
                '--class', 'Main',
                's3://dev-bucket/test-spark-job-spark-assembly-0.1.0-SNAPSHOT.jar',
            ],
        },
    },
]
# Id of the already-running EMR cluster the step is submitted to.
# The value below is a placeholder and must be replaced with a real id.
cluster_id = "j-xxxxxxxxxxxxx"

# DAG 'emr1': cron schedule '0 3 * * *' (minute 0 of hour 3, every day),
# with a two-hour ``dagrun_timeout``.
dag = DAG(
    'emr1',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *',
)
# Task 'add_steps': submits SPARK_TEST_STEPS to the cluster identified by
# ``cluster_id`` using the 'aws_default' connection.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id=cluster_id,
    aws_conn_id='aws_default',
    steps=SPARK_TEST_STEPS,
    dag=dag,
)
# Task 'watch_step': sensor that monitors the step submitted by 'add_steps'.
step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id=cluster_id,
    # Templated at run time: takes the first element of the 'return_value'
    # XCom pushed by the 'add_steps' task (presumably the list of step ids
    # it created -- only one step is submitted, so index 0 is used).
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

# Run the sensor only after the step has been submitted.
step_adder.set_downstream(step_checker)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment