Skip to content

Instantly share code, notes, and snippets.

@siteslave
Last active November 2, 2022 04:40
Show Gist options
  • Save siteslave/bddff767151d85bf46039c977cbbd44c to your computer and use it in GitHub Desktop.
Save siteslave/bddff767151d85bf46039c977cbbd44c to your computer and use it in GitHub Desktop.
Airflow Tutorial@MOPH

Ceate directories:

mkdir -p ./dags ./logs ./plugins

Create .env

echo -e "AIRFLOW_UID=$(id -u)" > .env

docker-compose.yaml

---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: mysql+mysqldb://airflow:[email protected]:3306/airflow_db
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Asia/Bangkok'
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    ARIFLOW__CORE__LOAD_DEFAULT_CONNECTIONS: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  extra_hosts: 
    - "host.docker.internal:host-gateway"
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"

services:
  
  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

@siteslave
Copy link
Author

customize operator

his_gateway/demo_operator.py:

from airflow.models.baseoperator import BaseOperator

class DemoOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

06_demo_operator.py:

from datetime import timedelta
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from his_gateway.demo_operator import DemoOperator

import pendulum

# import sys
# sys.path.insert(0, './his_gateway')

default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    "owner": "satit"
}

with DAG(
    dag_id="06_demo_operator",
    start_date=pendulum.datetime(2022, 10, 26, tz="Asia/Bangkok"),
    schedule=None,
    catchup=True,
    tags=["tutorial"],
    default_args=default_args
) as dag:
  
  start = EmptyOperator(task_id="start")
  demo = DemoOperator(task_id="demo", name="Satit Rianpit")
  
start >> demo

@siteslave
Copy link
Author

Python send line notify

def send_line_notify():
  url = 'https://notify-api.line.me/api/notify'
  token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
  headers = {'content-type':'application/x-www-form-urlencoded','Authorization':'Bearer '+token}

  date = datetime.today() - timedelta(days=1)
  processDate = date.strftime("%Y-%m-%d")

  msg = 'ประมวลผลวันที่ %s เสร็จเรียบร้อย' % (processDate)
  r = requests.post(url, headers=headers, data = {'message':msg})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment