Last active
November 8, 2017 02:04
-
-
Save jdleslie/d087bcbeffa9d1776b4b394e21c6818d to your computer and use it in GitHub Desktop.
Forecast-responsive control script for radiant floors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
from datetime import datetime, timedelta | |
import requests | |
import xmltodict | |
import pytz | |
# Runtime configuration, read once from the environment at import time.

# Name of the time zone used when describing the schedule in log output.
TZ = os.environ.get("RADIANT_TZ", "UTC")

# The forecast location is required. Indexing os.environ directly makes a
# missing variable fail with a KeyError that names it, instead of the opaque
# "float() argument must be ... not 'NoneType'" TypeError.
LAT = float(os.environ["RADIANT_LAT"])
LON = float(os.environ["RADIANT_LON"])

# Adjusted forecast temperature (deg F) at or below which a period counts as
# heating demand.
WWSD_TEMP = int(os.environ.get("RADIANT_WWSD", 60))

# Thresholds, in forecast periods, used when merging demand windows: windows
# no longer than MIN_ON are dropped, gaps no longer than MIN_OFF are bridged.
MIN_ON = int(os.environ.get("RADIANT_MIN_ON", 3))
MIN_OFF = int(os.environ.get("RADIANT_MIN_OFF", 3))

# Relay controller endpoint and the name of the relay field that is read
# from and written to it.
webrelay_url = "http://{}/stateFull.xml".format(
    os.environ.get("RADIANT_WEBRELAY_HOST", "boiler2"))
webrelay_target = os.environ.get("RADIANT_WEBRELAY_TARGET", "relay2state")

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')
def get_forecast(lat, lon, hours=None, timeout=30):
    """Fetch the hourly forecast for a location from api.weather.gov.

    Args:
        lat: Latitude in decimal degrees (sent with three decimal places).
        lon: Longitude in decimal degrees (sent with three decimal places).
        hours: Maximum number of forecast periods to return; None returns
            every period in the response.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled connection would block the caller forever.

    Returns:
        A list of dictionaries like this:

        {u'detailedForecast': u'',
         u'endTime': u'2017-10-03T15:00:00-04:00',
         u'icon': u'https://api.weather.gov/icons/land/day/skc?size=small',
         u'isDaytime': True,
         u'name': u'',
         u'number': 50,
         u'shortForecast': u'Sunny',
         u'startTime': u'2017-10-03T14:00:00-04:00',
         u'temperature': 64,
         u'temperatureTrend': None,
         u'temperatureUnit': u'F',
         u'windDirection': u'SSE',
         u'windSpeed': u'5 mph'}

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status.
        KeyError: If the response body has no 'periods' entry.
    """
    url = "https://api.weather.gov/points/{:.3f},{:.3f}/forecast/hourly".format(lat, lon)
    headers = {
        # NOTE(review): the contact address below looks like it was redacted
        # by the page this script was copied from -- restore a real address.
        "User-Agent": "boiler-control <[email protected]>",
        "Accept": "application/ld+json",
    }
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail on the HTTP error itself rather than on a missing 'periods' key.
    response.raise_for_status()
    return response.json()['periods'][:hours]
def is_radiant_running(timeout=10):
    """Report whether the relay that drives the radiant floor is switched on.

    Args:
        timeout: Seconds to wait for the relay controller before giving up.

    Returns:
        True if the controller reports the target relay field as "1".

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status.
        ValueError: If the controller does not report units of "F".
    """
    response = requests.get(webrelay_url, timeout=timeout)
    response.raise_for_status()
    values = xmltodict.parse(response.content)["datavalues"]
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if values["units"] != "F":
        raise ValueError(
            "Unexpected relay controller units: {!r}".format(values["units"]))
    return values[webrelay_target] == "1"
# Degrees F added to the forecast temperature for daytime sky conditions.
_SOLAR_GAIN = {"Sunny": 7, "Mostly Sunny": 5, "Clear": 5}


def _effective_temperature(period):
    """Return a period's forecast temperature adjusted for sun and wind."""
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if period["temperatureUnit"] != "F":
        raise ValueError(
            "Unexpected forecast unit: {!r}".format(period["temperatureUnit"]))
    temp = period["temperature"]

    # Adjust for solar gain
    if period["isDaytime"]:
        temp += _SOLAR_GAIN.get(period["shortForecast"], 0)

    # Adjust for wind speed; only the leading number of the string is used.
    # NOTE(review): applied to every period, day or night -- confirm this
    # matches the intended nesting of the original script.
    wind = int(period["windSpeed"].split()[0])
    if wind > 20:
        temp -= 7
    elif wind > 15:
        temp -= 5
    elif wind > 10:
        temp -= 3
    return temp


def _group_ranges(indices):
    """Combine runs of consecutive indices into (start, end) tuples."""
    ranges = []
    for index in indices:
        if ranges and index == ranges[-1][1] + 1:
            ranges[-1] = (ranges[-1][0], index)
        else:
            ranges.append((index, index))
    return ranges


def _merge_demand(ranges, min_on, min_off):
    """Bridge gaps of at most min_off and drop windows of at most min_on."""
    schedule = []
    for window in ranges:
        if not schedule:
            # The first window is always kept; the caller decides whether
            # it is long enough to act on.
            schedule.append(window)
            continue
        previous = schedule[-1]
        if window[0] - previous[1] > min_off:
            if window[1] - window[0] > min_on:
                schedule.append(window)
        else:
            schedule[-1] = (previous[0], window[1])
    return schedule


def get_demand_schedule(lat, lon, max_temp, periods=None, min_on=None,
                        min_off=None):
    """Build the list of upcoming heating-demand windows from the forecast.

    A forecast period counts as demand when its temperature, raised for
    daytime sun and lowered for wind, is at or below max_temp. Adjacent
    demand periods are grouped into windows, windows separated by short
    gaps are joined, and short windows after the first are discarded.

    Args:
        lat: Latitude passed to get_forecast.
        lon: Longitude passed to get_forecast.
        max_temp: Adjusted temperature (deg F) at or below which a period
            needs heat.
        periods: Forecast period dictionaries to evaluate. When None they
            are fetched with get_forecast(lat, lon).
        min_on: Windows spanning no more than this many periods are
            dropped. Defaults to the module-level MIN_ON.
        min_off: Gaps of no more than this many periods are bridged.
            Defaults to the module-level MIN_OFF.

    Returns:
        A list of (start, end) tuples of period indices, where index zero
        is the current period. The list is empty when nothing needs heat.

    Raises:
        ValueError: If a period is not reported in degrees Fahrenheit.
    """
    if periods is None:
        periods = get_forecast(lat, lon)
    if min_on is None:
        min_on = MIN_ON
    if min_off is None:
        min_off = MIN_OFF

    # Adjust period index so zero is the current period
    future_demand = [
        period["number"] - 1
        for period in periods
        if _effective_temperature(period) <= max_temp
    ]
    # A plain loop replaces reduce(), which raised TypeError on an empty
    # schedule, and always yields a flat list of tuples, so no unwrapping
    # or re-wrapping of the result is needed.
    return _merge_demand(_group_ranges(future_demand), min_on, min_off)
def is_radiant_needed(demand_start, demand_end):
    """Decide whether the radiant floor should run for the coming cycle.

    Args:
        demand_start: Index of the first period of the next demand window,
            counted from the current period.
        demand_end: Index of the last period of that window.

    Returns:
        True if the floor should be on, False otherwise.
    """
    # Demand that starts more than two periods out can wait.
    if demand_start > 2:
        return False

    if is_radiant_running():
        # Keep running while the current cycle ends at least two periods out.
        return demand_end >= 2

    # Start the next cycle only if demand spans at least three periods.
    return demand_end - demand_start >= 3
def set_radiant_state(run, timeout=10):
    """Switch the radiant floor relay on or off.

    Args:
        run: Truthy to turn the relay on, falsy to turn it off.
        timeout: Seconds to wait for the relay controller before giving up,
            so a stalled controller cannot block the control loop forever.

    Raises:
        requests.exceptions.RequestException: On connection failure,
            timeout, or an HTTP error status.
    """
    state = {webrelay_target: 1 if run else 0}
    requests.get(webrelay_url, params=state, timeout=timeout).raise_for_status()
def log_state(radiant_state, schedule):
    """Log the relay state together with a readable demand schedule.

    Args:
        radiant_state: Truthy when the radiant floor is being run.
        schedule: Iterable of (start, end) pairs of hour offsets from now.
    """
    local_now = datetime.now(pytz.utc).astimezone(pytz.timezone(TZ))

    descriptions = []
    for demand_start, demand_end in schedule:
        duration = demand_end - demand_start
        unit = "hours" if duration > 1 else "hour"
        if demand_start == 0:
            text = "Now for {} {}".format(duration, unit)
        else:
            starts_at = local_now + timedelta(hours=demand_start)
            text = "{:%a at %-I %p} for {} {}".format(starts_at, duration, unit)
        descriptions.append(text)

    if not descriptions:
        descriptions = ["No demand"]

    state_label = "on" if radiant_state else "off"
    logging.info("Radiant is {}, schedule: {}".format(
        state_label, ", ".join(descriptions)))
def main():
    """Run the control loop forever, re-evaluating about once an hour."""
    from random import randint
    from time import sleep

    while True:
        schedule = get_demand_schedule(LAT, LON, WWSD_TEMP)
        # Only the earliest demand window decides the relay state.
        should_run = bool(schedule) and is_radiant_needed(*schedule[0])
        set_radiant_state(should_run)
        log_state(should_run, schedule)
        # Vary the hourly poll by up to 30 seconds either way.
        sleep(3600 + randint(-30, 30))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment