Skip to content

Instantly share code, notes, and snippets.

@mik-laj
Created April 18, 2019 06:20
Show Gist options
  • Save mik-laj/d48544c6e68b8dbb34d6a689eba522fa to your computer and use it in GitHub Desktop.
Save mik-laj/d48544c6e68b8dbb34d6a689eba522fa to your computer and use it in GitHub Desktop.
from airflow.operators import python_operator
import datetime
from airflow.utils import dates
from airflow.operators import bash_operator
from airflow.utils.trigger_rule import TriggerRule
from airflow import models
from o2a_libs.el_basic_functions import first_not_null
from airflow.operators import dummy_operator
PARAMS = {
"user.name": "pig",
"nameNode": "hdfs://localhost:8020",
"resourceManager": "localhost:8032",
"queueName": "default",
"examplesRoot": "examples",
"oozie.wf.application.path": "hdfs://localhost:8020/user/pig/examples/apps/shell",
"dataproc_cluster": "cluster-o2a",
"gcp_region": "europe-west3",
}
with models.DAG(
"decision",
schedule_interval=None, # Change to suit your needs
start_date=dates.days_ago(0), # Change to suit your needs
) as dag:
def decision_node_decision():
if first_not_null("", ""):
return "fake_end"
elif first_not_null("test", ""):
return "real_end"
else:
return "fail"
decision_node = python_operator.BranchPythonOperator(
python_callable=decision_node_decision, task_id="decision_node", trigger_rule="one_success"
)
fail = bash_operator.BashOperator(bash_command="exit 1", task_id="fail", trigger_rule="one_success")
fake_end = bash_operator.BashOperator(
bash_command="exit 1", task_id="fake_end", trigger_rule="one_success"
)
real_end = dummy_operator.DummyOperator(task_id="real_end", trigger_rule="one_success")
decision_node.set_downstream(fail)
decision_node.set_downstream(real_end)
decision_node.set_downstream(fake_end)
from airflow.utils import dates
from airflow.operators import bash_operator
from airflow.utils.trigger_rule import TriggerRule
import datetime
from airflow import models
from airflow.operators import python_operator
from o2a_libs.el_basic_functions import first_not_null
PARAMS = {
"user.name": "pig",
"nameNode": "hdfs://localhost:8020",
"resourceManager": "localhost:8032",
"queueName": "default",
"examplesRoot": "examples",
"oozie.wf.application.path": "hdfs://localhost:8020/user/pig/examples/apps/shell",
"dataproc_cluster": "cluster-o2a",
"gcp_region": "europe-west3",
}
with models.DAG(
"decision",
schedule_interval=None, # Change to suit your needs
start_date=dates.days_ago(0), # Change to suit your needs
) as dag:
def decision_node_decision():
if first_not_null("", ""):
return "fake_end"
elif first_not_null("test", ""):
return "real_end"
else:
return "fail"
decision_node = python_operator.BranchPythonOperator(
python_callable=decision_node_decision, task_id="decision_node", trigger_rule="one_success"
)
fail = bash_operator.BashOperator(bash_command="exit 1", task_id="fail", trigger_rule="one_success")
fake_end = bash_operator.BashOperator(
bash_command="exit 1", task_id="fake_end", trigger_rule="one_success"
)
decision_node.set_downstream(fake_end)
decision_node.set_downstream(fail)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment