Skip to content

Instantly share code, notes, and snippets.

@denisb411
Created August 17, 2021 22:44
Show Gist options
  • Save denisb411/a4c0fc2e601e825ad3de044ca1e67772 to your computer and use it in GitHub Desktop.
Save denisb411/a4c0fc2e601e825ad3de044ca1e67772 to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator
from airflow.utils.task_group import TaskGroup
from random import uniform
from datetime import datetime
# Shared operator arguments; handed to the DAG constructor as ``default_args``.
default_args = dict(start_date=datetime(2020, 1, 1))
def _training_model(ti):
    """Simulate a training run and publish its accuracy through XCom.

    A random value drawn from ``uniform(0.1, 10.0)`` stands in for the
    accuracy of a trained model. It is printed and then pushed under the
    ``model_accuracy`` key as a string.

    Args:
        ti: Object exposing ``xcom_push(key=..., value=...)``; in the DAG
            this is the running task instance injected by Airflow.
    """
    accuracy = uniform(0.1, 10.0)
    print(f"model's accuracy: {accuracy}")
    # The value is stored as text, so consumers must convert it back to
    # float before comparing accuracies numerically.
    ti.xcom_push(key='model_accuracy', value=str(accuracy))
def _choose_best_model(**context):
    """Collect the accuracies published by the training tasks and print them.

    Args:
        **context: Task context keyword arguments; only
            ``context['task_instance']`` is read, and it must expose
            ``xcom_pull(key=..., task_ids=...)``.
    """
    task_instance = context['task_instance']
    # Each task id must be its own list element. Adjacent string literals
    # without commas are concatenated by Python into a single bogus id,
    # which matches no task and makes the pull return nothing.
    accuracies = task_instance.xcom_pull(
        key='model_accuracy',
        task_ids=[
            'processing_tasks.training_model_a',
            'processing_tasks.training_model_b',
            'processing_tasks.training_model_c',
        ],
    )
    # Marker line that makes the result easy to spot in the task log.
    print('aaaaaaaaaaaa')
    print(accuracies)
# Daily pipeline: download data, run three training tasks inside a task
# group, then gather the accuracies they pushed to XCom.
with DAG(
    'xcom_dag',
    schedule_interval='@daily',
    default_args=default_args,
    catchup=False,
) as dag:
    downloading_data = BashOperator(
        task_id='downloading_data',
        bash_command='sleep 3',
        do_xcom_push=False,
    )

    # Tasks created in this group are addressed as
    # 'processing_tasks.<task_id>', which is the form _choose_best_model
    # uses when pulling their XComs.
    with TaskGroup('processing_tasks') as processing_tasks:
        # provide_context is intentionally not passed: it is deprecated in
        # Airflow 2, where PythonOperator hands context to the callable
        # based on the callable's signature.
        training_model_a = PythonOperator(
            task_id='training_model_a',
            python_callable=_training_model,
        )
        training_model_b = PythonOperator(
            task_id='training_model_b',
            python_callable=_training_model,
        )
        training_model_c = PythonOperator(
            task_id='training_model_c',
            python_callable=_training_model,
        )

    choose_model = PythonOperator(
        task_id='choose_best_model',
        python_callable=_choose_best_model,
    )

    # The whole group must finish before the accuracies are collected.
    downloading_data >> processing_tasks >> choose_model
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment