Skip to content

Instantly share code, notes, and snippets.

@chenhan1218
Created October 21, 2019 10:37
Show Gist options
  • Save chenhan1218/2bf5eb8525dec29c125888eb10e11821 to your computer and use it in GitHub Desktop.
Save chenhan1218/2bf5eb8525dec29c125888eb10e11821 to your computer and use it in GitHub Desktop.
Airflow subdag
# Failed trial of airflow subdag with timezone aware
from datetime import datetime, timedelta
import pendulum
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
default_args = {
"owner": "airflow",
"start_date": datetime(2018, 11, 1, tzinfo=pendulum.timezone("Etc/GMT+12")),
}
# Dag is returned by a factory method
def sub_dag(
parent_dag_name,
child_dag_name,
start_date,
schedule_interval,
tzinfo: datetime.tzinfo,
):
dag = DAG(
"%s.%s" % (parent_dag_name, child_dag_name),
schedule_interval=schedule_interval,
start_date=start_date,
)
PythonOperator(
task_id="task",
provide_context=True,
python_callable=f1,
dag=dag,
op_kwargs={"tzinfo": tzinfo},
)
return dag
def f1(tzinfo: datetime.tzinfo = None, **context):
if tzinfo is not None:
print(("local time:", tzinfo.convert(context["execution_date"])))
for key in context:
print((key, context[key]))
PARENT_DAG_NAME = "timezone"
with DAG(
PARENT_DAG_NAME, catchup=True, default_args=default_args, schedule_interval="@daily"
) as dag:
task = PythonOperator(
task_id="t1", provide_context=True, python_callable=f1, dag=dag
)
task2 = PythonOperator(
task_id="t2", provide_context=True, python_callable=f1, dag=dag
)
list_tz = [
"Europe/Amsterdam",
"America/New_York",
"America/Los_Angeles",
"Asia/Jakarta",
"Asia/Kolkata",
"Asia/Taipei",
]
for tz in list_tz:
format_tz = tz.replace("/", "_")
subdag = SubDagOperator(
subdag=sub_dag(
parent_dag_name=PARENT_DAG_NAME,
child_dag_name=format_tz,
start_date=datetime(2018, 11, 1, tzinfo=pendulum.timezone(tz)),
schedule_interval="@daily",
tzinfo=pendulum.timezone(tz),
),
task_id=format_tz,
dag=dag,
)
task >> subdag >> task2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment