NBER macrohistory database preparation
""" | |
Download all files from | |
http://www.nber.org/databases/macrohistory/contents/ | |
and repackage into a data csv and a documentation csv | |
""" | |
import argparse
import os
import pandas as pd
import requests
from random import random
from time import sleep
CHAPTERS = [str(x).zfill(2) for x in range(1, 17)]
# format these with the zero-padded chapter number and the file name;
# one template each for the data (.db) and documentation (.txt) files
DATA_BASE_URL = "http://www.nber.org/databases/macrohistory/data/{0}/{1}.db"
DOCS_BASE_URL = "http://www.nber.org/databases/macrohistory/rectdata/{0}/docs/{1}.txt"
EXPORT_DATA_DIRECTORY = 'cleaned_files'
HTTP_RESPONSE_SUCCESS_CODE_PREFIX = '2'
MAX_RETRIES = 3
MONTHS_PER_QUARTER = 3
QUARTERS_PER_YEAR = 4
MONTHS_PER_YEAR = 12
RAW_DATA_DIRECTORY = 'raw_files'
ROOT_URL = "http://www.nber.org/databases/macrohistory/contents/chapter{0}.html"
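# Illustrative example of the URL templates above: DATA_BASE_URL.format('01', 'a01042a')
# yields http://www.nber.org/databases/macrohistory/data/01/a01042a.db, the same example
# file linked in microtsp_db_to_dataframe() below.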
def download_file(url, file_name, file_type):
    file_path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.' + file_type)
    if os.path.exists(file_path):
        return None
    response = None
    for _ in range(MAX_RETRIES):
        try:
            response = requests.get(url)
            break
        except requests.exceptions.RequestException:
            sleep(1 + random())
    if response is None or not str(response.status_code).startswith(HTTP_RESPONSE_SUCCESS_CODE_PREFIX):
        last_code = response.status_code if response is not None else 'no response'
        print(f"Failed to download {url}. Last response code: {last_code}")
        return None
    with open(file_path, 'w+') as f_open:
        f_open.write(response.text)
def period_to_month(period, observations_per_year):
    # map a within-year period index (month or quarter) to its starting calendar month
    if observations_per_year == QUARTERS_PER_YEAR:
        return (period - 1) * MONTHS_PER_QUARTER + 1
    return period
def db_period_to_timestamp(db_period, observations_per_year):
    # returns the beginning-of-period date
    db_components = db_period.split('.')
    year = int(db_components[0])
    day = 1
    if len(db_components) > 1 and db_components[1]:
        month = period_to_month(int(db_components[1]), observations_per_year)
    else:
        month = 1
    return pd.Timestamp(year, month, day)
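# Worked example of the two helpers above (illustrative values): for a quarterly
# series, db_period_to_timestamp('1914.3', QUARTERS_PER_YEAR) maps quarter 3 to
# month (3 - 1) * 3 + 1 = 7 and returns pd.Timestamp(1914, 7, 1).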
def safe_float_conversion(num_string):
    # there are some gibberish values (looks like file corruption)
    # that must be dropped
    try:
        return float(num_string)
    except (TypeError, ValueError):
        return float('nan')
def microtsp_db_to_dataframe(file_name):
    """
    Convert a MicroTSP-style .db file to a tidy pandas DataFrame.
    file format: https://en.wikipedia.org/wiki/Databank_format
    example file: http://www.nber.org/databases/macrohistory/data/01/a01042a.db
    """
    file_path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.db')
    with open(file_path, encoding='iso-8859-1') as f_open:
        db_lines = f_open.readlines()
    db_lines = [x.strip() for x in db_lines if not x.startswith('"')]
    db_lines = [x for x in db_lines if x != '']
    try:
        observations_per_year = db_lines.pop(0)
        observations_per_year = abs(int(observations_per_year.strip('.')))
        start_date = db_lines.pop(0)
        start_date = db_period_to_timestamp(start_date, observations_per_year)
        # the end-period header line is not needed
        db_lines.pop(0)
    except (IndexError, ValueError):
        # some files are malformed; skip them rather than trying to fix.
        return None
    if len(db_lines) == 0:
        return None
    # the month-end stamps produced by date_range are normalized to the first
    # of the month in process_raw_db_files()
    date_range = pd.date_range(start=start_date, periods=len(db_lines),
                               freq=f'{int(MONTHS_PER_YEAR / observations_per_year)}M')
    df = pd.DataFrame(db_lines, index=date_range, columns=['Value'])
    df.index.name = 'Date'
    df.reset_index(inplace=True)
    df['Variable'] = file_name
    df['Value'] = df['Value'].apply(safe_float_conversion)
    return df
def get_chapter_urls(chapter):
    # the file listing is the second table ([1]) on each chapter contents page
    df = pd.read_html(ROOT_URL.format(chapter))[1]
    df.columns = ['db', 'dat', 'doc', 'file_name', 'description']
    df['data_url'] = df['file_name'].apply(lambda x: DATA_BASE_URL.format(chapter, x))
    df['doc_url'] = df['file_name'].apply(lambda x: DOCS_BASE_URL.format(chapter, x))
    return df[['file_name', 'data_url', 'doc_url', 'description']]
def download_chapter(chapter, file_descriptions):
    chapter_metadata = get_chapter_urls(chapter)
    chapter_metadata.apply(
        lambda row: download_file(row['doc_url'], row['file_name'], 'txt'), axis=1)
    chapter_metadata.apply(
        lambda row: download_file(row['data_url'], row['file_name'], 'db'), axis=1)
    if file_descriptions is None:
        return chapter_metadata[['file_name', 'description']].copy()
    return pd.concat([file_descriptions, chapter_metadata[['file_name', 'description']]],
                     ignore_index=True)
def prepare_folders():
    if not os.path.exists(RAW_DATA_DIRECTORY):
        os.mkdir(RAW_DATA_DIRECTORY)
    if not os.path.exists(EXPORT_DATA_DIRECTORY):
        os.mkdir(EXPORT_DATA_DIRECTORY)
def download_all_chapters():
    file_descriptions = None
    for chapter in CHAPTERS:
        file_descriptions = download_chapter(chapter, file_descriptions)
        print(f'Chapter {chapter} download complete')
    file_desc_path = os.path.join(EXPORT_DATA_DIRECTORY, "file_descriptions.csv")
    file_descriptions.to_csv(file_desc_path, index=False)
def process_raw_db_files():
    # aggregate all the .db files into one tidy csv
    db_files = [x for x in os.listdir(RAW_DATA_DIRECTORY) if x.endswith('.db')]
    db_files = [x.split('.')[0] for x in db_files]
    db_files = [microtsp_db_to_dataframe(x) for x in db_files]
    db_files = [x for x in db_files if x is not None]
    df = pd.concat(db_files, ignore_index=True)
    # normalize the month-end stamps from date_range to the first of the month
    df.Date = df.Date.apply(lambda x: x.replace(day=1))
    df.to_csv(os.path.join(EXPORT_DATA_DIRECTORY, 'data.csv'), index=False)
def load_doc_file(file_name):
    path = os.path.join(RAW_DATA_DIRECTORY, file_name + '.txt')
    with open(path) as f_open:
        doc = f_open.read()
    return doc
def process_raw_doc_files():
    # aggregate all the doc files into a csv
    df = pd.read_csv(os.path.join(EXPORT_DATA_DIRECTORY, "file_descriptions.csv"))
    # the original documentation format has a lot of cruft to be trimmed:
    # leading '"c' markers, long runs of dots, and trailing quote characters
    df['documentation'] = df['file_name'].apply(load_doc_file)
    df['documentation'] = df['documentation'].str.replace(r'"c\s+', '', regex=True)
    df['documentation'] = df['documentation'].str.replace(r'\.{2,}', '', regex=True)
    df['documentation'] = df['documentation'].str.replace(r'\s*"\n', '\n', regex=True)
    df['chapter'] = df['file_name'].str.extract(r'(\d{2})\d+', expand=False)
    df.to_csv(os.path.join(EXPORT_DATA_DIRECTORY, "documentation.csv"), index=False)
def prepare_data(should_download_data):
    prepare_folders()
    if should_download_data:
        download_all_chapters()
    process_raw_db_files()
    process_raw_doc_files()
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--download-data", help="Download the data files?", action="store_true")
    return parser.parse_args()
if __name__ == '__main__':
    args = parse_args()
    prepare_data(should_download_data=args.download_data)
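# A quick sanity check of the output (illustrative only; the column names are the
# ones written by process_raw_db_files above):
#   df = pd.read_csv('cleaned_files/data.csv', parse_dates=['Date'])
#   print(df.groupby('Variable')['Value'].describe())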