airflow_default_dag
import datetime
import os

import pendulum
import requests

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator


@dag(
    dag_id="process-employees",
    schedule_interval="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
    create_employees_table = PostgresOperator(
        task_id="create_employees_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            CREATE TABLE IF NOT EXISTS employees (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    create_employees_temp_table = PostgresOperator(
        task_id="create_employees_temp_table",
        postgres_conn_id="tutorial_pg_conn",
        sql="""
            DROP TABLE IF EXISTS employees_temp;
            CREATE TABLE employees_temp (
                "Serial Number" NUMERIC PRIMARY KEY,
                "Company Name" TEXT,
                "Employee Markme" TEXT,
                "Description" TEXT,
                "Leave" INTEGER
            );""",
    )

    @task
    def get_data():
        # NOTE: configure this as appropriate for your airflow environment
        data_path = "/opt/airflow/dags/files/employees.csv"
        os.makedirs(os.path.dirname(data_path), exist_ok=True)

        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
        response = requests.request("GET", url)

        with open(data_path, "w") as file:
            file.write(response.text)

        # Bulk-load the downloaded CSV into the staging table.
        postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
        conn = postgres_hook.get_conn()
        cur = conn.cursor()
        with open(data_path, "r") as file:
            cur.copy_expert(
                "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                file,
            )
        conn.commit()

    @task
    def merge_data():
        # Upsert distinct rows from the staging table into the target table.
        # The subquery alias ("t") is required by PostgreSQL; on a key conflict
        # the existing row is effectively left unchanged.
        query = """
            INSERT INTO employees
            SELECT *
            FROM (
                SELECT DISTINCT *
                FROM employees_temp
            ) t
            ON CONFLICT ("Serial Number") DO UPDATE
            SET "Serial Number" = excluded."Serial Number";
        """
        try:
            postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
            conn = postgres_hook.get_conn()
            cur = conn.cursor()
            cur.execute(query)
            conn.commit()
            return 0
        except Exception:
            # Swallow the error and signal the problem via the return value.
            return 1

    [create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()


dag = ProcessEmployees()
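The DAG above assumes a Postgres connection named "tutorial_pg_conn" already exists in the Airflow metadata database. It is normally created via the UI (Admin → Connections) or the CLI; the sketch below registers it programmatically instead. This is not part of the gist, and the host, schema, login, password, and port values are placeholders for a local setup.

# Minimal sketch, assuming a default local Airflow install and that no
# connection with this conn_id exists yet (otherwise a duplicate row is added).
from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id="tutorial_pg_conn",
    conn_type="postgres",
    host="postgres",       # placeholder host
    schema="airflow",      # placeholder database name
    login="airflow",       # placeholder credentials
    password="airflow",
    port=5432,
)
session = Session()
session.add(conn)
session.commit()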
import airflow
from datetime import timedelta

from airflow import DAG
# Legacy import path; newer Airflow releases use airflow.providers.postgres.operators.postgres
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    # 'start_date': airflow.utils.dates.days_ago(2),
    # 'end_date': datetime(),
    # 'depends_on_past': False,
    # 'email': ['[email protected]'],
    # 'email_on_failure': False,
    # 'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_psql = DAG(
    dag_id="postgresoperator_demo",
    default_args=default_args,
    # schedule_interval='0 0 * * *',
    schedule_interval='@once',
    dagrun_timeout=timedelta(minutes=60),
    description='use case of psql operator in airflow',
    start_date=days_ago(1),
)

create_table_sql_query = """
    CREATE TABLE employee (id INT NOT NULL, name VARCHAR(250) NOT NULL, dept VARCHAR(250) NOT NULL);
"""

insert_data_sql_query = """
    INSERT INTO employee (id, name, dept)
    VALUES (1, 'vamshi', 'bigdata'),
           (2, 'divya', 'bigdata'),
           (3, 'binny', 'projectmanager'),
           (4, 'omair', 'projectmanager');
"""

create_table = PostgresOperator(
    sql=create_table_sql_query,
    task_id="create_table_task",
    postgres_conn_id="postgres_local",
    dag=dag_psql,
)

insert_data = PostgresOperator(
    sql=insert_data_sql_query,
    task_id="insert_data_task",
    postgres_conn_id="postgres_local",
    dag=dag_psql,
)

create_table >> insert_data

if __name__ == "__main__":
    dag_psql.cli()
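After the second DAG has run its two tasks, the "employee" table should hold the four inserted rows. The sketch below reads them back through the same "postgres_local" connection to verify the load; it is not part of the gist and assumes that connection is configured and the DAG has completed.

# Minimal verification sketch using PostgresHook.get_records (standard DbApiHook method).
from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_local")
rows = hook.get_records("SELECT id, name, dept FROM employee ORDER BY id;")
for row in rows:
    print(row)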