Created
April 18, 2019 06:20
-
-
Save mik-laj/d48544c6e68b8dbb34d6a689eba522fa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.operators import python_operator | |
import datetime | |
from airflow.utils import dates | |
from airflow.operators import bash_operator | |
from airflow.utils.trigger_rule import TriggerRule | |
from airflow import models | |
from o2a_libs.el_basic_functions import first_not_null | |
from airflow.operators import dummy_operator | |
PARAMS = { | |
"user.name": "pig", | |
"nameNode": "hdfs://localhost:8020", | |
"resourceManager": "localhost:8032", | |
"queueName": "default", | |
"examplesRoot": "examples", | |
"oozie.wf.application.path": "hdfs://localhost:8020/user/pig/examples/apps/shell", | |
"dataproc_cluster": "cluster-o2a", | |
"gcp_region": "europe-west3", | |
} | |
with models.DAG( | |
"decision", | |
schedule_interval=None, # Change to suit your needs | |
start_date=dates.days_ago(0), # Change to suit your needs | |
) as dag: | |
def decision_node_decision(): | |
if first_not_null("", ""): | |
return "fake_end" | |
elif first_not_null("test", ""): | |
return "real_end" | |
else: | |
return "fail" | |
decision_node = python_operator.BranchPythonOperator( | |
python_callable=decision_node_decision, task_id="decision_node", trigger_rule="one_success" | |
) | |
fail = bash_operator.BashOperator(bash_command="exit 1", task_id="fail", trigger_rule="one_success") | |
fake_end = bash_operator.BashOperator( | |
bash_command="exit 1", task_id="fake_end", trigger_rule="one_success" | |
) | |
real_end = dummy_operator.DummyOperator(task_id="real_end", trigger_rule="one_success") | |
decision_node.set_downstream(fail) | |
decision_node.set_downstream(real_end) | |
decision_node.set_downstream(fake_end) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.utils import dates | |
from airflow.operators import bash_operator | |
from airflow.utils.trigger_rule import TriggerRule | |
import datetime | |
from airflow import models | |
from airflow.operators import python_operator | |
from o2a_libs.el_basic_functions import first_not_null | |
PARAMS = { | |
"user.name": "pig", | |
"nameNode": "hdfs://localhost:8020", | |
"resourceManager": "localhost:8032", | |
"queueName": "default", | |
"examplesRoot": "examples", | |
"oozie.wf.application.path": "hdfs://localhost:8020/user/pig/examples/apps/shell", | |
"dataproc_cluster": "cluster-o2a", | |
"gcp_region": "europe-west3", | |
} | |
with models.DAG( | |
"decision", | |
schedule_interval=None, # Change to suit your needs | |
start_date=dates.days_ago(0), # Change to suit your needs | |
) as dag: | |
def decision_node_decision(): | |
if first_not_null("", ""): | |
return "fake_end" | |
elif first_not_null("test", ""): | |
return "real_end" | |
else: | |
return "fail" | |
decision_node = python_operator.BranchPythonOperator( | |
python_callable=decision_node_decision, task_id="decision_node", trigger_rule="one_success" | |
) | |
fail = bash_operator.BashOperator(bash_command="exit 1", task_id="fail", trigger_rule="one_success") | |
fake_end = bash_operator.BashOperator( | |
bash_command="exit 1", task_id="fake_end", trigger_rule="one_success" | |
) | |
decision_node.set_downstream(fake_end) | |
decision_node.set_downstream(fail) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment