Created
June 24, 2021 15:49
-
-
Save revolutionisme/8246c6e2fa7d8abb7f6c393cb5218595 to your computer and use it in GitHub Desktop.
Query the Airflow metadata db from within a python operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def set_task_status(**kwargs): | |
dag_id = kwargs["dag_id"] | |
task_id = kwargs["task_id"] | |
start_date = kwargs["start_date"] | |
end_date = kwargs["end_date"] | |
session = settings.Session() | |
print("session: ", str(session)) | |
# stmt = f"UPDATE task_instance SET state = 'success', try_number = 0 WHERE task_id = '{task_id}' AND dag_id = '{dag_id}' AND execution_date = '{start_date}';" | |
stmt = "Select * from task_instance;" | |
result = session.execute(stmt) | |
print(f"Attributes - {result._metadata.keys}") | |
for row in result: | |
print(f"Result from task instance - {row}") | |
PythonOperator( | |
task_id="backfill", | |
python_callable=set_task_status, | |
dag=dag, | |
op_kwargs={ | |
"start_date": '{{ dag_run.conf["start_date"] if dag_run else "" }}', | |
"end_date": '{{ dag_run.conf["end_date"] if dag_run else "" }}', | |
"dag_id": '{{ dag_run.conf["dag_id"] if dag_run else "" }}', | |
"task_id": '{{ dag_run.conf["task_id"] if dag_run and "task_id" in dag_run.conf else None }}', | |
}, | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment