-
-
Save cr3a7ure/b5f8ad018981b29dc4ca343eb48acaa2 to your computer and use it in GitHub Desktop.
Airflow Dynamic Workflow Sample
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import airflow | |
from airflow.executors.celery_executor import CeleryExecutor | |
from airflow.models import DAG | |
from airflow.operators.bash_operator import BashOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators.subdag_operator import SubDagOperator | |
def get_id_list():
    """Return the ids to process; for this sample, simply 0 through 99."""
    return range(0, 100)
def subdag(parent_dag_name, child_dag_name, args):
    """Build the DAG describing the processing flow run for each id.

    For every id returned by ``get_id_list()`` two BashOperator tasks are
    added to the DAG and chained so that task-1 runs before task-2.

    Args:
        parent_dag_name: First component of the returned DAG's dag_id.
        child_dag_name: Second component of the returned DAG's dag_id.
        args: default_args dict passed to the DAG and to every task.

    Returns:
        The DAG whose dag_id is "<parent_dag_name>.<child_dag_name>".
    """
    sub_dag = DAG(
        dag_id="{}.{}".format(parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@once",
    )
    # Named `item_id` rather than `id` so the builtin id() is not shadowed.
    for item_id in get_id_list():
        first = BashOperator(
            task_id='{}-task-1'.format(item_id),
            bash_command='echo task1: {}'.format(item_id),
            default_args=args,
            dag=sub_dag,
        )
        second = BashOperator(
            task_id='{}-task-2'.format(item_id),
            bash_command='echo task2: {}'.format(item_id),
            default_args=args,
            dag=sub_dag,
        )
        first >> second
    return sub_dag
DAG_NAME = 'subdag_operator_sample'
# Used both as the SubDagOperator's task_id and as the sub-DAG's id suffix.
# SubDagOperator expects the sub-DAG's dag_id to be "<parent dag_id>.<task_id>",
# so keeping the value in one place prevents the two from drifting apart.
SUBDAG_TASK_ID = 'subdag'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,
}

# Reuse DAG_NAME instead of repeating the literal: the same name is handed to
# subdag() below as the parent DAG name.
dag = DAG(dag_id=DAG_NAME, default_args=args, schedule_interval='@once')

t1 = DummyOperator(
    task_id='start',
    default_args=args,
    dag=dag,
)
t2 = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    executor=CeleryExecutor(),
    subdag=subdag(DAG_NAME, SUBDAG_TASK_ID, args),
    default_args=args,
    dag=dag,
)
# Run the placeholder start task first, then the sub-DAG.
t1 >> t2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import airflow | |
from airflow.models import DAG | |
from airflow.operators.bash_operator import BashOperator | |
from airflow.operators.dummy_operator import DummyOperator | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.api.common.experimental.trigger_dag import trigger_dag | |
from airflow.utils import timezone | |
import json | |
def get_id_list():
    """Return the ids to process; for this sample, simply 0 through 99."""
    return range(0, 100)
def trigger(**kwargs):
    """Trigger one run of the target DAG for every id from get_id_list().

    Each run receives its id as a JSON-encoded ``conf`` of the form
    ``{"id": <id>}`` and a run_id built from the id and this task
    instance's execution date.

    Args:
        **kwargs: Must contain ``dag_id`` (id of the DAG to trigger) and
            ``ti`` (an object exposing ``execution_date``).

    Raises:
        KeyError: If ``dag_id`` or ``ti`` is missing from ``kwargs``.
    """
    dag_id = kwargs['dag_id']  # id of the DAG to trigger, taken from the arguments
    execution_date = kwargs['ti'].execution_date.isoformat()
    # Named `item_id` rather than `id` so the builtin id() is not shadowed.
    for item_id in get_id_list():
        trigger_dag(
            dag_id=dag_id,
            run_id='trig__{}_{}'.format(item_id, execution_date),
            conf=json.dumps({'id': item_id}),
            # NOTE(review): presumably None lets trigger_dag pick the current
            # time, and replace_microseconds=False keeps back-to-back runs
            # distinct - confirm against the trigger_dag implementation.
            execution_date=None,
            replace_microseconds=False,
        )
# default_args shared by both DAGs and all tasks defined in this file.
args = dict(
    owner='airflow',
    start_date=airflow.utils.dates.days_ago(2),
    provide_context=True,
)

###
### Definition of the DAG that does the triggering
###
DAG_NAME = 'trigger_dag_sample'
dag = DAG(dag_id=DAG_NAME, default_args=args, schedule_interval="@once")

# Placeholder entry task.
t1 = DummyOperator(task_id='start', dag=dag, default_args=args)

# Calls trigger() with the id of the DAG it should start runs of.
t2 = PythonOperator(
    task_id='trigger_account_dag',
    dag=dag,
    default_args=args,
    python_callable=trigger,
    op_kwargs={'dag_id': 'triggered_dag_sample'},
)

# start runs before the trigger task.
t1.set_downstream(t2)
###
### Definition of the DAG that gets triggered
###
# schedule_interval=None: this DAG is meant to run only when triggered
# externally with a conf carrying an 'id'. With '@once' the scheduler would
# also create a run of its own that has no conf, leaving the templates below
# without a value to render.
sub_dag = DAG(
    dag_id='triggered_dag_sample',
    default_args=args,
    schedule_interval=None,
)

# Jinja templates: the id is read from the conf passed when the run is triggered.
command_template_1 = "echo task1: {{ dag_run.conf['id'] }}"
command_template_2 = "echo task2: {{ dag_run.conf['id'] }}"

t1 = BashOperator(
    task_id='task-1',
    bash_command=command_template_1,
    default_args=args,
    dag=sub_dag,
)
t2 = BashOperator(
    task_id='task-2',
    bash_command=command_template_2,
    default_args=args,
    dag=sub_dag,
)
# task-1 runs before task-2.
t1 >> t2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment