Created
October 22, 2021 01:17
-
-
Save andycasey/c0df8e18da8aec087ac242aa0ec1d736 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pendulum | |
| from datetime import datetime | |
| from airflow import DAG | |
| from airflow.models import DagRun | |
| from airflow.sensors.external_task import ExternalTaskSensor | |
| from airflow.operators.dummy import DummyOperator | |
def get_most_recent_dag_run(dag_id):
    """Return the run of DAG ``dag_id`` that has the latest execution date.

    All runs returned by ``DagRun.find(dag_id=dag_id)`` are considered.
    ``None`` is returned when that DAG has no runs at all.
    """
    runs = DagRun.find(dag_id=dag_id)
    # max() yields the first run carrying the greatest execution_date, which
    # is the same element a stable descending sort would place at index 0.
    return max(runs, key=lambda run: run.execution_date, default=None)
def execution_date_fn(execution_date, dag, task, **kwargs):
    """Choose the execution date the sensor should check on the external DAG.

    The most recent run of ``task.external_dag_id`` is looked up. When such a
    run exists its execution date is used; otherwise ``dag.start_date`` is
    used as the fallback. The chosen value is converted with
    ``pendulum.instance`` before being returned.

    ``execution_date`` and any extra keyword arguments are accepted but not
    used here.
    """
    latest_run = get_most_recent_dag_run(task.external_dag_id)
    if latest_run is None:
        # The external DAG has never run: fall back to this DAG's start date.
        target_date = dag.start_date
    else:
        target_date = latest_run.execution_date
    return pendulum.instance(target_date)
# "example_A": a daily DAG whose first task is a sensor on the DAG
# "example_A_dependency"; the "done" task only runs once the sensor succeeds.
with DAG(
    "example_A",
    start_date=datetime(2020, 10, 24),
    schedule_interval="@daily",
    tags=["testing"],
) as dag:

    check = ExternalTaskSensor(
        task_id="check",
        # What to watch: the external DAG. external_task_id is None, which per
        # the ExternalTaskSensor documentation means the DAG as a whole rather
        # than a single task inside it.
        external_dag_id="example_A_dependency",
        external_task_id=None,
        # Which run to watch: decided by execution_date_fn.
        execution_date_fn=execution_date_fn,
        # Which outcomes count as success / failure of the external run.
        allowed_states=("success", ),
        failed_states=("failed", ),
        # How to wait.
        mode="reschedule",
        poke_interval=60,  # check every 60 seconds if we are waiting on the dependent DAG
        timeout=86_400,  # 86,400 seconds = 24 hours
    )

    ok = DummyOperator(task_id="done")

    # "done" runs downstream of the sensor.
    check.set_downstream(ok)
| # If example_A is set running first, 'check' is set to up_for_reschedule and two DAG runs are shown as running, so example_A is waiting for the dependency to be executed. | |
| # If I then un-pause the dependency DAG in the web UI and trigger it: | |
| # > airflow dags trigger example_A_dependency | |
| # then the dependency DAG (example_A_dependency) will run. | |
| # When the dependency DAG has finished, example_A will run. | |
| # If we "clear" the result of the dependency DAG, the example_A DAG runs will wait again because they see no result. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from datetime import datetime | |
| from airflow import DAG | |
| from airflow.operators.python import PythonOperator | |
| from airflow.operators.dummy import DummyOperator | |
| from time import sleep | |
def sleep_2min(**_):
    """Block the caller for two minutes (120 seconds).

    Any keyword arguments are accepted and ignored, so the function can be
    called with arbitrary keyword arguments. Returns ``None``.

    Defined with ``def`` rather than as a lambda bound to a name so that the
    callable carries its own name in tracebacks and logs.
    """
    sleep(120)
# "example_A_dependency": a DAG with no schedule (schedule_interval=None)
# made of three tasks run in sequence: a -> sleep -> c.
with DAG(
    "example_A_dependency",
    start_date=datetime(2020, 10, 24),
    schedule_interval=None,
    tags=["testing"],
) as dag:

    a = DummyOperator(task_id="a")
    b = PythonOperator(task_id="sleep", python_callable=sleep_2min)
    c = DummyOperator(task_id="c")

    # Linear chain: each task is downstream of the previous one.
    a.set_downstream(b)
    b.set_downstream(c)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment