Last active
October 29, 2018 04:01
-
-
Save ceshine/630a7836b8554633c1e0aaa462af577b to your computer and use it in GitHub Desktop.
Scripts to scrape and extract data from the Tourism Bureau of Taiwan
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# WARNING: this script is out-dated since the last update of the Tourism Bureau website. | |
from pathlib import Path | |
import pandas as pd | |
# Schema table: (first period the schema applies to, as YYYYMM; path to the
# schema CSV relative to the data directory). Ordered newest-first so the
# first entry whose start period is <= the requested period wins.
SCHEMAS = [
    (201201, "schema/residence-2012-01.csv"),
    (201101, "schema/residence-2011-01.csv"),
]
DATA_FILE_PATTERN = "raw_data/{year}-{month}.xls"
SHEET = "Sheet3"
OUTPUT_FILE_PATTERN = "residence-{year}-{month}.csv"
OUTPUT_COLUMNS = ["Residence", "Region", "Sub-Region", "Total", "Period"]


def get_schema(year: int, month: int, data_path) -> pd.DataFrame:
    """Return the residence schema in effect for the given year/month.

    Scans SCHEMAS (newest-first) for the first entry whose start period
    is <= ``year * 100 + month`` and loads its CSV from *data_path*.

    Raises:
        ValueError: if the period predates every known schema (the
            original index-walk raised an opaque IndexError here).
    """
    period = year * 100 + month
    for start, rel_path in SCHEMAS:
        if start <= period:
            # Diagnostic output, matching the original script's print.
            print(year, month, (start, rel_path))
            return pd.read_csv(Path(data_path) / rel_path)
    raise ValueError(f"no schema available for period {period}")
def extract_from_excel(year: int, month: int, data_path: str = "../data"):
    """Extract one month's visitors-by-residence table to a CSV file.

    Reads ``raw_data/{year}-{month}.xls`` (sheet "Sheet3") under
    *data_path*, pulls the totals at the row positions listed in the
    period's schema, cross-checks residence names and the grand total,
    then writes a header-less CSV named ``residence-{year}-{month}.csv``
    into *data_path*.
    """
    xl = pd.ExcelFile(
        Path(data_path) / DATA_FILE_PATTERN.format(year=year, month=month))
    # The first two rows of the sheet are title rows and are skipped.
    df_data = xl.parse(SHEET, skiprows=2)
    df_schema = get_schema(year, month, data_path)
    df_schema["Total"] = 0
    df_schema["Period"] = f"{year}-{month:02d}"
    for i, row in df_schema.iterrows():
        # "Row" appears to be a 1-based sheet row number; the -4 offset
        # presumably compensates for skiprows=2 plus the header row and
        # 1-based indexing — TODO confirm against the schema CSVs.
        df_schema.loc[i, "Total"] = int(df_data.iloc[row["Row"] - 4, 3])
        # Make sure the residences from both sources match
        residence = df_schema.loc[i, "Residence"]
        if ("Others" in residence or "Korea" in residence or
                "United" in residence or "Russian" in residence):
            # Exceptions: these rows are known not to match textually,
            # so the name check is skipped for them.
            continue
        # The residence label is usually in column 2; fall back to
        # column 1 when column 2 is empty or not a string.
        data_str = df_data.iloc[row["Row"] - 4, 2]
        if not isinstance(data_str, str) or data_str == "":
            data_str = df_data.iloc[row["Row"] - 4, 1]
        # Cell text looks like "<token> <name...>": drop the first token
        # and compare the remainder with the schema's residence name.
        # NOTE(review): assert-based validation disappears under `python -O`.
        assert (
            residence == " ".join(data_str.split(" ")[1:])
        )
    # Make sure the grand total is correct
    # (the row right after the last schema row holds the grand total).
    assert (
        df_schema["Total"].sum() ==
        int(df_data.iloc[df_schema["Row"].max() - 4 + 1, 3])
    )
    # "Row" was only needed for extraction; drop it before writing.
    del df_schema["Row"]
    df_schema[OUTPUT_COLUMNS].to_csv(
        Path(data_path) /
        OUTPUT_FILE_PATTERN.format(year=year, month=month),
        index=False, header=False
    )
if __name__ == "__main__":
    # Every month of 2011-2017, followed by January-August 2018,
    # extracted in chronological order.
    periods = [(y, m) for y in range(2011, 2018) for m in range(1, 13)]
    periods.extend((2018, m) for m in range(1, 9))
    for year, month in periods:
        extract_from_excel(year, month)
    # Start the combined output file with a header row.
    with open("../data/residence.csv", "w") as fout:
        fout.write(",".join(OUTPUT_COLUMNS) + "\n")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# WARNING: this script is out-dated since the last update of the Tourism Bureau website. | |
import os | |
import time | |
import re | |
from typing import List, Tuple | |
import requests | |
from retrying import retry | |
from selenium.webdriver.support.ui import WebDriverWait, Select | |
from selenium.common.exceptions import TimeoutException, NoSuchElementException | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium import webdriver | |
# SOCKS5 proxy used by both Chrome and requests below, in "host:port"
# form (no scheme); overridable via the SOCKS_PROXY environment variable.
PROXY_URL = os.environ.get("SOCKS_PROXY", "192.168.199.10:12133")
def retry_if_timeout(exception):
    """Retry predicate: return True only for Selenium's TimeoutException.

    Passed to ``retrying.retry`` so page-load timeouts are retried while
    every other exception propagates immediately.
    """
    return isinstance(exception, TimeoutException)
@retry(retry_on_exception=retry_if_timeout, stop_max_attempt_number=5)
def get_url(driver, url):
    """Navigate *driver* to *url*, retrying up to 5 times on page-load
    timeouts (see retry_if_timeout)."""
    print(f"Fetching {url}")
    driver.get(url)
def get_driver(headless: bool = False,
               binary_location: str = "/usr/bin/google-chrome",
               driver_path: str = "/opt/chromedriver"):
    """Build a Chrome WebDriver that routes traffic through the SOCKS proxy.

    Args:
        headless: run Chrome without a visible window.
        binary_location: path to the Chrome binary
            (previously hard-coded to /usr/bin/google-chrome).
        driver_path: path to the chromedriver executable
            (previously hard-coded to /opt/chromedriver).

    Returns:
        A configured ``selenium.webdriver.Chrome`` instance.
    """
    options = webdriver.ChromeOptions()
    options.binary_location = binary_location
    if headless:
        options.add_argument('headless')
        # options.add_argument('window-size=1920x1080')
    # Route all browser traffic through the SOCKS5 proxy, except
    # loopback addresses.
    options.add_argument(f'--proxy-server=socks5://{PROXY_URL}')
    options.add_argument('--proxy-bypass-list=127.0.0.1;localhost')
    # NOTE(review): `chrome_options` is deprecated in Selenium 4 (use
    # `options=`); kept as-is to match the Selenium version this script
    # targets (it also relies on the removed find_element_by_id API).
    driver = webdriver.Chrome(
        driver_path,
        chrome_options=options)
    return driver
def visitors_by_residence(timestamps: List[Tuple[int, int]], output_pattern: str = "../data/raw_data/{}-{}.xls"):
    """Download the monthly "Visitor Arrivals by Residence" .xls files.

    For each (year, month) in *timestamps*, fills in the Tourism Bureau's
    statistics search form via Selenium, locates the resulting .xls link,
    and downloads it through the SOCKS proxy with `requests`.

    Args:
        timestamps: (year, month) pairs to fetch.
        output_pattern: destination path template, formatted with
            ``(year, month)``.
    """
    driver = get_driver()
    # try/finally guarantees the browser is released on every exit path
    # (the original duplicated driver.quit() and re-raised with `raise e`,
    # which rewrites the traceback origin).
    try:
        get_url(
            driver, "https://admin.taiwan.net.tw/statistics/month_en.aspx?no=14")
        for year, month in timestamps:
            # Fill in the three dropdowns of the search form.
            select = Select(driver.find_element_by_id(
                "ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searItm"
            ))
            select.select_by_visible_text("Visitor Arrivals by Residence")
            select = Select(driver.find_element_by_id(
                "ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searYear"
            ))
            select.select_by_visible_text(str(year))
            select = Select(driver.find_element_by_id(
                "ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_searMonth"
            ))
            select.select_by_visible_text(str(month))
            # Submit the search.
            driver.find_element_by_id(
                "ctl00_ctl00_ContentPlaceHolder1_ContentPlaceHolder1_imgSend"
            ).click()
            link_elems = driver.find_elements_by_css_selector(
                "div.cOneTableC9 a")
            found = False
            for elem in link_elems:
                link = elem.get_attribute("href")
                if link.endswith(".xls"):
                    found = True
                    print("Found link: ", link)
                    # NOTE(review): verify=False disables TLS certificate
                    # verification — acceptable for this scrape, but flagged.
                    res = requests.get(
                        link, allow_redirects=True,
                        proxies={
                            "https": f"socks5://{PROXY_URL}"
                        }, verify=False
                    )
                    with open(output_pattern.format(year, month), 'wb') as fout:
                        fout.write(res.content)
            if not found:
                print(f"WARNING: link not found for {year}-{month}")
    finally:
        driver.quit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment