Skip to content

Instantly share code, notes, and snippets.

@jerowe
Created March 21, 2019 16:27
Show Gist options
  • Save jerowe/d184b70f5050d7ee04028974b28d21b2 to your computer and use it in GitHub Desktop.
Save jerowe/d184b70f5050d7ee04028974b28d21b2 to your computer and use it in GitHub Desktop.
Ice Cream Sundae Linear DAG Def Airflow
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
import random
from pprint import pprint
from icecream_sunday_dag_def import choose_icecream_flavor, choose_cone, choose_toppings, make_icecream_sundae, \
default_args
icecream_sundae_linear_dag = DAG('ice_cream_sundae_linear', default_args=default_args, schedule_interval=None)
"""
Linear Ice Cream Sundae DAG Def
"""
def generate_choose_cone_op(dag, task_id):
return PythonOperator(
task_id='choose_cone_task_{}'.format(task_id),
python_callable=choose_cone,
provide_context=True,
dag=dag
)
def generate_choose_icecream_flavor_op(dag, task_id):
return PythonOperator(
task_id='choose_icecream_flavor_task_{}'.format(task_id),
provide_context=True,
dag=dag,
python_callable=choose_icecream_flavor,
)
def generate_choose_toppings_op(dag, task_id):
return PythonOperator(
task_id='choose_toppings_task_{}'.format(task_id),
provide_context=True,
dag=dag,
python_callable=choose_toppings,
)
def generate_make_icecream_sundae_op(dag, task_id):
return PythonOperator(
task_id='make_icecream_sundae_task_{}'.format(task_id),
provide_context=True,
dag=dag,
python_callable=make_icecream_sundae,
)
choose_cones_op_list = [generate_choose_cone_op(icecream_sundae_linear_dag, 1),
generate_choose_cone_op(icecream_sundae_linear_dag, 2)]
choose_flavors_op_list = [generate_choose_icecream_flavor_op(icecream_sundae_linear_dag, 1),
generate_choose_icecream_flavor_op(icecream_sundae_linear_dag, 2)]
choose_toppings_op_list = [generate_choose_toppings_op(icecream_sundae_linear_dag, 1),
generate_choose_toppings_op(icecream_sundae_linear_dag, 2)]
for index, choose_cone_OP in enumerate(choose_cones_op_list):
choose_cone_OP.set_downstream(choose_flavors_op_list[index])
choose_flavors_op_list[index].set_downstream(choose_toppings_op_list[index])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment