Skip to content

Instantly share code, notes, and snippets.

@andycasey
Created October 22, 2021 01:17
Show Gist options
  • Select an option

  • Save andycasey/c0df8e18da8aec087ac242aa0ec1d736 to your computer and use it in GitHub Desktop.

Select an option

Save andycasey/c0df8e18da8aec087ac242aa0ec1d736 to your computer and use it in GitHub Desktop.
import pendulum
from datetime import datetime
from airflow import DAG
from airflow.models import DagRun
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.dummy import DummyOperator
def get_most_recent_dag_run(dag_id):
    """Return the run of ``dag_id`` with the latest execution date, or None.

    Only ``dag_id`` is passed to ``DagRun.find`` (no ``state`` filter), so the
    returned run may be in any state.

    :param dag_id: ID of the DAG whose runs are searched.
    :return: the ``DagRun`` with the greatest ``execution_date``, or ``None``
        when the DAG has no runs at all.
    """
    dag_runs = DagRun.find(dag_id=dag_id)
    # One linear pass finds the latest run; sorting the whole list (in place)
    # just to read element 0 is unnecessary. On ties max() keeps the first
    # candidate, exactly like the stable reverse sort it replaces.
    return max(dag_runs, key=lambda run: run.execution_date, default=None)
def execution_date_fn(execution_date, dag, task, **kwargs):
    """Choose the execution date the sensor looks for in the external DAG.

    The incoming ``execution_date`` is not used. Instead, the latest run of
    ``task.external_dag_id`` (as found by ``get_most_recent_dag_run``)
    provides the date. If that DAG has no runs, ``dag.start_date`` is used
    as a fallback.

    :return: the chosen date, wrapped with ``pendulum.instance``.
    """
    latest_run = get_most_recent_dag_run(task.external_dag_id)
    if latest_run is None:
        # The external DAG has never run: fall back to this DAG's start date.
        target_date = dag.start_date
    else:
        target_date = latest_run.execution_date
    return pendulum.instance(target_date)
with DAG(
    "example_A",
    start_date=datetime(2020, 10, 24),
    schedule_interval="@daily",
    tags=["testing"],
) as dag:
    # Sensor on the external DAG "example_A_dependency". The date to look for
    # comes from execution_date_fn; a "success" run satisfies the sensor and a
    # "failed" run makes it fail.
    check = ExternalTaskSensor(
        task_id="check",
        external_dag_id="example_A_dependency",
        external_task_id=None,  # no single task: watch the external DAG run itself
        execution_date_fn=execution_date_fn,
        allowed_states=("success",),
        failed_states=("failed",),
        mode="reschedule",
        poke_interval=60,  # seconds between checks of the external DAG
        timeout=86_400,  # seconds (24 hours)
    )
    ok = DummyOperator(task_id="done")

    # "done" only runs after "check" has succeeded.
    check.set_downstream(ok)
# If I start example_A first, the 'check' task goes into the up_for_reschedule state and there are two DAG runs marked as running. In other words, it is waiting for the dependency to be executed.
# If I then unpause it in the web UI and trigger the dependency:
# > airflow dags trigger example_A_dependency
# then the dependency DAG (example_A_dependency) runs.
# When the dependency DAG finishes, example_A runs.
# If we "clear" the dependency DAG's run, the example_A runs wait again because the sensor finds no result.
import os
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
from time import sleep
def sleep_2min(**_):
    """Block for two minutes (120 seconds).

    Intended as a ``python_callable``: any keyword arguments passed in are
    accepted and ignored.
    """
    # A named function instead of an assigned lambda (PEP 8, E731) so that
    # tracebacks and task logs show "sleep_2min" rather than "<lambda>".
    sleep(120)
with DAG(
    "example_A_dependency",
    start_date=datetime(2020, 10, 24),
    schedule_interval=None,  # no schedule: runs are started by triggering
    tags=["testing"],
) as dag:
    # Linear chain a -> sleep -> c; the middle task calls sleep_2min so a
    # run of this DAG takes a noticeable amount of time.
    a = DummyOperator(task_id="a")
    b = PythonOperator(task_id="sleep", python_callable=sleep_2min)
    c = DummyOperator(task_id="c")

    a.set_downstream(b)
    b.set_downstream(c)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment