Created
July 31, 2021 21:32
-
-
Save MatrixManAtYrService/3901210eb9698c72492b0393a21500eb to your computer and use it in GitHub Desktop.
An attempt to use the Debug Executor with a deferrable task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ airflow dags test wait_1_async $(date -u +'%Y-%m-%dT%H:%M:%SZ')
INFO - Filling up the DagBag from /Users/matt/src/qa-scenario-dags/utility_dags | |
INFO - Adding to queue: ['<TaskInstance: wait_1_async.start 2021-07-31 21:26:55+00:00 [queued]>'] | |
INFO - Exporting the following env vars: | |
AIRFLOW_CTX_DAG_OWNER=airflow | |
AIRFLOW_CTX_DAG_ID=wait_1_async | |
AIRFLOW_CTX_TASK_ID=start | |
AIRFLOW_CTX_EXECUTION_DATE=2021-07-31T21:26:55+00:00 | |
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-07-31T21:26:55+00:00 | |
after this task, we wait | |
INFO - Marking task as SUCCESS. dag_id=wait_1_async, task_id=start, execution_date=20210731T212655, start_date=20210731T212658, end_date=20210731T212703 | |
INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 2 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 2 | |
INFO - Adding to queue: ['<TaskInstance: wait_1_async.wait_1_async 2021-07-31 21:26:55+00:00 [queued]>'] | |
INFO - Exporting the following env vars: | |
AIRFLOW_CTX_DAG_OWNER=airflow | |
AIRFLOW_CTX_DAG_ID=wait_1_async | |
AIRFLOW_CTX_TASK_ID=wait_1_async | |
AIRFLOW_CTX_EXECUTION_DATE=2021-07-31T21:26:55+00:00 | |
AIRFLOW_CTX_DAG_RUN_ID=backfill__2021-07-31T21:26:55+00:00 | |
ERROR - Task failed with exception | |
Traceback (most recent call last): | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1238, in _run_raw_task | |
self._prepare_and_execute_task_with_callbacks(context, task) | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1357, in _prepare_and_execute_task_with_callbacks | |
result = self._execute_task(context, task_copy) | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1413, in _execute_task | |
result = execute_callable(context=context) | |
File "/Users/matt/src/astronomer-airflow/airflow/sensors/time_delta.py", line 59, in execute | |
target_dttm += self.delta | |
TypeError: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta' | |
INFO - Marking task as FAILED. dag_id=wait_1_async, task_id=wait_1_async, execution_date=20210731T212655, start_date=20210731T212658, end_date=20210731T212708 | |
ERROR - Failed to execute task: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta'. | |
Traceback (most recent call last): | |
File "/Users/matt/src/astronomer-airflow/airflow/executors/debug_executor.py", line 79, in _run_task | |
ti._run_raw_task(job_id=ti.job_id, **params) | |
File "/Users/matt/src/astronomer-airflow/airflow/utils/session.py", line 70, in wrapper | |
return func(*args, session=session, **kwargs) | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1238, in _run_raw_task | |
self._prepare_and_execute_task_with_callbacks(context, task) | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1357, in _prepare_and_execute_task_with_callbacks | |
result = self._execute_task(context, task_copy) | |
File "/Users/matt/src/astronomer-airflow/airflow/models/taskinstance.py", line 1413, in _execute_task | |
result = execute_callable(context=context) | |
File "/Users/matt/src/astronomer-airflow/airflow/sensors/time_delta.py", line 59, in execute | |
target_dttm += self.delta | |
TypeError: unsupported operand type(s) for +=: 'NoneType' and 'datetime.timedelta' | |
ERROR - Task instance <TaskInstance: wait_1_async.wait_1_async 2021-07-31 21:26:55+00:00 [failed]> failed | |
ERROR - Deadlock; marking run <DagRun wait_1_async @ 2021-07-31T21:26:55+00:00: backfill__2021-07-31T21:26:55+00:00, externally triggered: False> failed | |
INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 1 | |
ERROR - Task instance <TaskInstance: wait_1_async.end 2021-07-31 21:26:55+00:00 [upstream_failed]> with state upstream_failed | |
INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 1 | running: 0 | failed: 2 | skipped: 0 | deadlocked: 0 | not ready: 0 | |
Some task instances failed: | |
DAG ID Task ID Execution date Try number | |
------------ ------------ ------------------------- ------------ | |
wait_1_async end 2021-07-31 21:26:55+00:00 1 | |
wait_1_async wait_1_async 2021-07-31 21:26:55+00:00 1 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import time, timedelta | |
from airflow.decorators import dag, task | |
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync | |
from airflow.utils.dates import days_ago | |
@task
def start():
    # First task in the chain built by make_dag: it only announces that the
    # wait is about to begin.
    print("after this task, we wait")
@task
def end():
    # Last task in the chain built by make_dag: it only announces that the
    # wait has finished.
    print("this task comes after a wait")
def make_dag(SensorClass, dag_id):
    """
    There is more than one way to wait, call this with a type of sensor to get
    a dag that waits using that sensor type.

    The generated dag chains three tasks: ``start``, one ``SensorClass``
    instance, then ``end``.

    :param SensorClass: sensor class to instantiate; it is called with
        ``task_id`` and ``delta`` keyword arguments.
    :param dag_id: id given to the dag; also reused as the sensor's task_id.
    :return: the result of calling the ``@dag``-decorated function.
    """

    # NOTE(review): the run log captured alongside this file shows the sensor
    # failing with "unsupported operand type(s) for +=: 'NoneType' and
    # 'datetime.timedelta'" on `target_dttm += self.delta`. Possibly the
    # "@once" schedule leaves the sensor without a target datetime -- confirm
    # against the sensor implementation before changing the schedule.
    @dag(
        schedule_interval="@once",
        start_date=days_ago(2),
        default_args={"owner": "airflow"},
        catchup=False,
        dag_id=dag_id,
    )
    def wait_1():
        """
        A dag that waits for 1 minute using the sensor type given to make_dag
        """
        # The sensor reuses the dag id as its task id, so each generated dag
        # has a sensor task named after the dag itself.
        start() >> SensorClass(task_id=dag_id, delta=timedelta(minutes=1)) >> end()

    return wait_1()
# One dag per sensor type, bound to module-level names.
sync = make_dag(TimeDeltaSensor, "wait_1_sync")
deferred = make_dag(TimeDeltaSensorAsync, "wait_1_async")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment