Apache Airflow as a Job Orchestrator
# Modules for Airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
# Modules for web scraping
import requests
from bs4 import BeautifulSoup
# Module for data manipulation
import pandas as pd
# Module for regular expressions
import re
# Module for datetime handling
from datetime import datetime, timedelta
# Module for reading JSON files
import json

# Settings for the working directory and current date
dir_path = '/home/audhi/airflow'
current_date = pd.to_datetime('today').strftime('%Y-%m-%d')

# Mapping of Indonesian month names to zero-padded month numbers
dict_month = {'Januari':'01','Februari':'02','Maret':'03','April':'04','Mei':'05','Juni':'06','Juli':'07',
              'Agustus':'08','September':'09','Oktober':'10','November':'11','Desember':'12'}
# Read the Telegram bot configuration (JSON) as a dictionary
with open(dir_path+'/config/telegram.json') as tele_data:
    telegram_config = json.load(tele_data)
telegram_chatid = telegram_config['result'][0]['message']['chat']['id']
telegram_token = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
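
# Note: telegram.json is assumed here to be the saved response of the bot's
# getUpdates call (https://api.telegram.org/bot<token>/getUpdates), which is
# why the chat id sits under ['result'][0]['message']['chat']['id'].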

def get_url():
    # Request the Kompas COVID-19 page
    url = 'https://www.kompas.com/covid-19'
    page = requests.get(url)
    # Parse the HTML with BeautifulSoup
    soup = BeautifulSoup(page.content,'html.parser')
    return soup

def get_current_date(**kwargs):
    soup = get_url()
    # Scrape the 'last updated' stamp
    date_scrape = soup.find('span',class_='covid__date').text
    date_scrape = re.findall(r'Update terakhir: (\S+.+WIB)',date_scrape)[0].replace(', ',',')
    date = date_scrape.split(',')[0]
    time = date_scrape.split(',')[1]
    # Date manipulation
    date_format = re.findall(r'\w+',date)[0]
    month_format = re.findall(r'\w+',date)[1]
    year_format = re.findall(r'\w+',date)[2]
    # Left-pad single-digit days with a zero
    date_format = date_format.zfill(2)
    # New date format: yyyy/mm/dd
    date = year_format+'/'+dict_month.get(month_format)+'/'+date_format
    return date, time
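
# For example, a scraped header like 'Update terakhir: 13 Desember 2020, 16.00 WIB'
# (a hypothetical sample) yields date = '2020/12/13' and time = '16.00 WIB'.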

def get_daily_summary(**kwargs):
    soup = get_url()
    date, time = get_current_date()
    # Get the national summary
    # Regular expression pattern for the figures
    pattern_summary = re.compile(r'\d[^\s]+')
    for job_elem in soup.find_all('div',class_='covid__box'):
        # Each job_elem is a new BeautifulSoup object
        terkonfirmasi_elem = job_elem.find('div',class_='covid__box2 -cases')
        dirawat_elem = job_elem.find('div',class_='covid__box2 -odp')
        meninggal_elem = job_elem.find('div',class_='covid__box2 -gone')
        sembuh_elem = job_elem.find('div',class_='covid__box2 -health')
        # Daily figures: confirmed, treated, deaths, recovered
        a = pattern_summary.findall(terkonfirmasi_elem.text)[0].replace(',','')
        b = pattern_summary.findall(dirawat_elem.text)[0].replace(',','')
        c = pattern_summary.findall(meninggal_elem.text)[0].replace(',','')
        d = pattern_summary.findall(sembuh_elem.text)[0].replace(',','')
    daily_update = ','.join([date,time,a,b,c,d])
    return daily_update
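
# The returned string is one CSV row: date,time,confirmed,treated,deaths,recovered.
# PythonOperator pushes a callable's return value to XCom automatically, so the
# downstream summary_save_txt and telegram_bot tasks can pull it by task id.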

def get_daily_summary_provinces(**kwargs):
    soup = get_url()
    date, time = get_current_date()
    # Get the per-province summary
    # Regular expression pattern for the figures
    pattern_prov = re.compile(r'\d+')
    provinsi = []
    terkonfirmasi_prov = []
    meninggal_prov = []
    sembuh_prov = []
    for elem in soup.find_all('div',class_='covid__row'):
        provinsi_elem = elem.find('div',class_='covid__prov')
        terkonfirmasi_elem = elem.find('span',class_='-odp')
        meninggal_elem = elem.find('span',class_='-gone')
        sembuh_elem = elem.find('span',class_='-health')
        # Append to the lists
        provinsi.append(provinsi_elem.text)
        terkonfirmasi_prov.append(pattern_prov.findall(terkonfirmasi_elem.text)[0])
        meninggal_prov.append(pattern_prov.findall(meninggal_elem.text)[0])
        sembuh_prov.append(pattern_prov.findall(sembuh_elem.text)[0])
    # Create the dataframe
    dic_data = {'date':[date]*len(provinsi),'time':[time]*len(provinsi),'provinces':provinsi,
                'confirmed':terkonfirmasi_prov,'deaths':meninggal_prov,'recovered':sembuh_prov}
    df = pd.DataFrame(data = dic_data)
    return df
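
# Unlike the summary task, this one returns a pandas DataFrame; passing it
# through XCom relies on XCom pickling, which Airflow 1.x enables by default
# (the enable_xcom_pickling setting in airflow.cfg).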

def summary_save_txt(**context):
    value = context['task_instance'].xcom_pull(task_ids = 'summary_scraping_data')
    with open(dir_path+'/data/covid19/summary_covid19.txt','r') as f:
        lines = f.read().splitlines()
        last_line = lines[-1]
    # Only append when the scraped row is new
    if last_line == value:
        notif = 'Last update:', re.findall(r'^(.+?),',last_line)[0]
    else:
        with open(dir_path+'/data/covid19/summary_covid19.txt','a+') as ff:
            ff.write('{}\n'.format(value))
        notif = 'Up to date data:', re.findall(r'^(.+?),',value)[0]
    return notif

def provinces_save_csv(**context):
    value = context['task_instance'].xcom_pull(task_ids = 'provinces_scraping_data')
    with open(dir_path+'/data/covid19/daily_update_covid.csv','r') as f:
        lines = f.read().splitlines()
        last_line = lines[-1]
    # Only append when the scraped date is new
    if re.findall(r'^(.+?),',last_line)[0] == value['date'].unique().tolist()[0]:
        notif = 'Last update:', re.findall(r'^(.+?),',last_line)[0]
    else:
        with open(dir_path+'/data/covid19/daily_update_covid.csv','a') as ff:
            value.to_csv(ff,header=False,index=False)
        notif = 'Up to date data:', value['date'].unique().tolist()[0]
    return notif
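
# Both save functions return a ('label', date) tuple rather than a plain
# string; telegram_bot below joins the tuple elements into the message header.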

def telegram_bot(**context):
    value_daily = context['task_instance'].xcom_pull(task_ids = 'summary_scraping_data').split(',')
    value_date = context['task_instance'].xcom_pull(task_ids = 'provinces_save_data')
    value_render = ' '.join([str(elem) for elem in list(value_date)])
    message = '''
    COVID-19 DATA - {update_date} {time}
    Kompas News
    Confirmed: {confirmed}
    Treated: {treated}
    Deaths: {deaths}
    Recovered: {recover}
    Scraped on: {scraped_date} WIB
    Developed by Audhi Aprilliant
    '''.format(update_date = value_render, time = value_daily[1], confirmed = value_daily[2], treated = value_daily[3],
               deaths = value_daily[4], recover = value_daily[5],
               scraped_date = pd.to_datetime('today').strftime('%m-%d-%Y %H:%M:%S'))
    # Send the text message; passing the payload through params lets requests
    # URL-encode the multi-line message safely
    requests.get(
        'https://api.telegram.org/bot'+telegram_token+'/sendMessage',
        params = {'chat_id': str(telegram_chatid), 'parse_mode': 'Markdown', 'text': message}
    )

# Set default args
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 5, 20),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2)
}
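
# With these args a failed task is retried up to three times, two minutes
# apart, and a failure alert is emailed to the address above.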

# Set schedule: run the pipeline once a day.
# Use cron syntax to define the exact time (UTC). E.g. 8:15 AM would be '15 08 * * *'
schedule_interval = '30 09 * * *'
# Define DAG: set the ID and assign the default args and schedule interval
dag = DAG(
    dag_id = 'scraping_data_covid19',
    default_args = default_args,
    schedule_interval = schedule_interval
)
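
# Airflow triggers a run at the end of each schedule interval, so this DAG
# fires daily at 09:30 UTC and the first run starts only once the interval
# following start_date has elapsed.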

# Echo task start
task_start = BashOperator(
    task_id = 'start_task',
    bash_command = 'echo start',
    dag = dag
)
# Task 1: scrape the daily summary data
summary_scraping = PythonOperator(
    task_id = 'summary_scraping_data',
    python_callable = get_daily_summary,
    dag = dag
)
# Task 2: save the daily summary data
summary_save = PythonOperator(
    task_id = 'summary_save_data',
    python_callable = summary_save_txt,
    provide_context = True,
    dag = dag
)
# Task 3: scrape the daily provinces data
provinces_scraping = PythonOperator(
    task_id = 'provinces_scraping_data',
    python_callable = get_daily_summary_provinces,
    dag = dag
)
# Task 4: save the daily provinces data
provinces_save = PythonOperator(
    task_id = 'provinces_save_data',
    python_callable = provinces_save_csv,
    provide_context = True,
    dag = dag
)
# Task 5: send a notification email
send_email = EmailOperator(
    task_id = 'send_email',
    to = ['[email protected]'],
    subject = 'Covid19 data',
    html_content = '''
    {date} Covid19 data has been scraped!
    '''.format(date = current_date),
    dag = dag
)
# Task 6: send a Telegram notification
send_telegram = PythonOperator(
    task_id = 'send_telegram',
    python_callable = telegram_bot,
    provide_context = True,
    dag = dag
)
# Echo task finish
task_finish = BashOperator(
    task_id = 'finish_task',
    bash_command = 'echo finish',
    dag = dag
)
# Set up the dependencies
task_start >> summary_scraping >> summary_save >> provinces_scraping
provinces_scraping >> provinces_save >> send_email >> send_telegram >> task_finish
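
# To exercise a single task outside the scheduler (Airflow 1.x CLI), e.g.:
#   airflow test scraping_data_covid19 summary_scraping_data 2020-12-13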