Last active
October 22, 2018 13:03
-
-
Save temoto/72cb25f924ef0e7c52a360ab600fe822 to your computer and use it in GitHub Desktop.
Automate light color temperature via an MQTT-enabled dimmer (such as a Sonoff). Temporary location; will be moved to a full repository.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
import os, sys, time | |
import astral, datetime | |
import paho.mqtt.client | |
# EDIT HERE TO CONFIGURE + run with env `mqtt_broker=addr:port mqtt_topic=cmnd/house/ct script.py`
# `config` is a whitespace-separated list of "time action" pairs that
# color_temp() parses: words at even positions are times (names such as
# midnight, sunrise, noon, sunset), words at odd positions are actions
# (scale_up, scale_down or a percentage like 0% / 100%).
config = 'midnight scale_down sunrise 0% noon scale_up sunset 100%'
# Latitude and longitude handed to the sunrise/sunset lookup in sun_minutes().
lat, lng = (58.0, 56.316) # Perm
def utc_to_local(utc):
    """Shift a UTC datetime by the machine's local UTC offset.

    The offset is looked up for the instant that `utc` denotes, so it
    follows daylight-saving changes of the local time zone.

    :param utc: datetime.datetime holding a UTC time.
    :return: `utc` plus the local offset. Only a timedelta is added, so any
        tzinfo attached to the input is carried over unchanged.
    """
    import calendar  # function-scope import: only this helper needs it

    # timegm() reads the tuple as UTC. time.mktime() would read it as local
    # time, which moves the instant by the UTC offset and can select the
    # wrong offset close to a daylight-saving switch.
    epoch = calendar.timegm(utc.utctimetuple())
    local_wall = datetime.datetime.fromtimestamp(epoch)
    utc_wall = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=epoch)
    return utc + (local_wall - utc_wall)
def sun_minutes():
    """Return today's (sunrise, sunset) as minutes since local midnight.

    Both moments are requested from astral in UTC for the module-level
    `lat` / `lng`, converted with utc_to_local() and then reduced to
    hour*60 + minute.
    """
    def minute_of_day(moment):
        return moment.hour * 60 + moment.minute

    calc = astral.Astral()
    # Other values the author tried here: 'civil', 'astronomic'.
    calc.solar_depression = 'nautical'
    day = datetime.date.today()
    rise_local = utc_to_local(calc.sunrise_utc(day, lat, lng))
    set_local = utc_to_local(calc.sunset_utc(day, lat, lng))
    return minute_of_day(rise_local), minute_of_day(set_local)
def scale(v, vrange, target, reverse=False):
    """Map `v` linearly from the range `vrange` onto the range `target`.

    :param v: value to map; must lie within `vrange` (bounds included).
    :param vrange: (low, high) bounds of the input range, low < high.
    :param target: (low, high) bounds of the output range.
    :param reverse: when true the mapping is inverted, so the low end of
        `vrange` lands on the high end of `target`.
    :return: int within `target`; the fractional part of the scaled offset
        is dropped by int().
    :raises AssertionError: if `v` is outside `vrange`, if `vrange` has zero
        width, or if an internal consistency check fails.
    """
    vlo, vhi = vrange
    # Raised explicitly (not via `assert`) so the checks survive `python -O`;
    # AssertionError is kept so the exception type seen by callers is unchanged.
    if not vlo <= v <= vhi:
        raise AssertionError("config error. fail {}<={}<={}".format(vlo, v, vhi))
    if vlo == vhi:
        # Without this guard the division below fails with ZeroDivisionError.
        raise AssertionError("config error. empty range {}..{}".format(vlo, vhi))
    vk = float(v - vlo) / (vhi - vlo)
    if not 0 <= vk <= 1:
        raise AssertionError("code bug. fail vk=[0;1] locals={}".format(locals()))
    if reverse:
        vk = 1 - vk
    tlo, thi = target
    t = int(vk * (thi - tlo)) + tlo
    if not tlo <= t <= thi:
        raise AssertionError(
            "code bug. fail {}<={}<={} locals={}".format(tlo, t, thi, locals()))
    return t
def act(action, v, vrange, target):
    """Turn one configured action into a concrete value.

    :param action: a number (returned unchanged), 'scale_up', 'scale_down',
        or a percentage string such as '40%'.
    :param v: current position, used by the two scale actions.
    :param vrange: (low, high) range that `v` lies in.
    :param target: (low, high) output range handed to scale().
    :return: the number itself, or the result of scale().
    :raises Exception: if `action` is a string in none of the known forms.
    """
    if isinstance(action, (float, int)):
        return action
    if action == 'scale_up':
        return scale(v, vrange, target, reverse=False)
    if action == 'scale_down':
        return scale(v, vrange, target, reverse=True)
    if action.endswith('%'):
        # A percentage is a fixed point of the target range, independent of v.
        return scale(int(action[:-1]), (0, 100), target)
    # Report the offending action (the function object `act` was formatted
    # here before, which hid the actual bad value).
    raise Exception("code bug. invalid action={} locals={}".format(action, locals()))
def color_temp(config_text, minute, sun_range):
    """Pick the color temperature for `minute` according to `config_text`.

    `config_text` is a whitespace-separated list of "time action" pairs.
    A time is one of the names in `time_name_map` below or `T<hour>:<minute>`;
    an action is anything act() accepts. Each action applies from its own
    time up to the next listed time; the last action also covers the
    stretch that wraps over the end of the day back to the first time.

    :param config_text: schedule text as described above.
    :param minute: current minute of the day.
    :param sun_range: (sunrise, sunset) as minutes of the day.
    :return: value produced by act() within the range (153, 500).
    :raises AssertionError: on an odd word count or an unknown action.
    :raises Exception: on an empty config, an unknown time, or when no
        configured range contains `minute`.
    """
    # NOTE(review): presumably mireds for the dimmer - confirm against device.
    target = (153, 500)
    sunrise, sunset = sun_range
    mid_day = sunrise + int((sunset - sunrise) / 2)
    # The middle of the night lies half a day away from the middle of the
    # day. (sunrise + sunset) % 1440 only matches that when mid_day is 720.
    mid_night = (mid_day + 720) % 1440
    time_name_map = {
        'sunrise': sunrise,
        'sunset': sunset,
        'night': mid_night,
        'midnight': mid_night,
        'mid-night': mid_night,
        'noon': mid_day,
        'midday': mid_day,
        'mid-day': mid_day,
    }

    def parse_config_time(s):
        """Convert a time word (name or 'T<h>:<m>') to a minute of the day."""
        named = time_name_map.get(s)
        if named is not None:
            return named
        if s.startswith('T'):
            h, m = map(int, s[1:].split(':', 1))
            return h * 60 + m
        raise Exception("config error. invalid time={}".format(s))

    # parse config text
    words = config_text.split()
    # Explicit raises (not `assert`) so validation survives `python -O`.
    if len(words) % 2 != 0:
        raise AssertionError(
            "config error, word count must be even (x%2==0) text: {}".format(config_text))
    if not words:
        raise Exception("config error. empty config")
    times = tuple(map(parse_config_time, words[0::2]))
    actions = tuple(words[1::2])
    for a in actions:
        known = (isinstance(a, (float, int))
                 or a in ('scale_up', 'scale_down')
                 or a.endswith('%'))
        if not known:
            raise AssertionError("config error. invalid action={}".format(a))

    time_low = times[0]
    time_high = times[-1]
    # handle over-night wrap: the last action runs from the last time until
    # the first time of the following day
    if minute < time_low:
        return act(actions[-1], minute + 1440, (time_high, time_low + 1440), target)
    if minute >= time_high:
        # `>=`: the exact minute of the last time starts the last action
        return act(actions[-1], minute, (time_high, time_low + 1440), target)

    # all other values for `minute` inside `config` ranges
    for i, (lo, hi) in enumerate(zip(times, times[1:])):
        if lo <= minute < hi:
            return act(actions[i], minute, (lo, hi), target)
    raise Exception(
        "likely config error. range not found. minute={} locals={}".format(minute, locals()))
def parse_net_addr(s, default_port=1883):
    """Split a "host:port" string into an (addr, port) tuple.

    :param s: address text; the part after the last ':' is the port.
    :param default_port: port used when `s` contains no ':' at all.
    :return: (addr, port) with `port` as int.
    :raises ValueError: if the port part is not an integer.
    """
    addr, colon, port_text = s.rpartition(':')
    if not colon:
        # No port given: fall back instead of failing with IndexError.
        return (s, default_port)
    return (addr, int(port_text))
def main():
    """Compute the color temperature for the current minute and publish it.

    Logs the inputs and the result to stderr, then publishes the value as a
    retained message to the topic named by the `mqtt_topic` environment
    variable on the broker named by `mqtt_broker` (default localhost:1883).
    Exits with status 1 when no value was computed or no topic is set.
    """
    now = time.localtime()
    minute = now.tm_hour * 60 + now.tm_min
    sunrise, sunset = sun_minutes()
    ct = color_temp(config, minute, (sunrise, sunset))
    report = "minute={minute} sunrise={sunrise} sunset={sunset} ct={ct}\n".format(
        minute=minute, sunrise=sunrise, sunset=sunset, ct=ct)
    sys.stderr.write(report)
    if ct is None:
        sys.stderr.write("wrong interval\n")
        sys.exit(1)

    broker = os.environ.get('mqtt_broker', 'localhost:1883')
    host, port = parse_net_addr(broker)
    topic = os.environ.get('mqtt_topic')
    if not topic:
        sys.stderr.write("must define mqtt_topic environment\n")
        sys.exit(1)

    client = paho.mqtt.client.Client("color-temp-controller-v0")
    client.connect(host, port)
    client.publish(topic, ct, retain=True)
# Run main() only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment