Skip to content

Instantly share code, notes, and snippets.

@audhiaprilliant
Created December 13, 2020 09:17
Show Gist options
  • Select an option

  • Save audhiaprilliant/8c51e0f6cfb33166018314aab0db9ca7 to your computer and use it in GitHub Desktop.

Select an option

Save audhiaprilliant/8c51e0f6cfb33166018314aab0db9ca7 to your computer and use it in GitHub Desktop.
Apache Airflow as a Job Orchestration Tool
# Modules for airflow
from airflow import DAG
from datetime import timedelta, datetime
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
# Modules for web scraping
import requests
from bs4 import BeautifulSoup
# Module for data manipulation
import pandas as pd
# Module for regular expression
import re
# Module for file management
import os
# Module for timing
from datetime import datetime
# Module for reading JSON file
import json
# Setting for datetime and directory
dir_path = '/home/audhi/airflow'
current_date = pd.to_datetime('today').strftime('%Y-%m-%d')

# Map Indonesian month names to zero-padded month numbers (used when
# normalising the scraped date into YYYY/MM/DD)
dict_month = {'Januari':'01','Februari':'02','Maret':'03','April':'04','Mei':'05','Juni':'06','Juli':'07',
'Agustus':'08','September':'09','Oktober':'10','November':'11','Desember':'12'}

# Read the Telegram bot configuration (JSON) as a dictionary.
# BUG FIX: renamed from `telegram_bot` -- that name is reused further down
# for the task callable `def telegram_bot(...)`, which silently shadowed
# this dictionary at module level.
with open(dir_path + '/config/telegram.json') as tele_data:
    telegram_config = json.load(tele_data)
telegram_chatid = telegram_config['result'][0]['message']['chat']['id']
# SECURITY NOTE(review): hard-coded bot token checked into source; this
# should live in the JSON config (or an Airflow Connection/Variable) instead
telegram_token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
def get_url(timeout=30):
    """Fetch the Kompas COVID-19 page and return it parsed with BeautifulSoup.

    Args:
        timeout: seconds to wait for the HTTP response. ROBUSTNESS FIX:
            the original request had no timeout, so a stalled connection
            would block the Airflow worker indefinitely.

    Returns:
        bs4.BeautifulSoup: the parsed HTML document.
    """
    url = 'https://www.kompas.com/covid-19'
    page = requests.get(url, timeout=timeout)
    # Wrangling HTML with BeautifulSoup
    soup = BeautifulSoup(page.content, 'html.parser')
    return soup
def get_current_date(**kwargs):
    """Scrape the 'last updated' stamp from the Kompas COVID-19 page.

    Returns:
        tuple[str, str]: (date, time) where date is formatted 'YYYY/MM/DD'
        and time is the scraped clock string (e.g. '16.00 WIB').
    """
    # BUG FIX: the original read a module-global `soup` that was never
    # defined anywhere at module level (NameError at runtime). Fetch and
    # parse the page locally instead.
    soup = get_url()
    date_scrape = soup.find('span', class_='covid__date').text
    # e.g. 'Update terakhir: 13 Desember 2020, 16.00 WIB' -> '13 Desember 2020,16.00 WIB'
    date_scrape = re.findall(r'Update terakhir: (\S+.+WIB)', date_scrape)[0].replace(', ', ',')
    date = date_scrape.split(',')[0]
    time = date_scrape.split(',')[1]
    # Tokens of the date part: day, Indonesian month name, year
    day_part = re.findall(r'\w+', date)[0]
    month_part = re.findall(r'\w+', date)[1]
    year_part = re.findall(r'\w+', date)[2]
    # Zero-pad single-digit days (replaces the original if/else whose
    # else-branch was a no-op)
    day_part = day_part.zfill(2)
    # dict_month translates the Indonesian month name to its number
    date = year_part + '/' + dict_month.get(month_part) + '/' + day_part
    return (date, time)
def get_daily_summary(**kwargs):
    """Scrape the national daily COVID-19 summary and return it as a CSV row.

    Returns:
        str: 'date,time,confirmed,treated,deaths,recovered' built from the
        figures in the page's summary boxes.
    """
    soup = get_url()
    date, time = get_current_date()
    # First numeric token in a box (digits possibly followed by thousand
    # separators, e.g. '617,820')
    number_pattern = re.compile(r'\d[^\s]+')

    def first_number(elem):
        # Extract the first numeric token from a box and strip the commas
        return number_pattern.findall(elem.text)[0].replace(',', '')

    for box in soup.find_all('div', class_='covid__box'):
        # Each box is itself a BeautifulSoup element holding the four figures
        confirmed = first_number(box.find('div', class_='covid__box2 -cases'))
        treated = first_number(box.find('div', class_='covid__box2 -odp'))
        deaths = first_number(box.find('div', class_='covid__box2 -gone'))
        recovered = first_number(box.find('div', class_='covid__box2 -health'))
    return ','.join([date, time, confirmed, treated, deaths, recovered])
def get_daily_summary_provinces(**kwargs):
    """Scrape the per-province COVID-19 figures into a pandas DataFrame.

    Returns:
        pd.DataFrame: one row per province with columns date, time,
        provinces, confirmed, deaths, recovered.
    """
    soup = get_url()
    date, time = get_current_date()
    # First run of digits in each cell
    digits = re.compile(r'\d+')
    records = []
    for row in soup.find_all('div', class_='covid__row'):
        records.append({
            'date': date,
            'time': time,
            'provinces': row.find('div', class_='covid__prov').text,
            'confirmed': digits.findall(row.find('span', class_='-odp').text)[0],
            'deaths': digits.findall(row.find('span', class_='-gone').text)[0],
            'recovered': digits.findall(row.find('span', class_='-health').text)[0],
        })
    # Fix the column order explicitly so it matches the CSV on disk
    return pd.DataFrame(records, columns=['date', 'time', 'provinces',
                                          'confirmed', 'deaths', 'recovered'])
def summary_save_txt(**context):
    """Append the scraped national summary row to the txt log if it is new.

    Pulls the CSV row produced by the 'summary_scraping_data' task via
    XCom and appends it to summary_covid19.txt unless the file already
    ends with the same row (i.e. the site has not published new data).

    Returns:
        tuple: two-element notification ('Last update:' or
        'Up to date data:', date string) pushed to XCom for downstream use.
    """
    value = context['task_instance'].xcom_pull(task_ids='summary_scraping_data')
    path = dir_path + '/data/covid19/summary_covid19.txt'
    # ROBUSTNESS FIX: tolerate a missing or empty file instead of crashing
    # with FileNotFoundError / IndexError on the first ever run
    try:
        with open(path, 'r') as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        lines = []
    last_line = lines[-1] if lines else None
    if last_line == value:
        # Site data unchanged since the previous run
        notif = 'Last update:', re.findall(r'^(.+?),', last_line)[0]
    else:
        # New data: append one line. BUG FIX: the original called
        # ff.close() inside the `with` block, which is redundant --
        # the context manager already closes the file.
        with open(path, 'a+') as f:
            f.write('{}\n'.format(value))
        notif = 'Up to date data:', re.findall(r'^(.+?),', value)[0]
    return notif
def provinces_save_csv(**context):
    """Append the per-province DataFrame to the daily CSV if it is new.

    Pulls the DataFrame produced by the 'provinces_scraping_data' task via
    XCom and appends it to daily_update_covid.csv unless rows for the same
    scrape date are already the last ones in the file.

    Returns:
        tuple: two-element notification ('Last update:' or
        'Up to date data:', date string) pushed to XCom for downstream use.
    """
    value = context['task_instance'].xcom_pull(task_ids='provinces_scraping_data')
    path = dir_path + '/data/covid19/daily_update_covid.csv'
    scraped_date = value['date'].unique().tolist()[0]
    # ROBUSTNESS FIX: tolerate a missing or empty file instead of crashing
    # with FileNotFoundError / IndexError on the first ever run
    try:
        with open(path, 'r') as f:
            lines = f.read().splitlines()
    except FileNotFoundError:
        lines = []
    last_line = lines[-1] if lines else ''
    last_date = re.findall(r'^(.+?),', last_line)
    if last_date and last_date[0] == scraped_date:
        # Rows for this date are already on disk
        notif = 'Last update:', last_date[0]
    else:
        # BUG FIX: removed the redundant ff.close() the original issued
        # inside the `with` block -- the context manager closes the file.
        with open(path, 'a') as f:
            value.to_csv(f, header=False, index=False)
        notif = 'Up to date data:', scraped_date
    return notif
def telegram_bot(**context):
    """Send the daily COVID-19 summary to a Telegram chat.

    Builds a Markdown message from the XCom results of the scraping and
    saving tasks and posts it through the Telegram Bot API sendMessage
    endpoint.
    """
    value_daily = list(context['task_instance'].xcom_pull(task_ids = 'summary_scraping_data').split(','))
    value_date = context['task_instance'].xcom_pull(task_ids = 'provinces_save_data')
    # The save task returns a ('Up to date data:', date) style tuple;
    # render it as a single space-joined string
    value_render = ' '.join([str(elem) for elem in list(value_date)])
    message = '''
COVID-19 DATA - {update_date} {time}
Kompas News
Confirmed: {confirmed}
Treated: {treated}
Deaths: {deaths}
Recovered: {recover}
Scraped on: {scraped_date} WIB
Developed by Audhi Aprilliant
'''.format(update_date = value_render,time = value_daily[1],confirmed = value_daily[2],treated = value_daily[3],\
deaths = value_daily[4],recover = value_daily[5],scraped_date = pd.to_datetime('today').strftime('%m-%d-%Y %H:%M:%S'))
    # BUG FIX: the original concatenated the raw message into the URL with
    # no percent-encoding, so newlines, '&', '#' or '+' in the text would
    # corrupt the query string. Let `requests` encode the parameters.
    api_url = 'https://api.telegram.org/bot' + telegram_token + '/sendMessage'
    response = requests.get(
        api_url,
        params={'chat_id': str(telegram_chatid),
                'parse_mode': 'Markdown',
                'text': message},
        timeout=30,
    )
    # Surface delivery failures to Airflow instead of silently succeeding
    response.raise_for_status()
# Default arguments inherited by every task in this DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 20),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
}

# Run the pipeline once a day at 09:30 UTC
# (cron fields: minute hour day-of-month month day-of-week)
schedule_interval = '30 09 * * *'

# The DAG object all tasks below attach to
dag = DAG(
    dag_id='scraping_data_covid19',
    default_args=default_args,
    schedule_interval=schedule_interval,
)
# Marker task so the start of a run is easy to spot in the logs
task_start = BashOperator(
    task_id='start_task',
    bash_command='echo start',
    dag=dag,
)

# Task 1: scrape the national daily summary (pushed to XCom as a CSV row)
summary_scraping = PythonOperator(
    task_id='summary_scraping_data',
    python_callable=get_daily_summary,
    dag=dag,
)

# Task 2: persist the daily summary row to the txt log
summary_save = PythonOperator(
    task_id='summary_save_data',
    python_callable=summary_save_txt,
    provide_context=True,
    dag=dag,
)

# Task 3: scrape the per-province figures (pushed to XCom as a DataFrame)
provinces_scraping = PythonOperator(
    task_id='provinces_scraping_data',
    python_callable=get_daily_summary_provinces,
    dag=dag,
)

# Task 4: persist the per-province figures to the daily CSV
provinces_save = PythonOperator(
    task_id='provinces_save_data',
    python_callable=provinces_save_csv,
    provide_context=True,
    dag=dag,
)

# Task 5: send a notification email
send_email = EmailOperator(
    task_id='send_email',
    to=['[email protected]'],
    subject='Covid19 data ',
    html_content='''
{date} Covid19 data has been scraped!
'''.format(date = current_date),
    dag=dag,
)

# Task 6: send a notification message via Telegram
send_telegram = PythonOperator(
    task_id='send_telegram',
    python_callable=telegram_bot,
    provide_context=True,
    dag=dag,
)

# Marker task closing the run
finish_start = BashOperator(
    task_id='finish_task',
    bash_command='echo finish',
    dag=dag,
)

# Wire the pipeline as one linear chain
task_start >> summary_scraping >> summary_save >> provinces_scraping \
    >> provinces_save >> send_email >> send_telegram >> finish_start
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment