Skip to content

Instantly share code, notes, and snippets.

@fozziethebeat
Created November 30, 2021 01:03
Show Gist options
  • Save fozziethebeat/7bc9bf14e1289913436ac90221406477 to your computer and use it in GitHub Desktop.
Save fozziethebeat/7bc9bf14e1289913436ac90221406477 to your computer and use it in GitHub Desktop.
Simple Dynamic CAN Airflow
from collections import defaultdict
from airflow import DAG
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from libs.datasets import combined_datasets
default_args = {
'owner': 'airflow',
}
# A really simple task for each region
def region_task(region_count):
print(region_count)
@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def simple_dynamic():
"""
### Building a dynamic covid dag
This makes dynamic covid dag with pysier data
"""
@task(multiple_outputs=True)
def extract():
"""
#### Reads the region data
"""
return {'NY': [6, 1], 'CA': [5, 26, 10]}
@task()
def transform(region, region_list):
"""
#### Counts the sub-regions
"""
print(f'region_{region}')
print(region_list)
dag_subdag = DAG(dag_id=f'transform_{region}',
default_args=default_args,
schedule_interval='@daily')
# Attempt to produce a sub-dag dynamically based on the results.
for i, region_count in enumerate(region_list):
PythonOperator(task_id=f'{region}-{i}',
python_callable=region_task,
op_kwargs={'region_count': region_count})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment