Created
November 14, 2024 00:49
-
-
Save ferruzzi/b390967779f9774eef054b63b73280bf to your computer and use it in GitHub Desktop.
A few basic DAGs which come in handy for Airflow debugging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from time import sleep | |
from airflow import DAG | |
from airflow.decorators import task | |
from airflow.utils.dates import days_ago | |
# Manually triggered DAG (no schedule) whose single task runs for about five minutes.
with DAG(dag_id="5_min", schedule_interval=None, start_date=days_ago(1)) as dag:

    @task
    def wait_5(minutes_to_run=5):
        """Sleep for ``minutes_to_run`` minutes, reporting progress every minute.

        Args:
            minutes_to_run: Whole number of minutes to wait. Defaults to 5.
        """
        print("Waiting %s minutes!" % minutes_to_run)
        # range() excludes its stop value, so the stop must be
        # minutes_to_run + 1 for the loop to sleep the full number of
        # minutes (the previous bound stopped one minute short).
        for x in range(1, minutes_to_run + 1):
            sleep(60)
            print("%s minutes done..." % x)
        print("Done waiting.")

    # Invoke the decorated function inside the DAG context to create the task.
    wait_5()
# Manually triggered DAG (no schedule) whose single task blocks for ten minutes.
with DAG(dag_id="10_min", schedule_interval=None, start_date=days_ago(1)) as dag:

    @task
    def wait_10(minutes_to_run=10):
        """Block for ``minutes_to_run`` minutes in one uninterrupted sleep.

        Args:
            minutes_to_run: Number of minutes to wait. Defaults to 10.
        """
        start_message = "Waiting %d minutes!" % minutes_to_run
        print(start_message)

        seconds_to_wait = 60 * minutes_to_run
        sleep(seconds_to_wait)

        print("Done waiting.")

    # Invoke the decorated function inside the DAG context to create the task.
    wait_10()
# Smallest possible smoke-test DAG: one task that prints a greeting and exits.
with DAG(dag_id="hello_world", schedule_interval=None, start_date=days_ago(1)) as dag:

    @task
    def hello_world():
        """Print a fixed greeting to the task log."""
        greeting = "hello world!"
        print(greeting)

    # Invoke the decorated function inside the DAG context to create the task.
    hello_world()
# DAG that creates N independent tasks with no dependencies between them.
with DAG(dag_id="n_parallel_tasks", schedule_interval=None, start_date=days_ago(1)) as dag:
    N = 20  # number of task instances to create
    SLEEP = 1  # seconds each task sleeps after printing its banner

    @task
    def run_task(foo):
        """Print a banner identifying this task, then sleep ``SLEEP`` seconds.

        Args:
            foo: Value shown on the banner's ``Task:`` line; the loop below
                passes the task's index.
        """
        banner = (
            "\n***************"
            f"\nTask: {foo}"
            f"\nCurrent time: {datetime.utcnow()}"
            f"\nSleeping: {SLEEP}"
            "\n***************"
        )
        print(banner)
        sleep(SLEEP)

    # One call per index; each call adds a separate task to the DAG.
    for n in range(N):
        run_task(n)
# Same N tasks as the parallel variant, but max_active_tasks=1 is passed to
# the DAG so that only one task is allowed to run at a time.
with DAG(
    dag_id="n_sequential_tasks",
    schedule_interval=None,
    start_date=days_ago(1),
    max_active_tasks=1,
) as dag:
    N = 20  # number of task instances to create
    SLEEP = 1  # seconds each task sleeps after printing its banner

    @task
    def run_task(foo):
        """Print a banner identifying this task, then sleep ``SLEEP`` seconds.

        Args:
            foo: Value shown on the banner's ``Task:`` line; the loop below
                passes the task's index.
        """
        banner = (
            "\n***************"
            f"\nTask: {foo}"
            f"\nCurrent time: {datetime.utcnow()}"
            f"\nSleeping: {SLEEP}"
            "\n***************"
        )
        print(banner)
        sleep(SLEEP)

    # One call per index; each call adds a separate task to the DAG.
    for n in range(N):
        run_task(n)
# DAG whose single task loops forever; it only stops when killed externally.
with DAG(
    dag_id="this_is_the_dag_that_never_ends",
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:

    @task
    def hello_world(minutes_between_runs=1):
        """Run forever, printing the elapsed time after every interval.

        This function never returns.

        Args:
            minutes_between_runs: Minutes to sleep between progress
                messages. Defaults to 1.
        """
        minutes_elapsed = 0
        while True:
            # Sleep before counting and reporting, so the printed total is
            # time that has actually passed. Previously the counter was
            # bumped and printed first, overstating elapsed time by one
            # interval (e.g. "5 minutes" at start-up).
            sleep(60 * minutes_between_runs)
            minutes_elapsed += minutes_between_runs
            print("%s minutes and still running!" % minutes_elapsed)

    # Create the task with a five-minute reporting interval.
    hw = hello_world(minutes_between_runs=5)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment