Skip to content

Instantly share code, notes, and snippets.

@rukshn
Last active January 16, 2023 08:30
Show Gist options
  • Save rukshn/89a423fff04e06aec4bece2355e0af8a to your computer and use it in GitHub Desktop.
Save rukshn/89a423fff04e06aec4bece2355e0af8a to your computer and use it in GitHub Desktop.
airflow_default_dag
import datetime
import pendulum
import os
import requests
from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
@dag(
dag_id="process-employees",
schedule_interval="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
)
def ProcessEmployees():
create_employees_table = PostgresOperator(
task_id="create_employees_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
CREATE TABLE IF NOT EXISTS employees (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
create_employees_temp_table = PostgresOperator(
task_id="create_employees_temp_table",
postgres_conn_id="tutorial_pg_conn",
sql="""
DROP TABLE IF EXISTS employees_temp;
CREATE TABLE employees_temp (
"Serial Number" NUMERIC PRIMARY KEY,
"Company Name" TEXT,
"Employee Markme" TEXT,
"Description" TEXT,
"Leave" INTEGER
);""",
)
@task
def get_data():
# NOTE: configure this as appropriate for your airflow environment
data_path = "/opt/airflow/dags/files/employees.csv"
os.makedirs(os.path.dirname(data_path), exist_ok=True)
url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv"
response = requests.request("GET", url)
with open(data_path, "w") as file:
file.write(response.text)
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
with open(data_path, "r") as file:
cur.copy_expert(
"COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
file,
)
conn.commit()
@task
def merge_data():
query = """
INSERT INTO employees
SELECT *
FROM (
SELECT DISTINCT *
FROM employees_temp
)
ON CONFLICT ("Serial Number") DO UPDATE
SET "Serial Number" = excluded."Serial Number";
"""
try:
postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
conn = postgres_hook.get_conn()
cur = conn.cursor()
cur.execute(query)
conn.commit()
return 0
except Exception as e:
return 1
[create_employees_table, create_employees_temp_table] >> get_data() >> merge_data()
dag = ProcessEmployees()
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.dates import days_ago
args={'owner': 'airflow'}
default_args = {
'owner': 'airflow',
#'start_date': airflow.utils.dates.days_ago(2),
# 'end_date': datetime(),
# 'depends_on_past': False,
#'email': ['[email protected]'],
#'email_on_failure': False,
# 'email_on_retry': False,
# If a task fails, retry it once after waiting
# at least 5 minutes
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag_psql = DAG(
dag_id = "postgresoperator_demo",
default_args=args,
# schedule_interval='0 0 * * *',
schedule_interval='@once',
dagrun_timeout=timedelta(minutes=60),
description='use case of psql operator in airflow',
start_date = airflow.utils.dates.days_ago(1)
)
create_table_sql_query = """
CREATE TABLE employee (id INT NOT NULL, name VARCHAR(250) NOT NULL, dept VARCHAR(250) NOT NULL);
"""
insert_data_sql_query = """
insert into employee (id, name, dept) values(1, 'vamshi','bigdata'),(2, 'divya','bigdata'),(3, 'binny','projectmanager'),
(4, 'omair','projectmanager') ;"""
create_table = PostgresOperator(
sql = create_table_sql_query,
task_id = "create_table_task",
postgres_conn_id = "postgres_local",
dag = dag_psql
)
insert_data = PostgresOperator(
sql = insert_data_sql_query,
task_id = "insert_data_task",
postgres_conn_id = "postgres_local",
dag = dag_psql
)
create_table >> insert_data
if __name__ == "__main__":
dag_psql.cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment