Created June 24, 2020 20:16
Bulk download from the MesoWest API and use the BigQuery streaming API to load the results into a table
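The script below expects a Synoptic Data (MesoWest) API token and an existing BigQuery destination table. As a rough guide to what `get_page` parses, the timeseries endpoint returns JSON of approximately the shape sketched here; the keys mirror only the fields the script reads, and the station values are placeholders rather than real observations:

# Approximate shape of the /stations/timeseries response that get_page()
# consumes. Keys are those referenced in the script; values are made up.
example_response = {
    'SUMMARY': {'RESPONSE_CODE': 1, 'RESPONSE_MESSAGE': 'OK'},
    'STATION': [
        {
            'STID': 'STN1',             # unique station identifier
            'NAME': 'Example Station',  # human readable station name
            'LONGITUDE': '-111.9',
            'LATITUDE': '40.7',
            'QC_FLAGGED': False,        # flagged stations are skipped
            'OBSERVATIONS': {
                'date_time': ['2019-05-01T00:00:00Z', '2019-05-01T01:00:00Z'],
                'PM_25_concentration_set_1': [7.2, 6.8],
                # optional second sensor, averaged with set_1 when the two
                # values agree to within 1 ug/m3
                'PM_25_concentration_set_2': [7.4, 6.9],
            },
        },
    ],
}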
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from multiprocessing import cpu_count, Pool
import os
from typing import List

from google.cloud import bigquery
import pandas as pd
import requests

API_URL = 'https://api.synopticdata.com/v2/stations/timeseries'
CACHE = 'cache.p'
GBQ_TABLE = 'gsv-aq-x.external.0_native'
TIME_FORMAT = '%Y%m%d%H%M'

client = bigquery.Client()


def bigquery_stream(payload: pd.DataFrame, table: str):
    """Insert dataframe into table using BigQuery streaming API

    Args:
        payload (pd.DataFrame): rows to insert
        table (str): target table name, passed to `client.get_table` to
            retrieve existing table metadata

    Raises:
        AssertionError: if the streaming API rejects any insert
    """
    table = client.get_table(table)

    payload = payload.copy()
    payload.time = payload.time.astype(int) / 10**9
    payload = payload.where(pd.notnull(payload), None)
    payload = payload.to_dict(orient='records')

    n_rows = len(payload)
    n_rows_per_page = 1000
    n_pages = int(n_rows / n_rows_per_page) + 1
    breaks = [x * n_rows_per_page for x in range(n_pages)] + [n_rows]
    pages = [payload[breaks[i]:breaks[i + 1]] for i in range(n_pages)]

    for page in pages:
        errors = client.insert_rows(table, page)
        assert errors == [], 'streaming API rejected insert'


def paginate_request(params: dict) -> List[dict]:
    """Split query params into daily time ranges to avoid API quotas

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys

    Returns:
        List[dict]: query params paginated by day
    """
    t1 = datetime.strptime(params['start'], TIME_FORMAT)
    t2 = datetime.strptime(params['end'], TIME_FORMAT)
    dt = t2 - t1
    if dt < timedelta(days=6):
        return [params]

    breaks = []
    for i in range(dt.days):
        breaks.append({
            **params,
            'start': (t1 + timedelta(days=i)).strftime(TIME_FORMAT),
            'end': (t1 + timedelta(days=i + 1)).strftime(TIME_FORMAT),
        })
    return breaks


def get_page(params: dict) -> pd.DataFrame:
    """Request data from MesoWest API

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys. See example in __main__ section.

    Returns:
        pd.DataFrame: concatenated result rows with columns for x (longitude),
            y (latitude), stid (unique station identifier), stid_name (human
            readable station name), time, and pm25_ugm3 (mass concentration of
            PM2.5).
    """
    res = requests.get(API_URL, params=params).json()
    if res['SUMMARY']['RESPONSE_CODE'] != 1:
        raise requests.RequestException(res['SUMMARY']['RESPONSE_MESSAGE'])

    df_list = []
    for x in res['STATION']:
        if x['QC_FLAGGED']:
            continue

        try:
            pm25_ugm3 = x['OBSERVATIONS']['PM_25_concentration_set_1']
        except KeyError:
            continue

        try:
            pm25_ugm3_alt = x['OBSERVATIONS']['PM_25_concentration_set_2']
            if len(pm25_ugm3_alt) != len(pm25_ugm3):
                raise ValueError('Dual response sets of different length')
            for i, y in enumerate(pm25_ugm3_alt):
                # validate difference between multiple returned values is
                # smaller than threshold
                dy = pm25_ugm3[i] - y
                if abs(dy) > 1:
                    pm25_ugm3[i] = None
                else:
                    pm25_ugm3[i] = (pm25_ugm3[i] + y) / 2
        except KeyError:
            pass
        except (TypeError, ValueError):
            continue

        df_list.append(
            pd.DataFrame({
                'x': x['LONGITUDE'],
                'y': x['LATITUDE'],
                'stid': x['STID'],
                'stid_name': x['NAME'],
                'time': x['OBSERVATIONS']['date_time'],
                'pm25_ugm3': pm25_ugm3
            })
        )

    df = pd.concat(df_list, ignore_index=True)
    df = df.dropna()
    df.time = pd.to_datetime(df.time)
    return df


def get_batch(params: dict) -> pd.DataFrame:
    """Paginate params and execute parallel requests

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys. See example in __main__ section.

    Returns:
        pd.DataFrame: concatenated result rows with columns for x (longitude),
            y (latitude), stid (unique station identifier), stid_name (human
            readable station name), time, and pm25_ugm3 (mass concentration of
            PM2.5).
    """
    page_params = paginate_request(params)
    n_pages = len(page_params)
    n_cpus = cpu_count()
    p = Pool(min(n_cpus, n_pages))
    result = p.map(get_page, page_params)
    # if result too large to fit in memory, call bigquery_stream from get_page
    # rather than concatenating results
    return pd.concat(result, ignore_index=True)


if __name__ == '__main__':
    params = {
        'start': '201905010000',
        'end': datetime.utcnow().strftime(TIME_FORMAT),  # 'end': '201905010100',
        'vars': 'PM_25_concentration',
        'bbox': '-112.5,40.4,-111.5,41',
        'token': '<token>'
    }

    if CACHE and os.path.exists(CACHE):
        df = pd.read_pickle(CACHE)
    else:
        df = get_batch(params)
        df.to_pickle(CACHE)

    bigquery_stream(df, GBQ_TABLE)
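Because `bigquery_stream` calls `client.get_table`, the destination table must already exist before the script runs. A minimal sketch of creating it with the same Python client follows; the column types are assumptions inferred from the dataframe columns (`time` is streamed as epoch seconds, which a TIMESTAMP column accepts), not something the gist specifies:

from google.cloud import bigquery

client = bigquery.Client()

# Assumed schema matching the dataframe columns produced by get_page().
schema = [
    bigquery.SchemaField('x', 'FLOAT'),           # longitude
    bigquery.SchemaField('y', 'FLOAT'),           # latitude
    bigquery.SchemaField('stid', 'STRING'),       # station identifier
    bigquery.SchemaField('stid_name', 'STRING'),  # station name
    bigquery.SchemaField('time', 'TIMESTAMP'),    # epoch seconds from the stream
    bigquery.SchemaField('pm25_ugm3', 'FLOAT'),   # PM2.5 mass concentration
]

table = bigquery.Table('gsv-aq-x.external.0_native', schema=schema)
client.create_table(table, exists_ok=True)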
The gist also includes a list of the Python dependencies:

google-cloud-bigquery[pandas,pyarrow]
pandas
requests
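Installing these packages with pip and providing BigQuery credentials (for example via the `GOOGLE_APPLICATION_CREDENTIALS` environment variable, so that `bigquery.Client()` can authenticate) should be enough to run the script; the credential setup is an assumption here, as the gist does not document it.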