Last active
January 18, 2021 12:12
-
-
Save xnuinside/3bbaaf9a406a064b0054f32c105bb5d0 to your computer and use it in GitHub Desktop.
Apache Airflow: Check that a table exists and get its schema name with a Python callable and PostgresHook
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator

# Query whose rows are scanned to discover the schema name.
SQL_GET_SCHEMA = "SELECT * FROM pg_tables;"

# Template with two positional placeholders, filled by ``check_table_exist``
# in this order: schema name, table name.
SQL_CHECK_TABLE_EXIST = (
    "SELECT * FROM information_schema.tables "
    "WHERE table_schema = '{}' "
    "AND table_name = '{}';"
)

with DAG(dag_id="postgres_check_table", start_date=datetime(2018, 10, 12)) as dag:

    def check_table_exist(sql_to_get_schema, sql_to_check_table_exist,
                          table_name):
        """Look up the schema name, then check that ``table_name`` exists in it.

        Args:
            sql_to_get_schema: Query whose result rows are scanned for the
                value ``'airflow'``; the first column of the first matching
                row is used as the schema name.
            sql_to_check_table_exist: Query template with two positional
                ``{}`` placeholders, filled with the schema name and
                ``table_name`` (in that order) via ``str.format``.
            table_name: Name of the table to look for.

        Returns:
            True when the check query returns a row.

        Raises:
            ValueError: If no row of the schema query contains ``'airflow'``,
                or if the check query returns nothing.
        """
        # No connection id is passed, so the hook falls back to its default.
        hook = PostgresHook()

        # Get the schema name.
        for row in hook.get_records(sql=sql_to_get_schema):
            if 'airflow' in row:
                schema = row[0]
                print(schema)
                break
        else:
            # Without this guard ``schema`` would be unbound below and the
            # task would die with an unhelpful UnboundLocalError.
            raise ValueError(
                "cannot determine schema: no row returned by {!r} "
                "contains 'airflow'".format(sql_to_get_schema)
            )

        # Check that the table exists.
        # NOTE: values are interpolated with str.format, not bound as query
        # parameters, so only pass trusted schema/table names.
        found = hook.get_first(
            sql=sql_to_check_table_exist.format(schema, table_name)
        )
        print(found)
        if not found:
            raise ValueError("table {} does not exist".format(table_name))
        return True

    # This task will succeed.
    table_name_success = "dag"
    get_pg_table = PythonOperator(
        task_id="check_table_success",
        python_callable=check_table_exist,
        op_args=[SQL_GET_SCHEMA, SQL_CHECK_TABLE_EXIST, table_name_success],
    )

    # This task will fail.
    # It uses the same two-placeholder template as the task above. With a
    # single placeholder, str.format would put the schema name where the
    # table name belongs and the table name would never be checked.
    table_name_fail = "rock"
    get_rock_pg_table = PythonOperator(
        task_id="check_table_fail",
        python_callable=check_table_exist,
        op_args=[SQL_GET_SCHEMA, SQL_CHECK_TABLE_EXIST, table_name_fail],
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment