Airflow Tutorial@MOPH

Create directories:

mkdir -p ./dags ./logs ./plugins

Create .env

echo -e "AIRFLOW_UID=$(id -u)" > .env

docker-compose.yaml

---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:[email protected]:3306/airflow_db
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Asia/Bangkok'
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  extra_hosts: 
    - "host.docker.internal:host-gateway"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"

services:
  
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources


.env

AIRFLOW_UID=197609
AIRFLOW_IMAGE_NAME=siteslave/airflow-tutorial
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin


Dockerfile

FROM apache/airflow:2.4.2
RUN pip install --no-cache-dir apache-airflow-providers-http apache-airflow-providers-mysql apache-airflow-providers-postgres apache-airflow-providers-amazon apache-airflow-providers-oracle apache-airflow-providers-microsoft-mssql pandas minio 
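To use this custom image with the compose file above, build it and point AIRFLOW_IMAGE_NAME at the resulting tag; a minimal sketch, assuming the Dockerfile sits next to docker-compose.yaml and the tag matches the .env shown above:

docker build -t siteslave/airflow-tutorial .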


MySQL

CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';
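Airflow's MySQL setup guide also expects explicit_defaults_for_timestamp to be enabled on the server; a sketch of the my.cnf setting, assuming you can edit the server configuration:

[mysqld]
explicit_defaults_for_timestamp = 1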

Update the connection string (AIRFLOW__DATABASE__SQL_ALCHEMY_CONN format)

mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
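For example, with the user and database created above and MySQL reachable from the containers as host.docker.internal (via the extra_hosts entry in docker-compose.yaml), the setting would look like this; adjust host and port to your environment:

AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:airflow@host.docker.internal:3306/airflow_db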


Start the containers

Initialize the metadata database and create the admin user:

docker-compose up airflow-init

Clean up a previous environment (removes volumes and orphan containers):

docker-compose down --volumes --remove-orphans

Start all services:

docker-compose up

Tear everything down, including the images:

docker-compose down --volumes --rmi all
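Once the webserver reports healthy, you can verify the stack; a quick check, assuming the default 8080 port mapping from docker-compose.yaml:

docker-compose ps
curl http://localhost:8080/health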


01_first_step.py

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="01_first_step",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    tags=["tutorial"],
    default_args=default_args
) as dag:

    def show_name():
        print("Hello.")

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = PythonOperator(task_id="task3", python_callable=show_name)

task1 >> [task2, task3]
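The bit-shift operators are only one way to declare dependencies; the same fan-out can be written with chain(), which some prefer for longer pipelines. A sketch using the task names above:

from airflow.models.baseoperator import chain

chain(task1, [task2, task3])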

01_first_step.py (more tasks)

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="01_first_step",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    tags=["tutorial"],
    default_args=default_args
) as dag:

    def show_name():
        print("Hello.")

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = PythonOperator(task_id="task3", python_callable=show_name)
    task4 = EmptyOperator(task_id="task4")
    task5 = EmptyOperator(task_id="task5")
    success = EmptyOperator(task_id="success")
    failed = EmptyOperator(task_id="failed")

task1 >> [task2, task3] >> task4 >> task5 >> [success, failed]
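Note that success and failed both keep the default all_success trigger rule here, so failed only runs when everything upstream succeeds. The later examples attach an explicit trigger rule for the failure branch; a minimal sketch of that pattern:

from airflow.utils.trigger_rule import TriggerRule

failed = EmptyOperator(task_id="failed", trigger_rule=TriggerRule.ONE_FAILED)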

01_first_step.py (task groups)

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="01_first_step",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule="@daily",
    catchup=True,
    tags=["tutorial"],
    default_args=default_args
) as dag:

    def show_name():
        print("Hello.")

    start = EmptyOperator(task_id="start")
    
    with TaskGroup(group_id='group1') as group1:
      task1 = EmptyOperator(task_id='task1')
      task2 = EmptyOperator(task_id='task2')
      [task1, task2]
    
    with TaskGroup(group_id='group2') as group2:
      task1 = EmptyOperator(task_id='task1')
      task2 = EmptyOperator(task_id='task2')
      [task1, task2]
      
    task3 = PythonOperator(task_id="task3", python_callable=show_name)
    task4 = EmptyOperator(task_id="task4")
    
    end = EmptyOperator(task_id="end")

start >> [group1, group2]
group1 >> task3
group2 >> task4

task3 >> end
task4 >> end
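With schedule="@daily" and catchup=True, the scheduler creates one run per day from start_date onward as soon as the DAG is unpaused. To exercise a single logical date without waiting for the scheduler, the CLI can run the DAG in-process; a sketch, assuming you exec into the scheduler container:

docker-compose exec airflow-scheduler airflow dags test 01_first_step 2022-10-27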

02_call_api.py

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.trigger_rule import TriggerRule

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="02_call_api",
    start_date=pendulum.datetime(2022, 10, 31, tz="Asia/Bangkok"),
    schedule="*/1 * * * *",
    catchup=False,
    tags=["tutorial"],
    default_args=default_args
) as dag:
  
  def get_data():
    import requests
    import os
    import json
    
    
    try:
      os.makedirs('./output', exist_ok=True)

      url = "https://h4u.moph.go.th/pcd/closet-station"

      querystring = {"lng":"90.124241","lat":"10.432141234"}

      payload = ""
      response = requests.request("GET", url, data=payload, params=querystring)

      # print(response.text)
      
      json_data = response.json()
      
      data = json_data["rows"][0]

      file_name = "air.json"
      outfile_path = os.path.join('./output', file_name)

      with open(outfile_path, 'w') as outputfile:
        json.dump(data, outputfile)
    except Exception as e:
      print(e)
      raise  # re-raise so Airflow marks the task as failed
  
  def load_data():
    
    import os
    import json
    
    hook = MySqlHook(mysql_conn_id="mysql-air")
    
    file_name = 'air.json'
    input_path = os.path.join('./output', file_name)

    with open(input_path, 'r') as inputfile:
      items = json.load(inputfile)
    
    d_update = f'{items["LastUpdate"]["date"]} {items["LastUpdate"]["time"]}'
    pm25 = items["LastUpdate"]["PM25"]["value"]
    pm10 = items["LastUpdate"]["PM10"]["value"]
    o3 = items["LastUpdate"]["O3"]["value"]
    co = items["LastUpdate"]["CO"]["value"]
    aqi = items["LastUpdate"]["AQI"]["aqi"]
    level = items["LastUpdate"]["PM25"]["level"]
    color = items["LastUpdate"]["PM25"]["color"]
    
    params = (pm25, pm10, o3, co, aqi, d_update, level, color)
    
    sql = """
    INSERT INTO `data` (pm25, pm10, o3, co, aqi, d_update, `level`, color) VALUES(%s, %s, %s, %s, %s, %s, %s, %s);
    """
    
    print(sql)
  
    hook.run(sql=sql, parameters=params)
  
  
  start = EmptyOperator(task_id="start")
  _get_data = PythonOperator(task_id="get_data", python_callable=get_data)
  _load_data = PythonOperator(task_id="load_data", python_callable=load_data)
  
  _success = EmptyOperator(task_id="success")
  _failed = EmptyOperator(task_id="failed", trigger_rule=TriggerRule.ALL_FAILED)
  
start >> _get_data >> _load_data >> [_success, _failed]
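MySqlHook looks up an Airflow connection named mysql-air, which must exist before the DAG runs. One way to create it is the CLI; the host, credentials, and schema below are placeholders for your own MySQL server:

airflow connections add mysql-air \
    --conn-type mysql \
    --conn-host <host> \
    --conn-port 3306 \
    --conn-login <user> \
    --conn-password <password> \
    --conn-schema <database>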

03_mysql_to_s3.py

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.models.variable import Variable
from datetime import datetime
import os
from pandas import DataFrame

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="03_mysql_to_s3",
    start_date=pendulum.datetime(2022, 11, 1, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    tags=["s3"],
    default_args=default_args
) as dag:
    
    def _get_data(ti):
        
        hook = MySqlHook(mysql_conn_id="mysql-air")
  
        level = "ดีมาก"  # "very good" air-quality level, as stored in the database
        
        sql = f"""
        SELECT pm25, pm10, o3, co, aqi, d_update, `level`, color, updated 
        FROM `data`
        WHERE level='{level}';
        """
        
        df: DataFrame = hook.get_pandas_df(sql=sql)
        
        now = datetime.now()
        str_export_path = now.strftime("%Y%m%d")
        
        export_path = f"./export/{str_export_path}"
        
        ti.xcom_push(key="export_path", value=export_path)
        
        os.makedirs(export_path, exist_ok=True)
        
        csv_path = f"{export_path}/air.csv"
        
        df.to_csv(csv_path, index=False)
    
    def _create_zip(ti):
        
        from zipfile import ZipFile
        from os.path import basename
        
        now = datetime.now()
        str_time = now.strftime("%Y%m%d%H%M%S")
        
        export_path = ti.xcom_pull(task_ids="get_data", key="export_path")

        csv_path = f"{export_path}/air.csv"
        
        zip_path = f"./files/satit-air-{str_time}.zip"
        
        ti.xcom_push(key="zip_file_path", value=zip_path)
        
        with ZipFile(zip_path, 'w') as myzip:
            myzip.write(csv_path, basename(csv_path))
    
    def _upload_to_s3(ti):
        from minio import Minio
        
        endpoint = Variable.get("minio_endpoint", "203.157.102.60:39000")
        access_key = Variable.get("minio_accesskey")
        secret_key = Variable.get("minio_secretkey")
        
        client = Minio(endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=False)
        
        zip_path = ti.xcom_pull(task_ids="create_zip", key="zip_file_path")
        # zip_path = f"./files/satit-air-20221101041420.zip"
        
        client.fput_object(bucket_name="airflow", object_name=os.path.basename(zip_path), file_path=zip_path)

    start = EmptyOperator(task_id="start")
    
    get_data = PythonOperator(task_id="get_data", python_callable=_get_data)
    create_zip = PythonOperator(task_id="create_zip", python_callable=_create_zip)
    upload_to_s3 = PythonOperator(task_id="upload_to_s3", python_callable=_upload_to_s3)
    
start >> get_data >> create_zip >> upload_to_s3
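_upload_to_s3 reads the MinIO endpoint and credentials from Airflow Variables. They can be created in the UI (Admin > Variables) or from the CLI; the access and secret keys below are placeholders:

airflow variables set minio_endpoint 203.157.102.60:39000
airflow variables set minio_accesskey <your-access-key>
airflow variables set minio_secretkey <your-secret-key>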
  

03_mysql_to_s3.py (task groups)

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.models.variable import Variable
from datetime import datetime
import os
from pandas import DataFrame

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    "owner": "satit"
}

with DAG(
    dag_id="03_mysql_to_s3",
    start_date=pendulum.datetime(2022, 11, 1, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    tags=["s3"],
    default_args=default_args
) as dag:
    
    def _get_data1(ti):
        hook = MySqlHook(mysql_conn_id="mysql-air")
        
        sql = f"""
        SELECT pm25, pm10, o3, co, aqi, d_update, `level`, color, updated 
        FROM `data`;
        """
        
        df: DataFrame = hook.get_pandas_df(sql=sql)
    
        export_path = ti.xcom_pull(task_ids="start", key="export_path")
        
        os.makedirs(export_path, exist_ok=True)
        
        csv_path = f"{export_path}/data1.csv"
        
        df.to_csv(csv_path, index=False)
    
    def _get_data2(ti):
        
        hook = MySqlHook(mysql_conn_id="mysql-air")
  
        level = "ดีมาก"  # "very good" air-quality level, as stored in the database
        
        sql = f"""
        SELECT pm25, pm10, o3, co, aqi, d_update, `level`, color, updated 
        FROM `data`
        WHERE level='{level}';
        """
        
        df: DataFrame = hook.get_pandas_df(sql=sql)
    
        export_path = ti.xcom_pull(task_ids="start", key="export_path")
        
        os.makedirs(export_path, exist_ok=True)
        
        csv_path = f"{export_path}/data2.csv"
        
        df.to_csv(csv_path, index=False)
    
    def _create_zip(ti):
        
        from zipfile import ZipFile
        from os.path import abspath, basename
        
        now = datetime.now()
        str_time = now.strftime("%Y%m%d%H%M%S")
        
        export_path = ti.xcom_pull(task_ids="start", key="export_path")
        
        zip_path = f"./files/satit-air-{str_time}.zip"
        
        print(zip_path)
        
        ti.xcom_push(key="zip_file_path", value=zip_path)
        
        files = []
        
        for file in os.listdir(export_path):
            if (file.endswith(".csv")):
                csv_file = os.path.join(export_path, basename(file))
                files.append(csv_file)
        
        with ZipFile(zip_path, 'w') as myzip:
            for f in files:
                print(f)
                myzip.write(f, basename(f))
    
    def _upload_to_s3(ti):
        from minio import Minio
        
        endpoint = Variable.get("minio_endpoint", "203.157.102.60:39000")
        access_key = Variable.get("minio_accesskey")
        secret_key = Variable.get("minio_secretkey")
        
        client = Minio(endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=False)
        
        zip_path = ti.xcom_pull(task_ids="create_zip", key="zip_file_path")
        # zip_path = f"./files/satit-air-20221101041420.zip"
        
        client.fput_object(bucket_name="airflow", object_name=os.path.basename(zip_path), file_path=zip_path)

    def _start(ti):
        now = datetime.now()
        str_time = now.strftime("%Y%m%d%H%M%S")
        export_path = os.path.join("export", str_time)
        
        ti.xcom_push(key="export_path", value=export_path)
        
    start = PythonOperator(task_id="start", python_callable=_start)
    
    with TaskGroup(group_id='export_data') as group1:
        get_data1 = PythonOperator(task_id='get_data1', python_callable=_get_data1)
        get_data2 = PythonOperator(task_id='get_data2', python_callable=_get_data2)
        
        [get_data1, get_data2]
      
    create_zip = PythonOperator(task_id="create_zip", python_callable=_create_zip)
    upload_to_s3 = PythonOperator(task_id="upload_to_s3", python_callable=_upload_to_s3)
    
    success = EmptyOperator(task_id="success")
    failed = EmptyOperator(task_id="failed", trigger_rule=TriggerRule.ONE_FAILED)
    
start >> group1 >> create_zip >> upload_to_s3 >> [success, failed]
create_zip >> failed
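The export path is handed between tasks with an explicit xcom_push/xcom_pull pair keyed on export_path. An equivalent, slightly shorter pattern (a sketch, not the version used above) is to return the value from the callable, since return values are stored under the default return_value key and can be pulled without naming a key:

import os
from datetime import datetime

def _start():
    # the return value is stored as an XCom under the default "return_value" key
    return os.path.join("export", datetime.now().strftime("%Y%m%d%H%M%S"))

def _create_zip(ti):
    # pulls the return value of the "start" task; no explicit key needed
    export_path = ti.xcom_pull(task_ids="start")
    print(export_path)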
  

05_s3_to_mysql.py

from datetime import timedelta, datetime
import os
from pathlib import Path
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.models.variable import Variable
from airflow.providers.mysql.hooks.mysql import MySqlHook

import pendulum
import requests

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    "owner": "satit"
}

with DAG(
    dag_id="05_s3_to_mysql",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule=None,
    catchup=True,
    tags=["s3"],
    default_args=default_args
) as dag:
  
    def _start(ti):
        now = datetime.now()
        str_date = now.strftime("%Y%m%d")
        download_path = os.path.join("./tmp/download", str_date)
        
        ti.xcom_push(key="download_path", value=download_path)
        
        os.makedirs(download_path, exist_ok=True)
    
    def _download(ti):
        
        from minio import Minio
        
        download_path = ti.xcom_pull(task_ids="start", key="download_path")
        
        endpoint = Variable.get("minio_endpoint", "203.157.102.60:39000")
        access_key = Variable.get("minio_accesskey")
        secret_key = Variable.get("minio_secretkey")
        
        client = Minio(endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=False)

        buckets = client.list_objects(bucket_name="airflow")
        
        for bucket in buckets:
            file_name = bucket.object_name
            
            file_path = os.path.join(download_path, file_name)
            
            client.fget_object(bucket_name="airflow", object_name=file_name, file_path=file_path)
    
        
    def _extract(ti):
        
        from zipfile import ZipFile
        import string
        import random
        
        download_path = ti.xcom_pull(task_ids="start", key="download_path")
        extract_path = os.path.join("./tmp", "extract")
        
        ti.xcom_push(key="extract_path", value=extract_path)
        
        for file in os.listdir(download_path):
            if file.endswith(".zip") and file.startswith("gw_"):
                zip_file = os.path.join(download_path, os.path.basename(file))
                # print(zip_file)
                with ZipFile(zip_file, 'r') as myzip:
                    letters = string.ascii_lowercase
                    str_random = ''.join(random.choice(letters) for i in range(10))
                    tmp_path = os.path.join(extract_path, str_random)
                    myzip.extractall(tmp_path)
        
    def _load(ti):
        
        hook = MySqlHook(mysql_conn_id="mysql-air")
        
        extract_path = ti.xcom_pull(task_ids="extract", key="extract_path")
        
        csv_files = list(Path(extract_path).rglob("*.csv"))
        
        for file in csv_files:
            file_name = os.path.basename(file).lower()
            
            csv_file = os.path.abspath(file)
            # print(csv_file)

            if (file_name == "person.csv"):
                hook.bulk_load_custom(table="person", tmp_file=csv_file, duplicate_key_handling="REPLACE", extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '' LINES TERMINATED BY '\n' IGNORE 1 LINES")
            elif(file_name == "visit.csv"):
                hook.bulk_load_custom(table="visit", tmp_file=csv_file, duplicate_key_handling="REPLACE", extra_options="FIELDS TERMINATED BY ',' ENCLOSED BY '' LINES TERMINATED BY '\n' IGNORE 1 LINES")
            else:
                print("File not found.")
                
    def _send_line_notify():
        url = 'https://notify-api.line.me/api/notify'
        token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
        headers = {'content-type':'application/x-www-form-urlencoded','Authorization':'Bearer '+token}

        date = datetime.today() - timedelta(days=1)
        processDate = date.strftime("%Y-%m-%d")

        # message: "Processing for %s completed successfully"
        msg = 'ประมวลผลวันที่ %s เสร็จเรียบร้อย' % (processDate)
        r = requests.post(url, headers=headers, data = {'message':msg})    
        
        
    start = PythonOperator(task_id="start", python_callable=_start)
    end = PythonOperator(task_id="send_line_notify", python_callable=_send_line_notify)
    download = PythonOperator(task_id="download", python_callable=_download)
    extract = PythonOperator(task_id="extract", python_callable=_extract) 
    load = PythonOperator(task_id="load", python_callable=_load)
  
  
start >> download >> extract >> load >> end
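bulk_load_custom issues LOAD DATA LOCAL INFILE under the hood, so the load only works when local INFILE is allowed on both the MySQL server (local_infile=1) and the Airflow connection. One way to enable it on the connection side, assuming the mysql-air connection is created via the CLI, is the extra field:

airflow connections add mysql-air --conn-type mysql --conn-host <host> --conn-login <user> --conn-password <password> --conn-schema <database> --conn-extra '{"local_infile": true}'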

04_jhcis_to_s3.py

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.models.variable import Variable
from datetime import datetime
import os
from pandas import DataFrame

import pendulum

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    "owner": "satit",
    "pool": "my_pool",
}

with DAG(
    dag_id="04_jhcis_to_s3",
    start_date=pendulum.datetime(2022, 11, 1, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    tags=["s3"],
    default_args=default_args
) as dag:
    
    def _get_visit(ti):
        hook = MySqlHook(mysql_conn_id="mysql-jhcis")
        # now = datetime.now()
        
        visit_date = "2018-06-05" # now.strftime("%Y-%m-%d")
        
        sql = f"""
        SELECT pcucode, visitno, visitdate, pid, rightcode, weight, height, pressure, waist 
        from visit 
        where visitdate = date('{visit_date}')
        """
        
        print(sql)
        
        df: DataFrame = hook.get_pandas_df(sql=sql)
        vn = df["visitno"].to_list()
        hn = df["pid"].to_list()
        
        ti.xcom_push(key="vn", value=vn)
        ti.xcom_push(key="hn", value=hn)

        export_path = ti.xcom_pull(task_ids="start", key="export_path")
        csv_path = f"{export_path}/visit.csv"
        
        df.to_csv(csv_path, index=False)
    
    def _get_person(ti):
        
        hook = MySqlHook(mysql_conn_id="mysql-jhcis")
        
        pid = ti.xcom_pull(task_ids="get_visit", key="hn")
        
        # build a tuple of pids for the SQL IN clause
        pid_str = tuple(pid)
        
        try:
            
            sql = f"""
            select pcucodeperson as hospcode, pid, prename, fname, lname, birth, sex 
            from person
            where pid in {pid_str};
            """
            
            print(sql)
            
            df: DataFrame = hook.get_pandas_df(sql=sql)
        
            export_path = ti.xcom_pull(task_ids="start", key="export_path")
            csv_path = f"{export_path}/person.csv"
            
            df.to_csv(csv_path, index=False)
        except Exception as e:
            # "Unable to export PERSON data"
            raise Exception("ไม่สามารถส่งออกข้อมูล PERSON ได้") from e
    
    def _create_zip(ti):
        
        from zipfile import ZipFile
        from os.path import abspath, basename
        
        now = datetime.now()
        str_time = now.strftime("%Y%m%d%H%M%S")
        
        export_path = ti.xcom_pull(task_ids="start", key="export_path")
        
        zip_path = f"./files/gw_{str_time}.zip"
                
        ti.xcom_push(key="zip_file_path", value=zip_path)
        
        files = []
        
        for file in os.listdir(export_path):
            if (file.endswith(".csv")):
                csv_file = os.path.join(export_path, basename(file))
                files.append(csv_file)
        
        with ZipFile(zip_path, 'w') as myzip:
            for f in files:
                print(f)
                myzip.write(f, basename(f))
    
    def _upload_to_s3(ti):
        from minio import Minio
        
        endpoint = Variable.get("minio_endpoint", "203.157.102.60:39000")
        access_key = Variable.get("minio_accesskey")
        secret_key = Variable.get("minio_secretkey")
        
        client = Minio(endpoint=endpoint, access_key=access_key, secret_key=secret_key, secure=False)
        
        zip_path = ti.xcom_pull(task_ids="create_zip", key="zip_file_path")
        # zip_path = f"./files/satit-air-20221101041420.zip"
        
        print(zip_path)
        
        client.fput_object(bucket_name="airflow", object_name=os.path.basename(zip_path), file_path=zip_path)

    def _start(ti):
        now = datetime.now()
        str_time = now.strftime("%Y%m%d%H%M%S")
        export_path = os.path.join("export", str_time)
        
        ti.xcom_push(key="export_path", value=export_path)
        
        os.makedirs(export_path, exist_ok=True)
        
    start = PythonOperator(task_id="start", python_callable=_start)
    get_visit = PythonOperator(task_id='get_visit', python_callable=_get_visit, pool="my_pool", pool_slots=1)
    
    with TaskGroup(group_id='export_data') as group1:
        get_person = PythonOperator(task_id='get_person', python_callable=_get_person, pool="my_pool", pool_slots=1)
        get_diag = EmptyOperator(task_id="get_diag")
        
        [get_person, get_diag]
      
    create_zip = PythonOperator(task_id="create_zip", python_callable=_create_zip)
    upload_to_s3 = PythonOperator(task_id="upload_to_s3", python_callable=_upload_to_s3)
    
    success = EmptyOperator(task_id="success")
    failed = EmptyOperator(task_id="failed", trigger_rule=TriggerRule.ONE_FAILED)
    
start >> get_visit >> group1 >> create_zip >> upload_to_s3 >> [success, failed]
create_zip >> failed
  


Custom operator

his_gateway/demo_operator.py:

from airflow.models.baseoperator import BaseOperator

class DemoOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message
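For the import in the DAG file below to resolve, his_gateway has to be importable from the dags folder, e.g. as a small package next to the DAG files (an assumed layout; the empty __init__.py marks it as a package):

dags/
├── his_gateway/
│   ├── __init__.py
│   └── demo_operator.py
└── 06_demo_operator.py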

06_demo_operator.py:

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from his_gateway.demo_operator import DemoOperator

import pendulum

# import sys
# sys.path.insert(0, './his_gateway')

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    "owner": "satit"
}

with DAG(
    dag_id="06_demo_operator",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule=None,
    catchup=True,
    tags=["tutorial"],
    default_args=default_args
) as dag:
  
  start = EmptyOperator(task_id="start")
  demo = DemoOperator(task_id="demo", name="Satit Rianpit")
  
start >> demo


Python: send a LINE Notify message

import requests
from datetime import datetime, timedelta

def send_line_notify():
  url = 'https://notify-api.line.me/api/notify'
  token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
  headers = {'content-type': 'application/x-www-form-urlencoded', 'Authorization': 'Bearer ' + token}

  date = datetime.today() - timedelta(days=1)
  processDate = date.strftime("%Y-%m-%d")

  # message: "Processing for %s completed successfully"
  msg = 'ประมวลผลวันที่ %s เสร็จเรียบร้อย' % (processDate)
  r = requests.post(url, headers=headers, data={'message': msg})
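Besides running it as the final task (as in 05_s3_to_mysql above), the same function can be attached as a DAG-level success callback so it fires once per successful run; a minimal sketch with a hypothetical dag_id:

import pendulum
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="07_notify_on_success",  # hypothetical, for illustration only
    start_date=pendulum.datetime(2022, 11, 1, tz="Asia/Bangkok"),
    schedule=None,
    catchup=False,
    on_success_callback=lambda context: send_line_notify(),
) as dag:
    EmptyOperator(task_id="do_work")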
