Created
June 17, 2022 19:24
-
-
Save gfranxman/109f3e1df0916c155a6b0ce49c848a6a to your computer and use it in GitHub Desktop.
skip all airflow catchup runs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def abort_on_catchup(**context): | |
""" | |
This function determines whether to continue to the `next_task` or skip to 'end' | |
using the "next" schedule interval. | |
""" | |
# "Catchups" during this window are allowed. | |
# This is just to cover for late startingjobs. | |
allowed_execution_start_window = 10 # minutes. | |
this_dag = context.get("dag") | |
this_task = context.get("task") | |
tasks = this_dag.tasks | |
last_task = tasks[-2] # I don't know why,but we're the last, so -2 is the true last | |
next_tasks = list(this_task.downstream_task_ids) | |
current_time = now("UTC") | |
if context["run_id"].startswith("manual"): | |
logging.info("manual tasks can always proceed.") | |
return next_tasks | |
elif ( | |
current_time.subtract(minutes=allowed_execution_start_window) | |
< context["next_execution_date"] | |
): | |
return next_tasks | |
else: | |
logging.info( | |
f"This appears to be a skippable catchup run," | |
f" outside the {allowed_execution_start_window} minute execution window." | |
) | |
logging.info( | |
f"{current_time=} is greater than next_execution_date: {context['next_execution_date']}" | |
) | |
logging.info(f"current_time: {current_time}") | |
logging.info( | |
f"Next scheduled execution datetime: {context['next_execution_date'].add(days=1)}" | |
) | |
logging.error(f"{last_task.task_id=}") | |
return [last_task.task_id] | |
branch = BranchPythonOperator( | |
task_id="skip_catchups", python_callable=abort_on_catchup | |
) | |
tasks.append(branch) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment