Skip to content

Instantly share code, notes, and snippets.

@benfasoli
Created June 24, 2020 20:16
Show Gist options
  • Save benfasoli/ed84258db1450dd262801d39a512be76 to your computer and use it in GitHub Desktop.
Save benfasoli/ed84258db1450dd262801d39a512be76 to your computer and use it in GitHub Desktop.
Bulk download from MesoWest API and use BigQuery streaming API to load into table
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
from multiprocessing import cpu_count, Pool
import os
from typing import List
from google.cloud import bigquery
import pandas as pd
import requests
# Endpoint that `get_page` sends its GET requests to.
API_URL = 'https://api.synopticdata.com/v2/stations/timeseries'
# Pickle file the __main__ section uses to cache the downloaded dataframe;
# the `if CACHE and ...` check there means a falsy value skips the cache read.
CACHE = 'cache.p'
# Table name the __main__ section hands to `bigquery_stream`.
GBQ_TABLE = 'gsv-aq-x.external.0_native'
# strptime/strftime format of the `start` and `end` query params.
TIME_FORMAT = '%Y%m%d%H%M'
# Shared BigQuery client used by `bigquery_stream`; note that it is
# constructed as soon as this module is imported.
client = bigquery.Client()
def bigquery_stream(payload: pd.DataFrame, table: str):
    """Insert dataframe into table using BigQuery streaming API.

    Rows are uploaded in pages of at most 1000 rows. The caller's dataframe
    is left untouched; all conversions happen on a copy.

    Args:
        payload (pd.DataFrame): rows to insert. Must contain a datetime
            `time` column, which is converted to epoch seconds before upload.
        table (str): target table name, passed to `client.get_table` to
            retrieve existing table metadata

    Raises:
        AssertionError: if the streaming API reports errors for any page
    """
    table = client.get_table(table)

    rows = payload.copy()
    # datetime64[ns] -> integer nanoseconds since epoch -> float seconds.
    # An explicit 64-bit dtype avoids depending on the platform's default int.
    rows['time'] = rows['time'].astype('int64') / 10**9
    # Cast to object first: on numeric columns `where(..., None)` can keep NaN
    # instead of storing None, and NaN is not a valid null for the upload.
    rows = rows.astype(object).where(pd.notnull(rows), None)
    records = rows.to_dict(orient='records')

    n_rows_per_page = 1000
    # Stepping through the records by page size never yields an empty page,
    # even when the row count is zero or an exact multiple of the page size.
    for first in range(0, len(records), n_rows_per_page):
        page = records[first:first + n_rows_per_page]
        errors = client.insert_rows(table, page)
        if errors:
            # Raised explicitly (rather than via `assert`) so the check still
            # runs when Python is started with -O.
            raise AssertionError('streaming API rejected insert')
def paginate_request(params: dict) -> List[dict]:
    """Split query params into daily time ranges to avoid API quotas.

    Ranges shorter than six days are returned as a single page holding the
    original `params` object. Longer ranges are split into consecutive
    24-hour pages beginning at `start`; if the range is not a whole number
    of days, a final shorter page covers the remainder up to `end`.

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys formatted as TIME_FORMAT

    Returns:
        List[dict]: query params paginated by day

    Raises:
        KeyError: if `start` or `end` is missing from `params`
        ValueError: if `start` or `end` does not match TIME_FORMAT
    """
    t1 = datetime.strptime(params['start'], TIME_FORMAT)
    t2 = datetime.strptime(params['end'], TIME_FORMAT)
    if t2 - t1 < timedelta(days=6):
        return [params]

    one_day = timedelta(days=1)
    pages = []
    page_start = t1
    while page_start < t2:
        # Clamp the last page to `end` so a trailing partial day is still
        # requested instead of being silently dropped.
        page_end = min(page_start + one_day, t2)
        pages.append({
            **params,
            'start': page_start.strftime(TIME_FORMAT),
            'end': page_end.strftime(TIME_FORMAT),
        })
        page_start = page_end
    return pages
def get_page(params: dict) -> pd.DataFrame:
    """Request data from MesoWest API.

    A station is skipped when it is QC flagged, when it has no
    `PM_25_concentration_set_1` observations, or when a second observation
    set is present but cannot be reconciled with the first (different length
    or non-numeric values). When both sets are usable, each observation pair
    is averaged, unless the two values differ by more than 1, in which case
    the observation is marked missing. Rows containing any missing value are
    dropped from the result.

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys. See example in __main__ section.

    Returns:
        pd.DataFrame: concatenated result rows with columns for x (longitude),
            y (latitude), stid (unique station identifier), stid_name (human
            readable station name), time, and pm25_ugm3 (mass concentration of
            PM2.5). Empty (zero rows, same columns) when no station
            contributed data.

    Raises:
        requests.RequestException: if the API summary reports a response
            code other than 1
    """
    res = requests.get(API_URL, params=params).json()
    if res['SUMMARY']['RESPONSE_CODE'] != 1:
        raise requests.RequestException(res['SUMMARY']['RESPONSE_MESSAGE'])

    columns = ['x', 'y', 'stid', 'stid_name', 'time', 'pm25_ugm3']
    df_list = []
    for x in res['STATION']:
        if x['QC_FLAGGED']:
            continue
        try:
            pm25_ugm3 = x['OBSERVATIONS']['PM_25_concentration_set_1']
        except KeyError:
            continue
        try:
            pm25_ugm3_alt = x['OBSERVATIONS']['PM_25_concentration_set_2']
            # Compare lengths by value: an identity check on ints misreports
            # equal lengths as different once they exceed the small-int cache.
            if len(pm25_ugm3_alt) != len(pm25_ugm3):
                raise ValueError('Dual response sets of different length')
            for i, y in enumerate(pm25_ugm3_alt):
                # validate difference between multiple returned values is
                # smaller than threshold
                dy = pm25_ugm3[i] - y
                if abs(dy) > 1:
                    pm25_ugm3[i] = None
                else:
                    pm25_ugm3[i] = (pm25_ugm3[i] + y) / 2
        except KeyError:
            # Only one observation set was returned; use it as is.
            pass
        except (TypeError, ValueError):
            # Sets disagree in length or hold non-numeric entries (e.g. None):
            # the station cannot be validated, so leave it out.
            continue
        df_list.append(
            pd.DataFrame({
                'x': x['LONGITUDE'],
                'y': x['LATITUDE'],
                'stid': x['STID'],
                'stid_name': x['NAME'],
                'time': x['OBSERVATIONS']['date_time'],
                'pm25_ugm3': pm25_ugm3
            })
        )

    if df_list:
        df = pd.concat(df_list, ignore_index=True)
    else:
        # pd.concat raises ValueError on an empty list; hand back an empty
        # frame with the usual columns so callers can still concatenate it.
        df = pd.DataFrame(columns=columns)
    df = df.dropna()
    df['time'] = pd.to_datetime(df['time'])
    return df
def get_batch(params: dict) -> pd.DataFrame:
    """Paginate params and execute parallel requests.

    The request is split with `paginate_request` and each page is fetched by
    `get_page` in a pool of worker processes, sized to the smaller of the CPU
    count and the number of pages.

    Args:
        params (dict): query url params passed to API, containing at least
            `start` and `end` keys. See example in __main__ section.

    Returns:
        pd.DataFrame: concatenated result rows with columns for x (longitude),
            y (latitude), stid (unique station identifier), stid_name (human
            readable station name), time, and pm25_ugm3 (mass concentration of
            PM2.5).
    """
    page_params = paginate_request(params)
    n_workers = min(cpu_count(), len(page_params))
    # Use the pool as a context manager so its worker processes are shut down
    # once the (blocking) map call has returned, instead of being leaked.
    with Pool(n_workers) as pool:
        result = pool.map(get_page, page_params)
    # if result too large to fit in memory, call bigquery_stream from get_page
    # rather than concatenating results
    return pd.concat(result, ignore_index=True)
if __name__ == '__main__':
    # Script entry point: download (or load from the local cache) every
    # observation from `start` until now, then stream it into GBQ_TABLE.
    from datetime import timezone

    params = {
        'start': '201905010000',
        # Current UTC time; `datetime.utcnow()` is deprecated and an aware
        # UTC datetime formats to the same string.
        # Fixed alternative for a short run: '201905010100'
        'end': datetime.now(timezone.utc).strftime(TIME_FORMAT),
        'vars': 'PM_25_concentration',
        'bbox': '-112.5,40.4,-111.5,41',
        'token': '<token>'
    }
    if CACHE and os.path.exists(CACHE):
        # NOTE: unpickling executes arbitrary code; only point CACHE at a
        # file this script wrote itself.
        df = pd.read_pickle(CACHE)
    else:
        df = get_batch(params)
        # A falsy CACHE disables caching, so skip the write in that case too.
        if CACHE:
            df.to_pickle(CACHE)
    bigquery_stream(df, GBQ_TABLE)
google-cloud-bigquery[pandas,pyarrow]
pandas
requests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment