"""Proof of concept for combining Solar Forecast Arbiter NWP processing with
API requirements. Don't use this code."""

import datetime
from functools import partial
from pathlib import Path
import pandas as pd
from solarforecastarbiter import datamodel
from solarforecastarbiter.reference_forecasts import main, models
from solarforecastarbiter.io.fetch import nwp as fetch_nwp
from solarforecastarbiter.io import nwp
# find the files
base_path = Path(nwp.__file__).resolve().parents[0] / 'tests/data'
# define file loading function that knows where to find the files
load_forecast = partial(nwp.load_forecast, base_path=base_path)

site = datamodel.Site(
    name='OASIS',
    latitude=32.2,
    longitude=-110.9,
    elevation=700,
    timezone='America/Phoenix'
)

forecast1 = datamodel.Forecast(
    name='GFS',
    issue_time_of_day=datetime.time(7),
    lead_time_to_start=pd.Timedelta('0min'),
    interval_length=pd.Timedelta('1h'),
    run_length=pd.Timedelta('24hr'),
    interval_label='beginning',
    interval_value_type='interval_mean',
    variable='ghi',
    site=site,
    extra_parameters={
        'model': 'gfs_quarter_deg_to_hourly_mean',
        'fetch_metadata': 'GFS_0P25_1HR'
    }
)

forecast2 = datamodel.Forecast(
    name='GFS',
    issue_time_of_day=datetime.time(7),
    lead_time_to_start=pd.Timedelta('0min'),
    interval_length=pd.Timedelta('1h'),
    run_length=pd.Timedelta('24hr'),
    interval_label='beginning',
    interval_value_type='interval_mean',
    variable='air_temperature',
    site=site,
    extra_parameters={
        'model': 'gfs_quarter_deg_to_hourly_mean',
        'fetch_metadata': 'GFS_0P25_1HR'
    }
)

forecast3 = datamodel.Forecast(
    name='GFS',
    issue_time_of_day=datetime.time(7),
    lead_time_to_start=pd.Timedelta('0min'),
    interval_length=pd.Timedelta('1h'),
    run_length=pd.Timedelta('24hr'),
    interval_label='beginning',
    interval_value_type='interval_mean',
    variable='ac_power',
    site=site,
    extra_parameters={
        'model': 'gfs_quarter_deg_to_hourly_mean',
        'fetch_metadata': 'GFS_0P25_1HR'
    }
)

forecast4 = datamodel.Forecast(
    name='GFS',
    issue_time_of_day=datetime.time(7),
    lead_time_to_start=pd.Timedelta('0min'),
    interval_length=pd.Timedelta('1h'),
    run_length=pd.Timedelta('48hr'),
    interval_label='beginning',
    interval_value_type='interval_mean',
    variable='ghi',
    site=site,
    extra_parameters={
        'model': 'gfs_quarter_deg_to_hourly_mean',
        'fetch_metadata': 'GFS_0P25_1HR'
    }
)


def groupby_forecasts(forecasts):
    """
    Parameters
    ----------
    forecasts : list of datamodel.Forecast

    Returns
    -------
    grouped : pandas SeriesGroupBy
        Group keys are the tuples of shared forecast parameters produced by
        :py:func:`_forecast_by`; group values are the Forecast objects whose
        data can be derived from a single call to
        :py:func:`~solarforecastarbiter.reference_forecasts.main.run`.
    """
    index = [forecast.to_dict() for forecast in forecasts]
    fx_series = pd.Series(forecasts, index=index)
    grouped = fx_series.groupby(by=_forecast_by)
    return grouped


def _forecast_by(forecast):
    # groupby passes each index element, which is the dict form of a Forecast
    fx_dict = forecast.copy()
    fx_dict.pop('variable')
    fx_dict.pop('forecast_id')
    # remove dicts for hashability. in future pull out relevant info first
    fx_dict.pop('extra_parameters')
    fx_dict['site'].pop('extra_parameters')
    fx_dict['site'] = tuple(fx_dict['site'].values())
    vals = tuple(fx_dict.values())
    return vals
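

# --- Added illustration (not part of the original gist) ---
# forecast1 and forecast2 differ only in `variable`, so _forecast_by should
# map their dict forms to the same grouping key, while forecast4 (48hr
# run_length) should map to a different one. Expected output: True, False.
print('forecast1/forecast2 share a key:',
      _forecast_by(forecast1.to_dict()) == _forecast_by(forecast2.to_dict()))
print('forecast1/forecast4 share a key:',
      _forecast_by(forecast1.to_dict()) == _forecast_by(forecast4.to_dict()))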


def get_init_time(run_time, fetch_metadata):
    """Determine the most recent init time for which all forecast data is
    available."""
    run_finish = (pd.Timedelta(fetch_metadata['delay_to_first_forecast']) +
                  pd.Timedelta(fetch_metadata['avg_max_run_length']))
    freq = fetch_metadata['update_freq']
    init_time = (run_time - run_finish).floor(freq=freq)
    return init_time
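

# --- Added illustration (not part of the original gist) ---
# get_init_time with made-up metadata values; the real values come from the
# fetch metadata dicts in solarforecastarbiter.io.fetch.nwp (e.g.
# fetch_nwp.GFS_0P25_1HR), so the numbers below are illustrative only.
# 2019-05-15 07:00Z - (6hr + 5hr) = 2019-05-14 20:00Z, floored to the 6h
# update frequency -> 2019-05-14 18:00Z.
_example_metadata = {'delay_to_first_forecast': '6hr',
                     'avg_max_run_length': '5hr',
                     'update_freq': '6h'}
print(get_init_time(pd.Timestamp('20190515 0700Z'), _example_metadata))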


def run_reference_forecast(forecast, run_time, issue_time):
    fetch_metadata = getattr(fetch_nwp,
                             forecast.extra_parameters['fetch_metadata'])
    init_time = get_init_time(run_time, fetch_metadata)
    forecast_start, forecast_end = main.get_forecast_start_end(forecast,
                                                               issue_time)
    forecast_end -= pd.Timedelta('1s')
    model = getattr(models, forecast.extra_parameters['model'])
    # for testing, point the model at the local test data via load_forecast
    model = partial(model, load_forecast=load_forecast)
    fx = main.run(forecast.site, model, init_time,
                  forecast_start, forecast_end)
    # fx is a tuple of ghi, dni, dhi, air_temperature, wind_speed, ac_power
    # probably return it for another function to post the data to the api
    return fx


def post_forecast(session, forecast, forecast_values):
    # need to determine how each series in forecast_values is mapped to
    # observations. extra_parameters? dynamically? Assume extra_parameters...
    obs = session.list_observations()
    # stub: eventually match forecast.site against obs, then post the values
    raise NotImplementedError
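

# --- Added sketch (not part of the original proof of concept) ---
# One possible shape for post_forecast, assuming the API session exposes a
# post_forecast_values(forecast_id, values) method; that method name and the
# series-to-forecast mapping are assumptions, not confirmed API.
def _post_forecast_sketch(session, forecast, forecast_values):
    # forecast_values is assumed to be a pandas Series indexed by timestamp
    session.post_forecast_values(forecast.forecast_id, forecast_values)
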
run_time = pd.Timestamp('20190515 0700Z')
issue_time = pd.Timestamp('20190515 0700Z')
forecasts = [forecast1, forecast2, forecast3, forecast4]
grouped = groupby_forecasts(forecasts)

for name, group in grouped:
    print('new group:\n')
    print(name, '\n', group)
    print('\n')
    # use an ac_power forecast as the key forecast if one exists in the group
    key_fx = group.values[0]
    for fx in group.values:
        if fx.variable == 'ac_power':
            key_fx = fx
    print('key fx:\n', key_fx)
    # not yet sure how run_time and issue_time interact with filtering above
    fxs = run_reference_forecast(key_fx, run_time, issue_time)
    variables = ('ghi', 'dni', 'dhi', 'air_temperature', 'wind_speed',
                 'ac_power')
    # fx_values is the computed series for a variable; group_fx is the
    # Forecast object it would be posted to
    for variable, fx_values in zip(variables, fxs):
        for group_fx in group.values:
            if group_fx.variable == variable:
                print(f'posting {variable}')
    print('\n\n')
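

# --- Added sketch (not part of the original proof of concept) ---
# How posting might eventually replace the print statements in the loop
# above, assuming an authenticated API session object (illustrative only,
# not executed here):
#
#     for variable, fx_values in zip(variables, fxs):
#         for group_fx in group.values:
#             if group_fx.variable == variable:
#                 post_forecast(session, group_fx, fx_values)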