Skip to content

Instantly share code, notes, and snippets.

@developer-sdk
Created July 2, 2023 10:41
Show Gist options
  • Save developer-sdk/82b3b3900be473a28c094fe68a577927 to your computer and use it in GitHub Desktop.
Save developer-sdk/82b3b3900be473a28c094fe68a577927 to your computer and use it in GitHub Desktop.
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime
# Default arguments handed to the DAG below via ``default_args``.
# NOTE(review): 'catchup' is normally a DAG constructor argument rather than
# a per-task default -- confirm whether it has any effect from here.
dag_args = dict(
    owner='deploy',
    start_date=datetime.datetime(2022, 7, 10),
    catchup=False,
)
# Print op_args
def print_op_args(*op_args):
    """Print the received positional arguments between two banner lines.

    Used as the ``python_callable`` of task ``t1``, which supplies
    ``op_args=['a', 'b', 1]``; the values arrive here packed into a tuple.

    Args:
        *op_args: Positional values to print.
    """
    banner = "*" * 100
    print(banner)
    print(op_args)  # e.g. ('a', 'b', 1)
    print(banner)
# Print op_kwargs
def print_op_kwargs(**op_kwargs):
    """Print the received keyword arguments between two banner lines.

    Used as the ``python_callable`` of task ``t2``. The author's recorded
    sample output shows a dict with entries such as ``'conf'``, ``'dag'``
    and ``'dag_run'``.

    Args:
        **op_kwargs: Keyword values to print (shown as one dict).
    """
    banner = "*" * 100
    print(banner)
    # e.g. {'conf': <AirflowConfigParser ...>, 'dag': <DAG: python_example>,
    #       'dag_run': <DagRun python_example @ ...>, ...}
    print(op_kwargs)
    print(banner)
# The parameter names are arbitrary; any other names work as well
def print_op_other_name(*op_list, **op_dict):
    """Print positional and keyword arguments between two banner lines.

    Same idea as ``print_op_args`` / ``print_op_kwargs`` but with differently
    named star-parameters. Used as the ``python_callable`` of task ``t3``.

    Args:
        *op_list: Positional values, printed as a tuple.
        **op_dict: Keyword values, printed as a dict.
    """
    banner = "*" * 100
    print(banner)
    print(op_list)  # e.g. ('a', 'b', 1)
    # e.g. {'conf': <AirflowConfigParser ...>, 'dag': <DAG: python_example>,
    #       'dag_run': <DagRun python_example @ ...>, ...}
    print(op_dict)
    print(banner)
# Print the ``params`` entry taken from op_kwargs
def print_template_param(params):
    """Print ``params`` between two banner lines.

    Used as the ``python_callable`` of task ``t4``. The parameter must keep
    the name ``params`` so the matching keyword argument is bound to it.

    Args:
        params: Value to print; the author's sample run shows
            ``{'hello': 'world'}``.
    """
    banner = "*" * 100
    print(banner)
    print(params)  # e.g. {'hello': 'world'}
    print(banner)
def print_template_param_other_name(params, dag_run):
    """Print ``params``, ``dag_run`` and ``dag_run.conf`` between banners.

    Used as the ``python_callable`` of task ``t5``.

    Args:
        params: Value to print; sample run shows ``{'hello': 'world'}``.
        dag_run: Object exposing a ``conf`` attribute; the sample run shows
            a ``DagRun`` whose ``conf`` is ``{'hello': 'world'}``.
    """
    banner = "*" * 100
    print(banner)
    print(params)  # e.g. {'hello': 'world'}
    # e.g. <DagRun python_example @ 2023-07-02 10:36:24+00:00:
    #       manual__2023-07-02T19:36:24+09:00, state:running, ...>
    print(dag_run)
    print(dag_run.conf)  # e.g. {'hello': 'world'}
    print(banner)
# Manually triggered example DAG (no schedule) showing the different ways a
# PythonOperator callable can receive arguments.
with DAG(
    dag_id='python_example',
    default_args=dag_args,
    schedule_interval=None,
    # catchup is a DAG-level setting, so pass it here explicitly instead of
    # relying on the entry in default_args.
    catchup=False,
) as dag:
    # Operators created inside the ``with DAG`` context are attached to the
    # DAG automatically, so no explicit ``dag=`` argument is needed.

    # t1: positional values from op_args reach the callable's *op_args.
    t1 = PythonOperator(
        task_id='t1',
        python_callable=print_op_args,
        op_args=['a', 'b', 1],
    )

    # t2: callable takes **op_kwargs only.
    t2 = PythonOperator(
        task_id='t2',
        python_callable=print_op_kwargs,
    )

    # t3: both positional (op_args) and keyword arguments, using different
    # star-parameter names.
    t3 = PythonOperator(
        task_id='t3',
        python_callable=print_op_other_name,
        op_args=['a', 'b', 1],
    )

    # t4: callable declares a single named parameter, ``params``.
    t4 = PythonOperator(
        task_id='t4',
        python_callable=print_template_param,
    )

    # t5: callable declares two named parameters, ``params`` and ``dag_run``.
    t5 = PythonOperator(
        task_id='t5',
        python_callable=print_template_param_other_name,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment