Skip to content

Instantly share code, notes, and snippets.

@steveh
Created October 26, 2016 22:39
Show Gist options
  • Save steveh/b44b53b566fd3c1e48b55e82ad88ce6a to your computer and use it in GitHub Desktop.
Save steveh/b44b53b566fd3c1e48b55e82ad88ce6a to your computer and use it in GitHub Desktop.
import csv
import datetime
import serial
import time
import urllib
import math
from time import strftime
from sense_hat import SenseHat
from decimal import Decimal
from influxdb import InfluxDBClient
# get the POC from http://www.ea.govt.nz/consumers/your-power-data-in-your-hands/my-meter/
def getGenerationPrice( time, poc ):
params = {
"INchoice": "SEL",
"INdate": strftime("%d/%m/%Y"),
"INgip": poc,
"INperiodfrom": "1",
"INperiodto": "50",
"INtype": "Price",
}
url = "https://www.electricityinfo.co.nz/comitFta/five_min_prices.download?" + urllib.urlencode(params)
response = urllib.urlopen(url)
reader = csv.DictReader(response)
row = reader.next()
response.close()
price = float(row["Price"])
return price / 10 # convert $/MWh to c/kWh
# http://www.flickelectric.co.nz/downloads/price_schedules_Apr2016/Wellington-Price-Schedule-2016-04-01-v2.pdf
# Inclusive : Standard : Smart Plan Option
def getVariablePrice( time ):
# night rate between 11pm and 7am
if (time.tm_hour >= 23) or (time.tm_hour < 7):
return 3.343 - 1.5
else:
return 6.603 - 1.5
def getElectricityPrice( poc ):
now = time.localtime()
gst_factor = 1.15
generation_price = getGenerationPrice(now, poc)
variable_price = getVariablePrice(now)
total_price = (generation_price + variable_price) * gst_factor
return Decimal(total_price).quantize(Decimal("1.00"))
def byte2int(bstr, width=32):
"""
Convert a byte string into a signed integer value of specified width.
"""
val = sum(ord(b) << 8*n for (n, b) in enumerate(reversed(bstr)))
if val >= (1 << (width - 1)):
val = val - (1 << width)
return val
def getConsumption(port, index):
ser = serial.Serial()
ser.port=port
ser.baudrate=9600
ser.open()
ser.write('\xaa\x02\x00\xad')
time.sleep(1)
output = ser.read(ser.inWaiting())
if output == "":
exit()
date = int(datetime.datetime.now().strftime("%s"))
data = [ date ]
numb = 8
for y in range(0, 15):
value = int(byte2int(output[(y*numb)+5:(y*numb)+6]) + (byte2int(output[(y*numb)+6:(y*numb)+7])*256))
if value > 12000:
exit()
data.append(value)
ser.close()
m = map(str, data)
print m
consumption = m[index]
return Decimal(consumption).quantize(Decimal("1.000"))
def getTemperature(sense):
return Decimal(sense.get_temperature()).quantize(Decimal("1.00"))
def getSeptile(value, median, deviation):
candidates = [
median - (deviation * 3),
median - (deviation * 2),
median - (deviation * 1),
median,
median + (deviation * 1),
median + (deviation * 2),
median + (deviation * 3),
]
# print candidates
#print value
candidates.reverse()
for idx, candidate in enumerate(candidates):
if value > candidate:
return len(candidates) - idx
return 1
blank = [0, 0, 0]
yellow = [255, 230, 0]
red = [230, 0, 0]
blue = [0, 0, 230]
blankLine = [blank] * 8
colors = [
[39, 174, 96],
[46, 204, 113],
[241, 196, 15],
[243, 156, 18],
[230, 126, 34],
[211, 84, 0],
[192, 57, 43],
]
def getColorLine(num):
return colors[0:num] + ([blank] * (7 - num))
def getLine(color, num):
return [color] + getColorLine(num)
sense = SenseHat()
price = getElectricityPrice("CPK0111")
temperature = getTemperature(sense)
consumption = getConsumption("/dev/ttyUSB0", 3)
cost = price * consumption/1000
priceSeptile = getSeptile(price, 14.0, 1.0)
temperatureSeptile = getSeptile(temperature, 21.0, 3.0)
consumptionSeptile = getSeptile(consumption, 1000, 333)
print "price: %0.2f c/kWh" % (price)
print "temperature: %0.2f C" % (temperature)
print "consumption: %0.2f W" % (consumption)
print "cost: %0.2f c" % (cost)
totalCells = 32
priceCells = int(math.ceil(price))
if priceCells > totalCells:
priceCells = totalCells
consumptionCells = int(math.ceil(consumption / 100))
if consumptionCells > totalCells:
consumptionCells = totalCells
costCells = int(math.ceil(cost))
if costCells > totalCells:
costCells = totalCells
cells = []
cells += [yellow] * costCells
cells += [blank] * (totalCells - costCells)
cells += [blank] * 32
#cells += [yellow] * priceCells
#cells += [blank] * (totalCells - priceCells)
#cells += [red] * consumptionCells
#cells += [blank] * (totalCells - consumptionCells)
#inst = "%0.1f" % (price * (consumption/1000))
#inst = "%0.0f" % (price * (consumption/1000))
sense.clear()
sense.low_light = True
sense.set_pixels(cells)
#sense.show_message(inst)
#sense.show_letter(inst)
json_body = [
{
"measurement": "electricity_consumption",
"tags": {
"meter": "total_house"
},
"fields": { "value": consumption }
}, {
"measurement": "electricity_price",
"tags": {
"meter": "net"
},
"fields": { "value": price }
}
]
print json_body
client = InfluxDBClient('10.16.64.40', 8086, "", "", "tanera")
client.write_points(json_body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment