Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save MatrixManAtYrService/6b27378776470491eb20b60e01cfb675 to your computer and use it in GitHub Desktop.
Save MatrixManAtYrService/6b27378776470491eb20b60e01cfb675 to your computer and use it in GitHub Desktop.
INFO - Adding to queue: ['<TaskInstance: two_virtualenv.a ']
hi
INFO - Marking task as SUCCESS. dag_id=two_virtualenv, task_id=a
ERROR - Failed to execute task: cannot pickle 'module' object.
Traceback (most recent call last):
File "/Users/matt/src/airflow/airflow/executors/debug_executor.py", line 79, in _run_task
ti._run_raw_task(job_id=ti.job_id, **params) # pylint: disable=protected-access
File "/Users/matt/src/airflow/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/Users/matt/src/airflow/airflow/models/taskinstance.py", line 1201, in _run_raw_task
self._run_mini_scheduler_on_child_tasks(session)
File "/Users/matt/src/airflow/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/Users/matt/src/airflow/airflow/models/taskinstance.py", line 1223, in _run_mini_scheduler_on_child_tasks
partial_dag = self.task.dag.partial_subset(
File "/Users/matt/src/airflow/airflow/models/dag.py", line 1490, in partial_subset
dag.task_dict = {
File "/Users/matt/src/airflow/airflow/models/dag.py", line 1491, in <dictcomp>
t.task_id: copy.deepcopy(t, {id(t.dag): dag}) # type: ignore
File "/usr/local/Cellar/[email protected]/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/copy.py", line 153, in deepcopy
y = copier(memo)
File "/Users/matt/src/airflow/airflow/models/baseoperator.py", line 961, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo)) # noqa
File "/usr/local/Cellar/[email protected]/3.9.4/Frameworks/Python.framework/Versions/3.9/lib/python3.9/copy.py", line 161, in deepcopy
rv = reductor(4)
TypeError: cannot pickle 'module' object
ERROR - Task instance <TaskInstance: two_virtualenv.a 2021-05-11 00:00:00+00:00 [failed]> failed
[2021-05-11 21:16:45,136] {dagrun.py:460} ERROR - Deadlock; marking run <DagRun two_virtualenv @ 2021-05-11 00:00:00+00:00: backfill__2021-05-11T00:00:00+00:00, externally triggered: False> failed
INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | running: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 1
ERROR - Task instance <TaskInstance: two_virtualenv.b 2021-05-11 00:00:00+00:00 [upstream_failed]> with state upstream_failed
{backfill_job.py:377} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | running: 0 | failed: 2 | skipped: 0 | deadlocked: 0 | not ready: 0
Some task instances failed:
DAG ID Task ID Execution date Try number
-------------- --------- ------------------------- ------------
two_virtualenv a 2021-05-11 00:00:00+00:00 1
two_virtualenv b 2021-05-11 00:00:00+00:00 1
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
def callable():
print("hi")
with DAG(
dag_id="two_virtualenv",
default_args={"owner: Airflow"},
schedule_interval=None,
start_date=days_ago(1),
) as dag:
a = PythonOperator(
task_id="a",
python_callable=callable,
)
# b = PythonOperator( # works
b = PythonVirtualenvOperator( # doesn't work
task_id="b",
python_callable=callable,
)
a >> b
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment