An attempt to use the Debug Executor with a deferrable task
$ airflow dags test wait_1_async $(date -u +'%Y-%m-%dT%H:%M:%SZ')
INFO - Filling up the DagBag from /Users/matt/src/qa-scenario-dags/utility_dags
INFO - Adding to queue: ['<TaskInstance: wait_1_async.start 2021-07-31 21:26:55+00:00 [queued]>']
INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=wait_1_async
AIRFLOW_CTX_TASK_ID=start
AIRFLOW_CTX_EXECUTION_DATE=2021-07-31T21:26:55+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-07-31T21:26:55+00:00
after this task, we wait
INFO - Marking task as SUCCESS. dag_id=wait_1_async, task_id=start, execution_date=20210731T212655, start_date=20210731T212658, end_date=20210731T212703
INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2
INFO - Adding to queue: ['<TaskInstance: wait_1_async.wait_1_async 2021-07-31 21:26:55+00:00 [queued]>']
INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=wait_1_async
AIRFLOW_CTX_TASK_ID=wait_1_async
AIRFLOW_CTX_EXECUTION_DATE=2021-07-31T21:26:55+00:00
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-07-31T21:26:55+00:00
ERROR - Task failed with exception
Traceback (most recent call last):
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1238, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1357, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1413, in _execute_task
    result = execute_callable(context=context)
  File "/Users/matt/src/astronomer-airflow/airflow/sensors/time_delta.py", line 59, in execute
    target_dttm += self.delta
TypeError: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'
INFO - Marking task as FAILED. dag_id=wait_1_async, task_id=wait_1_async, execution_date=20210731T212655, start_date=20210731T212658, end_date=20210731T212708
ERROR - Failed to execute task: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'.
Traceback (most recent call last):
  File "/Users/matt/src/astronomer-airflow/airflow/executors/debug_executor.py", line 79, in _run_task
    ti._run_raw_task(job_id=ti.job_id, **params)
  File "/Users/matt/src/astronomer-airflow/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1238, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1357, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1413, in _execute_task
    result = execute_callable(context=context)
  File "/Users/matt/src/astronomer-airflow/airflow/sensors/time_delta.py", line 59, in execute
    target_dttm += self.delta
TypeError: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'
ERROR - Task instance <TaskInstance: wait_1_async.wait_1_async 2021-07-31 21:26:55+00:00 [failed]> failed
ERROR - Deadlock; marking run <DagRun wait_1_async @ 2021-07-31T21:26:55+00:00: backfill__2021-07-31T21:26:55+00:00, externally triggered: False> failed
INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 1
ERROR - Task instance <TaskInstance: wait_1_async.end 2021-07-31 21:26:55+00:00 [upstream_failed]> with state upstream_failed
INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 | running: 0 | failed: 2 | skipped: 0 | deadlocked: 0 | not ready: 0
Some task instances failed:
DAG ID        Task ID       Execution date               Try number
------------  ------------  -------------------------  ------------
wait_1_async  end           2021-07-31 21:26:55+00:00             1
wait_1_async  wait_1_async  2021-07-31 21:26:55+00:00             1
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
from airflow.utils.dates import days_ago


@task
def start():
    print("after this task, we wait")


@task
def end():
    print("this task comes after a wait")


def make_dag(SensorClass, dag_id):
    """
    There is more than one way to wait; call this with a type of sensor to get
    a dag that waits using that sensor type.
    """

    @dag(
        schedule_interval="@once",
        start_date=days_ago(2),
        default_args={"owner": "airflow"},
        catchup=False,
        dag_id=dag_id,
    )
    def wait_1():
        """
        A dag that waits for 1 minute using the given sensor class
        """
        # The sensor's task_id is deliberately the same as the dag_id.
        start() >> SensorClass(task_id=dag_id, delta=timedelta(minutes=1)) >> end()

    return wait_1()


sync = make_dag(TimeDeltaSensor, "wait_1_sync")
deferred = make_dag(TimeDeltaSensorAsync, "wait_1_async")
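The TypeError in the log above can be reproduced without Airflow. The sketch below is not the sensor's actual code: `target_time` and `safe_target_time` are hypothetical helpers, and it assumes that `target_dttm` is `None` because no following schedule exists for an `@once` DAG. That would point at the schedule rather than the Debug Executor as the cause, but the log alone does not confirm it.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def target_time(following_schedule: Optional[datetime], delta: timedelta) -> datetime:
    """Hypothetical stand-in for the failing statement in the traceback."""
    target_dttm = following_schedule
    target_dttm += delta  # raises TypeError when following_schedule is None
    return target_dttm


def safe_target_time(
    following_schedule: Optional[datetime], delta: timedelta, fallback: datetime
) -> datetime:
    """Hypothetical guard: fall back to another timestamp when there is no next schedule."""
    base = following_schedule if following_schedule is not None else fallback
    return base + delta


if __name__ == "__main__":
    run = datetime(2021, 7, 31, 21, 26, 55, tzinfo=timezone.utc)
    try:
        target_time(None, timedelta(minutes=1))
    except TypeError as err:
        print(f"reproduced: {err}")
    print(safe_target_time(None, timedelta(minutes=1), fallback=run))
```

If the assumption holds, giving the DAG a recurring `schedule_interval` (for example a `timedelta`) would be one way to check whether the deferrable sensor gets past this line under `airflow dags test`.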