Skip to content

Instantly share code, notes, and snippets.

@sprzedwojski
Created April 17, 2019 17:23
Show Gist options
  • Save sprzedwojski/28b5580b4a7becb32cd4fdd9fcc9b884 to your computer and use it in GitHub Desktop.
from airflow.contrib.operators import dataproc_operator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils import dates
from airflow import models
from airflow.operators import bash_operator
import datetime
PARAMS = {
"user.name": "TODO",
"nameNode": "hdfs://",
"resourceManager": "localhost:8032",
"queueName": "default",
"examplesRoot": "examples",
"oozie.wf.application.path": "hdfs:///user/hadoop/examples/mapreduce",
"outputDir": "output",
"dataproc_cluster": "oozie-o2a-2cpu",
"gcp_conn_id": "google_cloud_default",
"gcp_region": "europe-west3",
"hadoop_jars": "hdfs:/user/mapred/examples/mapreduce/lib/wordcount.jar",
"hadoop_main_class": "WordCount",
}
with models.DAG(
    "test_mapreduce_dag",
    schedule_interval=None,  # Change to suit your needs
    start_date=dates.days_ago(0),  # Change to suit your needs
) as dag:
    # Runs a shell script that prepares the cluster/HDFS output location
    # before the MapReduce job starts (cluster name, region and output
    # directory are hard-coded in the command line here, mirroring PARAMS).
    mr_node_prepare = bash_operator.BashOperator(
        task_id="mr_node_prepare",
        bash_command='$DAGS_FOLDER/../data/prepare.sh -c oozie-o2a-2cpu -r europe-west3 -d "/user/mapred/examples/mapreduce/output"',
    )

    # Submits the WordCount Hadoop job to the Dataproc cluster. The mapper,
    # reducer, combiner and I/O classes are passed as Hadoop job properties,
    # matching the original Oozie MapReduce action configuration.
    mr_node = dataproc_operator.DataProcHadoopOperator(
        main_class=PARAMS["hadoop_main_class"],
        arguments=["/user/mapred/examples/mapreduce/input", "/user/mapred/examples/mapreduce/output"],
        files=["hdfs:///user/mapred/examples/mapreduce/lib/wordcount.jar"],
        cluster_name=PARAMS["dataproc_cluster"],
        task_id="mr_node",
        # Use the imported TriggerRule enum instead of the bare string
        # "one_success" (TriggerRule.ONE_SUCCESS has the same value).
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dataproc_hadoop_properties={
            "mapred.job.queue.name": "default",
            "mapreduce.mapper.class": "WordCount$Map",
            "mapreduce.reducer.class": "WordCount$Reduce",
            "mapreduce.combine.class": "WordCount$Reduce",
            "mapreduce.job.output.key.class": "org.apache.hadoop.io.Text",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
            "mapred.input.dir": "/user/mapred/examples/mapreduce/input",
            "mapred.output.dir": "/user/mapred/examples/mapreduce/output",
        },
        dataproc_hadoop_jars=PARAMS["hadoop_jars"],
        gcp_conn_id=PARAMS["gcp_conn_id"],
        region=PARAMS["gcp_region"],
        dataproc_job_id="mr_node",
    )

    # Ensure the prepare step completes before the Hadoop job is submitted.
    mr_node_prepare.set_downstream(mr_node)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment