Running airflow with a faked clock
#! /usr/bin/env bash
# airflow_wrapper.sh: installed in place of the real `airflow` entrypoint
# run the script that calculates the appropriate time to fake
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
OFFSET_TIME="$(python "$DIR/containertime.py" "$FIRST_FAKED_TIME")"
# run airflow, faked at that time
faketime "$OFFSET_TIME" actual_airflow "$@"
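
For reference, the faketime command used above comes from libfaketime: given an absolute timestamp it runs the wrapped program with the clock shifted to that moment. A quick sanity check of that piece on its own (a sketch, assuming faketime is installed; the date is arbitrary):

# run a child process with a faked absolute clock
faketime "2000-01-01 00:00:00" date -u
# expected to print something like: Sat Jan  1 00:00:00 UTC 2000
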
# containertime.py
import sys
import sqlite3
import textwrap
from datetime import datetime

import psycopg2
from airflow import settings

create_sqlite = textwrap.dedent(
    """
    CREATE TABLE IF NOT EXISTS faketime (
        actual_time_of_first_call TEXT NOT NULL
    );
    """)

create_postgres = textwrap.dedent(
    """
    CREATE TABLE IF NOT EXISTS faketime (
        actual_time_of_first_call VARCHAR(255) NOT NULL
    );
    """)

get_offset = "SELECT * FROM faketime;"


def get_or_set(now: datetime, faked: datetime) -> datetime:

    # create the 'faketime' table in whichever backend airflow is configured to use
    url = settings.engine.url
    if url.get_backend_name() == 'sqlite':
        print("Using sqlite backend", file=sys.stderr)
        conn = sqlite3.connect(url.database)
        cursor = conn.cursor()
        print(create_sqlite, file=sys.stderr)
        cursor.execute(create_sqlite)
    elif url.get_backend_name() == 'postgresql':
        print("Using postgresql backend", file=sys.stderr)
        host = url.host or ""
        port = str(url.port or "5432")
        user = url.username or ""
        password = url.password or ""
        schema = url.database
        conn = psycopg2.connect(host=host, user=user, port=port, password=password, dbname=schema)
        cursor = conn.cursor()
        print(create_postgres, file=sys.stderr)
        cursor.execute(create_postgres)
    else:
        raise RuntimeError(f"Unsupported backend: {url.get_backend_name()}")

    # see if it has data
    print(get_offset, file=sys.stderr)
    cursor.execute(get_offset)
    start_clock_str = cursor.fetchone()
    if start_clock_str:
        start_clock_str = start_clock_str[0]
        print(f"Found previous faketime call at {start_clock_str}. Offsetting...", file=sys.stderr)
        start_clock = datetime.fromisoformat(start_clock_str)
    else:
        print("No previous faketime call, assuming this is the first.", file=sys.stderr)
        sql = f"INSERT INTO faketime VALUES('{now.isoformat()}');"
        print(sql, file=sys.stderr)
        cursor.execute(sql)
        start_clock = now

    # mischief managed
    cursor.close()
    conn.commit()
    conn.close()

    # this faketime call = first faketime call + time elapsed since that first call
    new_fake = faked + (now - start_clock)
    print(f"Now: {now}, First Fake: {start_clock}", file=sys.stderr)
    print(f"Faking time: {new_fake.isoformat()}", file=sys.stderr)
    return new_fake


if __name__ == "__main__":
    now = datetime.now()
    faked = datetime.fromisoformat(sys.argv[1])
    offset_fake = get_or_set(now, faked)
    print(offset_fake.isoformat())
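
The comment near the end captures the key arithmetic: each new process is faked to FIRST_FAKED_TIME plus however much real time has elapsed since the first process recorded its start, so the container's Airflow processes stay in rough agreement about what time it is. A sketch of what two manual calls would show, assuming a fresh, reachable metadata database (the sample numbers mirror the log further down, where a process launched ~6s after the first one got faked to 2000-01-01T00:00:06.069273):

# first call records the real wall-clock time and prints FIRST_FAKED_TIME unchanged,
# e.g. 2000-01-01T00:00:00
python containertime.py 2000-01-01
sleep 6
# a later call prints FIRST_FAKED_TIME plus the elapsed real time,
# e.g. roughly 2000-01-01T00:00:06
python containertime.py 2000-01-01
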
# Dockerfile
FROM quay.io/astronomer/ap-airflow-dev:2.2.0-buster-43536
USER root
# deps for faking time and talking to postgres databases
RUN apt-get update && apt-get install -y faketime libpq-dev build-essential
# place shim where airflow was, `airflow` is now `actual_airflow`
RUN sh -c 'set -x ; mv $(which airflow) $(dirname $(which airflow))/actual_airflow'
COPY ./airflow_wrapper.sh /home/root/
COPY ./containertime.py /home/root/
RUN chmod +x /home/root/airflow_wrapper.sh /home/root/containertime.py
RUN sh -c 'set -x; cp /home/root/airflow_wrapper.sh $(dirname $(which actual_airflow))/airflow'
RUN sh -c 'set -x; cp /home/root/containertime.py $(dirname $(which actual_airflow))/containertime.py'
COPY requirements.txt /home/astro
USER astro
RUN pip install -r /home/astro/requirements.txt
# indicate the desired time with a variable
ENV FIRST_FAKED_TIME=2000-01-01
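
To try it, build an image from this Dockerfile and drop it into whatever docker-compose / astro CLI setup normally runs the stock image alongside a postgres container (which is the arrangement the logs below came from). The tag here is a placeholder, and FIRST_FAKED_TIME can also be overridden per container with -e at run time:

# hypothetical tag; use the resulting image wherever the stock one would go
docker build -t airflow-faked-clock .
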
Mostly...
Each airflow process in a container does in fact have a faked time, which was the goal.
The first scheduled task runs without trouble. For some reason that I don't yet
understand, subsequent scheduled tasks cause errors in the scheduler:
ERROR - Execution date is in future: 2000-01-01 00:02:00+00:00
❯ docker logs f5f34d634fc6 | head -n 300
Using postgresql backend
CREATE TABLE IF NOT EXISTS faketime (
actual_time_of_first_call VARCHAR(255) NOT NULL
Waiting for host: postgres 5432
);
SELECT * FROM faketime;
No previously faketime call, assuming this is the first.
INSERT INTO faketime VALUES('2021-09-12T02:54:19.613468');
Now: 2021-09-12 02:54:19.613468, First Fake: 2021-09-12 02:54:19.613468
Faking time: 2000-01-01T00:00:00
airflow command error: argument GROUP_OR_COMMAND: `airflow upgradedb` command, has been removed, please use `airflow db upgrade`, see help above.
usage: airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
GROUP_OR_COMMAND
Groups:
celery Celery components
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
jobs Manage jobs
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
standalone Run an all-in-one copy of Airflow
sync-perm Update permissions for existing roles and optionally DAGs
triggerer Start a triggerer instance
Using postgresql backend
version Show the version
webserver Start a Airflow webserver instance
optional arguments:
-h, --help show this help message and exit
CREATE TABLE IF NOT EXISTS faketime (
actual_time_of_first_call VARCHAR(255) NOT NULL
);
SELECT * FROM faketime;
Found previous faketime call at 2021-09-12T02:54:19.613468. Offsetting...
Now: 2021-09-12 02:54:25.682741, First Fake: 2021-09-12 02:54:19.613468
Faking time: 2000-01-01T00:00:06.069273
[2000-01-01 00:00:11,564] {cli_action_loggers.py:105} WARNING - Failed to log action with (psycopg2.errors.UndefinedTable) relation "log" does not exist
LINE 1: INSERT INTO log (dttm, dag_id, task_id, event, execution_dat...
^
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s) RETURNING log.id]
[parameters: {'dttm': datetime.datetime(2000, 1, 1, 0, 0, 11, 553224, tzinfo=Timezone('UTC')), 'dag_id': None, 'task_id': None, 'event': 'cli_upgradedb', 'execution_date': None, 'owner': 'astro', 'extra': '{"host_name": "f5f34d634fc6", "full_command": "[\'/usr/local/bin/actual_airflow\', \'db\', \'upgrade\']"}'}]
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
(Background on this error at: http://sqlalche.me/e/13/f405)
INFO [alembic.runtime.migration] Will assume transactional DDL.
DB: postgresql://postgres:***@postgres:5432
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-amazon uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
[2000-01-01 00:00:11,570] {plugin.py:75} INFO - Creating DB tables for astronomer.airflow.version_check.plugin
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-elasticsearch uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
[2000-01-01 00:00:11,585] {plugin.py:80} INFO - Created
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-ftp uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
[2000-01-01 00:00:11,821] {db.py:722} INFO - Creating tables
WARNI [airflow.providers_manager] The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
WARNI [airflow.providers_manager] Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-google uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-http uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-imap uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/azure/cosmos/session.py:186 SyntaxWarning: "is not" with a literal. Did you mean "!="?
/usr/local/lib/python3.9/site-packages/azure/storage/common/_connection.py:82 SyntaxWarning: "is" with a literal. Did you mean "=="?
WARNI [airflow.providers_manager] The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-microsoft-azure uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-mysql uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-postgres uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-redis uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-slack uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-sqlite uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-ssh uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-amazon uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-elasticsearch uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-ftp uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
WARNI [airflow.providers_manager] The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
WARNI [airflow.providers_manager] Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-google uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-http uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
/usr/local/lib/python3.9/site-packages/airflow/providers_manager.py:486 DeprecationWarning: The provider apache-airflow-providers-imap uses `hook-class-names` property in provider-info and has no `connection-types` one. The 'hook-class-names' property has been deprecated in favour of 'connection-types' in Airflow 2.2. Use **both** in case you want to have backwards compatibility with Airflow < 2.2
WARNI [airflow.providers_manager] The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
INFO [alembic.runtime.migration] Running upgrade -> e3a246e0dc1, current schema
INFO [alembic.runtime.migration] Running upgrade e3a246e0dc1 -> 1507a7289a2f, create is_encrypted
INFO [alembic.runtime.migration] Running upgrade 1507a7289a2f -> 13eb55f81627, maintain history for compatibility with earlier migrations
INFO [alembic.runtime.migration] Running upgrade 13eb55f81627 -> 338e90f54d61, More logging into task_instance
INFO [alembic.runtime.migration] Running upgrade 338e90f54d61 -> 52d714495f0, job_id indices
INFO [alembic.runtime.migration] Running upgrade 52d714495f0 -> 502898887f84, Adding extra to Log
INFO [alembic.runtime.migration] Running upgrade 502898887f84 -> 1b38cef5b76e, add dagrun
INFO [alembic.runtime.migration] Running upgrade 1b38cef5b76e -> 2e541a1dcfed, task_duration
INFO [alembic.runtime.migration] Running upgrade 2e541a1dcfed -> 40e67319e3a9, dagrun_config
INFO [alembic.runtime.migration] Running upgrade 40e67319e3a9 -> 561833c1c74b, add password column to user
INFO [alembic.runtime.migration] Running upgrade 561833c1c74b -> 4446e08588, dagrun start end
INFO [alembic.runtime.migration] Running upgrade 4446e08588 -> bbc73705a13e, Add notification_sent column to sla_miss
INFO [alembic.runtime.migration] Running upgrade bbc73705a13e -> bba5a7cfc896, Add a column to track the encryption state of the 'Extra' field in connection
INFO [alembic.runtime.migration] Running upgrade bba5a7cfc896 -> 1968acfc09e3, add is_encrypted column to variable table
INFO [alembic.runtime.migration] Running upgrade 1968acfc09e3 -> 2e82aab8ef20, rename user table
INFO [alembic.runtime.migration] Running upgrade 2e82aab8ef20 -> 211e584da130, add TI state index
INFO [alembic.runtime.migration] Running upgrade 211e584da130 -> 64de9cddf6c9, add task fails journal table
INFO [alembic.runtime.migration] Running upgrade 64de9cddf6c9 -> f2ca10b85618, add dag_stats table
INFO [alembic.runtime.migration] Running upgrade f2ca10b85618 -> 4addfa1236f1, Add fractional seconds to mysql tables
INFO [alembic.runtime.migration] Running upgrade 4addfa1236f1 -> 8504051e801b, xcom dag task indices
INFO [alembic.runtime.migration] Running upgrade 8504051e801b -> 5e7d17757c7a, add pid field to TaskInstance
INFO [alembic.runtime.migration] Running upgrade 5e7d17757c7a -> 127d2bf2dfa7, Add dag_id/state index on dag_run table
INFO [alembic.runtime.migration] Running upgrade 127d2bf2dfa7 -> cc1e65623dc7, add max tries column to task instance
INFO [alembic.runtime.migration] Running upgrade cc1e65623dc7 -> bdaa763e6c56, Make xcom value column a large binary
INFO [alembic.runtime.migration] Running upgrade bdaa763e6c56 -> 947454bf1dff, add ti job_id index
INFO [alembic.runtime.migration] Running upgrade 947454bf1dff -> d2ae31099d61, Increase text size for MySQL (not relevant for other DBs' text types)
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 33ae817a1ff4, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 33ae817a1ff4 -> 27c6a30d7c24, kubernetes_resource_checkpointing
INFO [alembic.runtime.migration] Running upgrade 27c6a30d7c24 -> 86770d1215c0, add kubernetes scheduler uniqueness
INFO [alembic.runtime.migration] Running upgrade 86770d1215c0, 0e2a74e0fc9f -> 05f30312d566, merge heads
INFO [alembic.runtime.migration] Running upgrade 05f30312d566 -> f23433877c24, fix mysql not null constraint
INFO [alembic.runtime.migration] Running upgrade f23433877c24 -> 856955da8476, fix sqlite foreign key
INFO [alembic.runtime.migration] Running upgrade 856955da8476 -> 9635ae0956e7, index-faskfail
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> dd25f486b8ea, add idx_log_dag
INFO [alembic.runtime.migration] Running upgrade dd25f486b8ea -> bf00311e1990, add index to taskinstance
INFO [alembic.runtime.migration] Running upgrade 9635ae0956e7 -> 0a2a5b66e19d, add task_reschedule table
INFO [alembic.runtime.migration] Running upgrade 0a2a5b66e19d, bf00311e1990 -> 03bc53e68815, merge_heads_2
INFO [alembic.runtime.migration] Running upgrade 03bc53e68815 -> 41f5f12752f8, add superuser field
INFO [alembic.runtime.migration] Running upgrade 41f5f12752f8 -> c8ffec048a3b, add fields to dag
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> dd4ecb8fbee3, Add schedule interval to dag
INFO [alembic.runtime.migration] Running upgrade dd4ecb8fbee3 -> 939bb1e647c8, task reschedule fk on cascade delete
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 6e96a59344a4, Make TaskInstance.pool not nullable
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> d38e04c12aa2, add serialized_dag table
INFO [alembic.runtime.migration] Running upgrade d38e04c12aa2 -> b3b105409875, add root_dag_id to DAG
INFO [alembic.runtime.migration] Running upgrade 6e96a59344a4 -> 74effc47d867, change datetime to datetime2(6) on MSSQL tables
INFO [alembic.runtime.migration] Running upgrade 939bb1e647c8 -> 004c1210f153, increase queue name size limit
INFO [alembic.runtime.migration] Running upgrade c8ffec048a3b -> a56c9515abdc, Remove dag_stat table
INFO [alembic.runtime.migration] Running upgrade a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875 -> 08364691d074, Merge the four heads back together
INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
INFO [alembic.runtime.migration] Running upgrade fe461863935f -> 7939bcff74ba, Add DagTags table
INFO [alembic.runtime.migration] Running upgrade 7939bcff74ba -> a4c2fd67d16b, add pool_slots field to task_instance
INFO [alembic.runtime.migration] Running upgrade a4c2fd67d16b -> 852ae6c715af, Add RenderedTaskInstanceFields table
INFO [alembic.runtime.migration] Running upgrade 852ae6c715af -> 952da73b5eff, add dag_code table
INFO [alembic.runtime.migration] Running upgrade 952da73b5eff -> a66efa278eea, Add Precision to execution_date in RenderedTaskInstanceFields table
INFO [alembic.runtime.migration] Running upgrade a66efa278eea -> da3f683c3a5a, Add dag_hash Column to serialized_dag table
INFO [alembic.runtime.migration] Running upgrade da3f683c3a5a -> 92c57b58940d, Create FAB Tables
INFO [alembic.runtime.migration] Running upgrade 92c57b58940d -> 03afc6b6f902, Increase length of FAB ab_view_menu.name column
INFO [alembic.runtime.migration] Running upgrade 03afc6b6f902 -> cf5dc11e79ad, drop_user_and_chart
INFO [alembic.runtime.migration] Running upgrade cf5dc11e79ad -> bbf4a7ad0465, Remove id column from xcom
INFO [alembic.runtime.migration] Running upgrade bbf4a7ad0465 -> b25a55525161, Increase length of pool name
INFO [alembic.runtime.migration] Running upgrade b25a55525161 -> 3c20cacc0044, Add DagRun run_type
INFO [alembic.runtime.migration] Running upgrade 3c20cacc0044 -> 8f966b9c467a, Set conn_type as non-nullable
INFO [alembic.runtime.migration] Running upgrade 8f966b9c467a -> 8d48763f6d53, add unique constraint to conn_id
INFO [alembic.runtime.migration] Running upgrade 8d48763f6d53 -> e38be357a868, Add sensor_instance table
INFO [alembic.runtime.migration] Running upgrade e38be357a868 -> b247b1e3d1ed, Add queued by Job ID to TI
INFO [alembic.runtime.migration] Running upgrade b247b1e3d1ed -> e1a11ece99cc, Add external executor ID to TI
INFO [alembic.runtime.migration] Running upgrade e1a11ece99cc -> bef4f3d11e8b, Drop KubeResourceVersion and KubeWorkerId
INFO [alembic.runtime.migration] Running upgrade bef4f3d11e8b -> 98271e7606e2, Add scheduling_decision to DagRun and DAG
INFO [alembic.runtime.migration] Running upgrade 98271e7606e2 -> 52d53670a240, fix_mssql_exec_date_rendered_task_instance_fields_for_MSSQL
INFO [alembic.runtime.migration] Running upgrade 52d53670a240 -> 364159666cbd, Add creating_job_id to DagRun table
INFO [alembic.runtime.migration] Running upgrade 364159666cbd -> 45ba3f1493b9, add-k8s-yaml-to-rendered-templates
INFO [alembic.runtime.migration] Running upgrade 45ba3f1493b9 -> 849da589634d, Prefix DAG permissions.
INFO [alembic.runtime.migration] Running upgrade 849da589634d -> 2c6edca13270, Resource based permissions.
INFO [alembic.runtime.migration] Running upgrade 2c6edca13270 -> 61ec73d9401f, Add description field to connection
INFO [alembic.runtime.migration] Running upgrade 61ec73d9401f -> 64a7d6477aae, fix description field in connection to be text
INFO [alembic.runtime.migration] Running upgrade 64a7d6477aae -> e959f08ac86c, Change field in DagCode to MEDIUMTEXT for MySql
INFO [alembic.runtime.migration] Running upgrade e959f08ac86c -> 82b7c48c147f, Remove can_read permission on config resource for User and Viewer role
[2000-01-01 00:00:16,827] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
[2000-01-01 00:00:16,827] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
[2000-01-01 00:00:17,232] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
[2000-01-01 00:00:17,234] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
[2000-01-01 00:00:17,235] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
INFO [alembic.runtime.migration] Running upgrade 82b7c48c147f -> 449b4072c2da, Increase size of connection.extra field to handle multiple RSA keys
[2000-01-01 00:00:17,299] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
INFO [alembic.runtime.migration] Running upgrade 449b4072c2da -> 8646922c8a04, Change default pool_slots to 1
[2000-01-01 00:00:17,300] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
INFO [alembic.runtime.migration] Running upgrade 8646922c8a04 -> 2e42bb497a22, rename last_scheduler_run column
INFO [alembic.runtime.migration] Running upgrade 2e42bb497a22 -> 90d1635d7b86, Increase pool name size in TaskInstance
[2000-01-01 00:00:17,302] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
INFO [alembic.runtime.migration] Running upgrade 90d1635d7b86 -> e165e7455d70, add description field to variable
[2000-01-01 00:00:18,722] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
INFO [alembic.runtime.migration] Running upgrade e165e7455d70 -> a13f7613ad25, Resource based permissions for default FAB views.
[2000-01-01 00:00:18,722] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
[2000-01-01 00:00:18,960] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
[2000-01-01 00:00:18,962] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providerINFO [alembic.runtime.migration] Running upgrade a13f7613ad25 -> 97cdd93827b8, Add queued_at column to dagrun table
INFO [alembic.runtime.migration] Running upgrade 97cdd93827b8 -> 83f031fd9f1c, improve mssql compatibility
INFO [alembic.runtime.migration] Running upgrade 83f031fd9f1c -> e9304a3141f0, make xcom pkey columns non-nullable
INFO [alembic.runtime.migration] Running upgrade e9304a3141f0 -> 30867afad44a, Rename concurrency column in dag table to max_active_tasks
s.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
INFO [alembic.runtime.migration] Running upgrade 30867afad44a -> 54bebd308c5f, Add trigger table and task info
INFO [alembic.runtime.migration] Running upgrade 54bebd308c5f -> 142555e44c17, Add data_interval_[start|end] to DagModel and DagRun.
[2000-01-01 00:00:18,963] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
INFO [alembic.runtime.migration] Running upgrade 142555e44c17 -> 7b2661a43ba3, TaskInstance keyed to DagRun
[2000-01-01 00:00:19,024] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
[2000-01-01 00:00:19,026] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
[2000-01-01 00:00:19,027] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
Using postgresql backend
CREATE TABLE IF NOT EXISTS faketime (
actual_time_of_first_call VARCHAR(255) NOT NULL
);
SELECT * FROM faketime;
Found previous faketime call at 2021-09-12T02:54:19.613468. Offsetting...
[2000-01-01 00:00:19,855] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
Now: 2021-09-12 02:54:41.913075, First Fake: 2021-09-12 02:54:19.613468
[2000-01-01 00:00:19,855] {manager.py:788} WARNING - No user yet created, use flask fab command to do it.
Faking time: 2000-01-01T00:00:22.299607
[2000-01-01 00:00:20,080] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
[2000-01-01 00:00:20,082] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
[2000-01-01 00:00:26 +0000] [40] [INFO] Starting gunicorn 20.1.0
[2000-01-01 00:00:26 +0000] [40] [INFO] Listening at: http://0.0.0.0:8793 (40)
[2000-01-01 00:00:26 +0000] [40] [INFO] Using worker: sync
[2000-01-01 00:00:26 +0000] [42] [INFO] Booting worker with pid: 42
[2000-01-01 00:00:26 +0000] [51] [INFO] Booting worker with pid: 51
[2000-01-01 00:00:20,083] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
/usr/local/lib/python3.9/site-packages/urllib3/connection.py:379 SystemTimeWarning: System time is way off (before 2020-07-01). This will probably lead to SSL verification errors
[2000-01-01 00:00:20,141] {providers_manager.py:470} WARNING - The hook connection type 'google_cloud_platform' is registered twice in the package 'apache-airflow-providers-google' with different class names: 'airflow.providers.google.common.hooks.base_google.GoogleBaseHook' and 'airflow.providers.google.cloud.hooks.bigquery.BigQueryHook'. Please fix it!
[2000-01-01 00:00:20,142] {providers_manager.py:133} WARNING - Exception when importing 'airflow.providers.google.leveldb.hooks.leveldb.LevelDBHook' from 'apache-airflow-providers-google' package: No module named 'plyvel'
[2000-01-01 00:00:20,144] {providers_manager.py:470} WARNING - The hook connection type 'azure' is registered twice in the package 'apache-airflow-providers-microsoft-azure' with different class names: 'airflow.providers.microsoft.azure.hooks.base_azure.AzureBaseHook' and 'airflow.providers.microsoft.azure.hooks.azure_container_instance.AzureContainerInstanceHook'. Please fix it!
Upgrades done
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2000-01-01 00:00:26,868] {scheduler_job.py:536} INFO - Starting the scheduler
[2000-01-01 00:00:26,868] {scheduler_job.py:541} INFO - Processing each file at most -1 times
[2000-01-01 00:00:27,023] {manager.py:162} INFO - Launched DagFileProcessorManager with pid: 139
[2000-01-01 00:00:27,058] {scheduler_job.py:1036} INFO - Resetting orphaned tasks for active dag runs
[2000-01-01 00:00:27,123] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2000-01-01 00:00:41,490] {update_checks.py:126} INFO - Checking for new version of Astronomer Certified Airflow, previous check was performed at None
[2000-01-01 00:00:41,697] {update_checks.py:84} ERROR - Update check died with an exception, trying again in one hour
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/urllib3/connectionpool.py", line 699, in urlopen
httplib_response = self._make_request(
File "/usr/local/lib/python3.9/site-packages/urllib3/connectionpool.py", line 382, in _make_request
self._validate_conn(conn)
File "/usr/local/lib/python3.9/site-packages/urllib3/connectionpool.py", line 1010, in _validate_conn
conn.connect()
File "/usr/local/lib/python3.9/site-packages/urllib3/connection.py", line 411, in connect
self.sock = ssl_wrap_socket(
File "/usr/local/lib/python3.9/site-packages/urllib3/util/ssl_.py", line 449, in ssl_wrap_socket
ssl_sock = _ssl_wrap_socket_impl(
File "/usr/local/lib/python3.9/site-packages/urllib3/util/ssl_.py", line 493, in _ssl_wrap_socket_impl
return ssl_context.wrap_socket(sock, server_hostname=server_hostname)
File "/usr/local/lib/python3.9/ssl.py", line 500, in wrap_socket
return self.sslsocket_class._create(
File "/usr/local/lib/python3.9/ssl.py", line 1040, in _create
self.do_handshake()
File "/usr/local/lib/python3.9/ssl.py", line 1309, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate is not yet valid (_ssl.c:1129)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 439, in send
resp = conn.urlopen(
File "/usr/local/lib/python3.9/site-packages/urllib3/connectionpool.py", line 755, in urlopen
retries = retries.increment(
File "/usr/local/lib/python3.9/site-packages/urllib3/util/retry.py", line 574, in increment
raise MaxRetryError(_pool, url, error or ResponseError(cause))
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='updates.astronomer.io', port=443): Max retries exceeded with url: /astronomer-certified?site=http%3A%2F%2Flocalhost%3A8080 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate is not yet valid (_ssl.c:1129)')))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/update_checks.py", line 79, in run
update_available, wake_up_in = self.check_for_update()
File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/update_checks.py", line 136, in check_for_update
for release in self._process_update_json(self._get_update_json()):
File "/usr/local/lib/python3.9/site-packages/astronomer/airflow/version_check/update_checks.py", line 207, in _get_update_json
r = requests.get(
File "/usr/local/lib/python3.9/site-packages/requests/api.py", line 75, in get
return request('get', url, params=params, **kwargs)
File "/usr/local/lib/python3.9/site-packages/requests/api.py", line 61, in request
return session.request(method=method, url=url, **kwargs)
File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.9/site-packages/requests/sessions.py", line 655, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.9/site-packages/requests/adapters.py", line 514, in send
raise SSLError(e, request=request)
requests.exceptions.SSLError: HTTPSConnectionPool(host='updates.astronomer.io', port=443): Max retries exceeded with url: /astronomer-certified?site=http%3A%2F%2Flocalhost%3A8080 (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: certificate is not yet valid (_ssl.c:1129)')))
[2000-01-01 00:01:49,937] {dag.py:2709} INFO - Setting next_dagrun for each_two_wait_one_sync to 2000-01-01T00:00:00+00:00
[2000-01-01 00:01:49,981] {scheduler_job.py:279} INFO - 1 tasks up for execution:
<TaskInstance: each_two_wait_one_sync.before scheduled__1999-12-31T23:58:00+00:00 [scheduled]>
[2000-01-01 00:01:49,983] {scheduler_job.py:308} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2000-01-01 00:01:49,984] {scheduler_job.py:336} INFO - DAG each_two_wait_one_sync has 0/16 running and queued tasks
[2000-01-01 00:01:49,984] {scheduler_job.py:401} INFO - Setting the following tasks to queued state:
<TaskInstance: each_two_wait_one_sync.before scheduled__1999-12-31T23:58:00+00:00 [scheduled]>
[2000-01-01 00:01:49,986] {scheduler_job.py:433} INFO - Sending TaskInstanceKey(dag_id='each_two_wait_one_sync', task_id='before', run_id='scheduled__1999-12-31T23:58:00+00:00', try_number=1) to executor with priority 3 and queue default
[2000-01-01 00:01:49,986] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'each_two_wait_one_sync', 'before', 'scheduled__1999-12-31T23:58:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/each_two_wait_one_sync.py']
[2000-01-01 00:01:49,989] {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'each_two_wait_one_sync', 'before', 'scheduled__1999-12-31T23:58:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/each_two_wait_one_sync.py']
[2000-01-01 00:01:50,017] {dag.py:2709} INFO - Setting next_dagrun for each_two_wait_one_sync to 2000-01-01T00:02:00+00:00
[2000-01-01 00:01:50,163] {scheduler_job.py:279} INFO - 1 tasks up for execution:
<TaskInstance: each_two_wait_one_sync.before scheduled__2000-01-01T00:00:00+00:00 [scheduled]>
[2000-01-01 00:01:50,166] {scheduler_job.py:308} INFO - Figuring out tasks to run in Pool(name=default_pool) with 127 open slots and 1 task instances ready to be queued
[2000-01-01 00:01:50,166] {scheduler_job.py:336} INFO - DAG each_two_wait_one_sync has 1/16 running and queued tasks
[2000-01-01 00:01:50,167] {scheduler_job.py:401} INFO - Setting the following tasks to queued state:
<TaskInstance: each_two_wait_one_sync.before scheduled__2000-01-01T00:00:00+00:00 [scheduled]>
[2000-01-01 00:01:50,169] {scheduler_job.py:433} INFO - Sending TaskInstanceKey(dag_id='each_two_wait_one_sync', task_id='before', run_id='scheduled__2000-01-01T00:00:00+00:00', try_number=1) to executor with priority 3 and queue default
[2000-01-01 00:01:50,169] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'each_two_wait_one_sync', 'before', 'scheduled__2000-01-01T00:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/each_two_wait_one_sync.py']
[2000-01-01 00:01:50,173] {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'each_two_wait_one_sync', 'before', 'scheduled__2000-01-01T00:00:00+00:00', '--local', '--subdir', 'DAGS_FOLDER/each_two_wait_one_sync.py']
[2000-01-01 00:01:50,202] {dag.py:2709} INFO - Setting next_dagrun for each_two_wait_one_sync to 2000-01-01T00:04:00+00:00
[2000-01-01 00:01:50,217] {dagbag.py:492} INFO - Filling up the DagBag from /usr/local/airflow/dags/each_two_wait_one_sync.py
[2000-01-01 00:01:50,225] {scheduler_job.py:957} ERROR - Execution date is in future: 2000-01-01 00:02:00+00:00
[2000-01-01 00:01:50,398] {dagbag.py:492} INFO - Filling up the DagBag from /usr/local/airflow/dags/each_two_wait_one_sync.py
Running <TaskInstance: each_two_wait_one_sync.before scheduled__1999-12-31T23:58:00+00:00 [queued]> on host f5f34d634fc6
Running <TaskInstance: each_two_wait_one_sync.before scheduled__2000-01-01T00:00:00+00:00 [queued]> on host f5f34d634fc6
[2000-01-01 00:01:51,292] {dag.py:2709} INFO - Setting next_dagrun for each_two_wait_one_sync to 2000-01-01T00:06:00+00:00
[2000-01-01 00:01:51,308] {scheduler_job.py:957} ERROR - Execution date is in future: 2000-01-01 00:02:00+00:00
[2000-01-01 00:01:51,309] {scheduler_job.py:957} ERROR - Execution date is in future: 2000-01-01 00:04:00+00:00
[2000-01-01 00:01:51,339] {scheduler_job.py:279} INFO - 2 tasks up for execution:
<TaskInstance: each_two_wait_one_sync.wait_exec_plus_one scheduled__1999-12-31T23:58:00+00:00 [scheduled]>
<TaskInstance: each_two_wait_one_sync.wait_exec_plus_one scheduled__2000-01-01T00:00:00+00:00 [scheduled]>
[2000-01-01 00:01:51,341] {scheduler_job.py:308} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 2 task instances ready to be queued
[2000-01-01 00:01:51,342] {scheduler_job.py:336} INFO - DAG each_two_wait_one_sync has 0/16 running and queued tasks
[2000-01-01 00:01:51,342] {scheduler_job.py:336} INFO - DAG each_two_wait_one_sync has 1/16 running and queued tasks
[2000-01-01 00:01:51,342] {scheduler_job.py:401} INFO - Setting the following tasks to queued state:
<TaskInstance: each_two_wait_one_sync.wait_exec_plus_one scheduled__1999-12-31T23:58:00+00:00 [scheduled]>
<TaskInstance: each_two_wait_one_sync.wait_exec_plus_one scheduled__2000-01-01T00:00:00+00:00 [scheduled]>