Created
January 20, 2022 18:59
-
-
Save NelsonMinar/37208cc6e33e76a6ee63cf0a9d0b4b71 to your computer and use it in GitHub Desktop.
Solar monitoring setup: PVS6, PGE, forecast.solar to Influx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Data imports for a custom solar monitoring system I build using Sunpower's PVS6, Telegraf, Influx, and Grafana. | |
See http://nelsonslog.wordpress.com/ for details | |
This code is provided without license or support. If you do something interested and related let me know! | |
[email protected] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Get a solar forecast from forecast.solar and emit it in Influx format | |
Nelson Minar <[email protected]> | |
""" | |
import json, sys, urllib.request | |
import pendulum | |
forecast_url = 'https://api.forecast.solar/estimate/watts/39.2/-121.1/35/70/11.5' | |
def parse(fp): | |
data = json.load(fp) | |
tz_name = data['message']['info']['timezone'] | |
for date, watts in data.get('result', {}).items(): | |
dt = pendulum.parse(date, tz=tz_name) | |
ts = dt.int_timestamp * 1_000_000_000 | |
kw = watts / 1000.0 | |
print(f'forecast_dot_solar kw={kw:.4} {ts}') | |
if __name__ == '__main__': | |
if len(sys.argv) == 1: | |
parse(urllib.request.urlopen(forecast_url, timeout=120)) | |
for fn in sys.argv[1:]: | |
parse(open(fn)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Import CSV Green Button usage files from PG&E and emit them in Influx format | |
Nelson Minar <[email protected]> | |
""" | |
import csv, sys, gzip | |
import pendulum | |
def parse(fn: str): | |
fp = gzip.open(fn, mode='rt', encoding='utf8') if fn.endswith('.gz') else open(fn) | |
for _ in range(5): | |
fp.readline() | |
rows = csv.DictReader(fp) | |
for row in rows: | |
assert row['TYPE'] == 'Electric usage' | |
assert row['UNITS'] == 'kWh' | |
dt = pendulum.parse(f'{row["DATE"]} {row["END TIME"]}', tz='America/Los_Angeles') | |
ts = dt.int_timestamp * 1_000_000_000 | |
kwh = row["USAGE"] | |
cost = float(row["COST"].replace('$', '')) | |
print(f'pge kwh={kwh},cost={cost:.4} {ts}') | |
if __name__ == '__main__': | |
for fn in sys.argv[1:]: | |
parse(fn) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
"""Parse status data from the Sunpower PVS6 and turn it into Influx format, for use with Telegraf. | |
PVS6 data comes from its endpoint /cgi-bin/dl_cgi?Command=DeviceList' | |
The PVS6 itself can be the server for this, or a Raspberry Pi or something running as a proxy. | |
Output contains lots of data from the Sunpower mointors. | |
Computer status, production and consumption wattages etc (as measured by CT sensors), | |
also power data from each individual inverter. | |
Also produces some summary statistics rolling up info about all the inverters. | |
By Nelson Minar <[email protected]>. | |
I have no intent to support this or turn it into a resuable product. | |
""" | |
import json, sys, os.path, re, statistics, urllib.request, time | |
_PVS6_URL = 'http://192.168.3.140/cgi-bin/dl_cgi?Command=DeviceList' | |
output = [] | |
def extract_fields_tags(record, field_keys, tag_keys, ignore_keys): | |
"""Given a JSON blob, return three dicts. | |
One containing all keys in field_keys, one for tag_keys, and one for things not in fields, tags, or ignore.""" | |
field_keys = field_keys.split(',') | |
tag_keys = tag_keys.split(',') | |
ignore_keys = ignore_keys.split(',') | |
fields = {k: v for k, v in record.items() if k in field_keys} | |
tags = {k: v for k, v in record.items() if k in tag_keys} | |
others = {k: v for k, v in record.items() if not (k in (field_keys + tag_keys + ignore_keys))} | |
return fields, tags, others | |
_strip_whitespace = re.compile(r'\s+') | |
def _s(s): | |
"Remove all whitespace from string" | |
return _strip_whitespace.sub("_", s) | |
def _v(v): | |
"Turn the input into a string for influx format. Quotes strings, converts numbers" | |
try: | |
return float(v) | |
except ValueError: | |
pass | |
return f'"{v}"' | |
# Some rollup data populated by the parsing system | |
summary = {} | |
inverter_kws = [] | |
inverter_temps = [] | |
def init_summary(): | |
global inverter_kws, inverter_temps, summary | |
inverter_kws = [] | |
inverter_temps = [] | |
summary = { | |
'count_not_working': 0, | |
'count_not_working_inverters': 0, | |
'avg_inverter_kw': 0, | |
'sd_inverter_kw': 0, | |
'count_inverter_low_kw': 0, | |
'avg_inverter_temp': 0, | |
'sd_inverter_temp': 0, | |
} | |
def create_summary(): | |
summary['avg_inverter_kw'] = statistics.fmean(inverter_kws) | |
summary['sd_inverter_kw'] = statistics.pstdev(inverter_kws) | |
summary['avg_inverter_temp'] = statistics.fmean(inverter_temps) | |
summary['sd_inverter_temp'] = statistics.pstdev(inverter_temps) | |
# Count "low" inverters | |
mean_kw = summary['avg_inverter_kw'] | |
if mean_kw < 0.02: # if no power is being generated then skip this check | |
low = 0 | |
else: | |
# low means less than 80% of average | |
low_factor = 0.8 | |
low = sum([1 for x in inverter_kws if x < low_factor * summary['avg_inverter_kw']]) | |
summary['count_inverter_low_kw'] = low | |
return "pvs6_summary", summary, {}, {} | |
def parse(data, timestamp): | |
output = [] | |
skipped = 0 | |
init_summary() | |
# Parse every record in the JSON input | |
for record in data['devices']: | |
device_type = record.get("DEVICE_TYPE") | |
if device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-P': | |
output.append(parse_production(record)) | |
elif device_type == 'Power Meter' and record.get('TYPE') == 'PVS5-METER-C': | |
output.append(parse_consumption(record)) | |
elif device_type == 'Inverter': | |
output.append(parse_inverter(record)) | |
elif device_type == 'PVS': | |
output.append(parse_pvs(record)) | |
else: | |
skipped += 1 | |
if skipped > 0: | |
sys.stderr.write(f'Skipped {skipped} records of unknown type.\n') | |
# Make a rollup from the collected data | |
output.append(create_summary()) | |
# Emit the records in Influx format | |
for measurement, fields, tags, others in output: | |
if len(others) != 0: | |
sys.stderr.write(f'Ignoring {len(others)} unknown data items in {measurement}: {others}\n') | |
# Strip whitespace from tag names and values and format as k=v,k=v lists for Influx | |
tag_set = ','.join(f"{_s(k)}={_s(v)}" for k, v in tags.items()) | |
field_set = ','.join(f"{_s(k)}={_v(v)}" for k, v in fields.items()) | |
# Format timestamp into nanoseconds | |
ts = int(1_000_000_000 * timestamp) | |
# Only print this delimeter if there are tags | |
d = ',' if len(tag_set) > 0 else '' | |
# Emit the Influx record | |
print(f'{measurement}{d}{tag_set} {field_set} {ts}') | |
def parse_pvs(record): | |
fields, tags, others = extract_fields_tags( | |
record, | |
"STATE,SWVER,dl_comm_err,dl_cpu_load,dl_err_count,dl_flash_avail,dl_mem_used,dl_scan_time,dl_skipped_scans,dl_untransmitted,dl_uptime", | |
"SERIAL", "CURTIME,DATATIME,DETAIL,DEVICE_TYPE,HWVER,MODEL,STATEDESCR,panid") | |
if fields['STATE'] != 'working': | |
summary['count_not_working'] += 1 | |
return "pvs6_computer", fields, tags, others | |
def parse_production(record): | |
fields, tags, others = extract_fields_tags( | |
record, "STATE,freq_hz,net_ltea_3phsum_kwh,p_3phsum_kw,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto", "SERIAL", | |
"CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype" | |
) | |
if fields['STATE'] != 'working': | |
summary['count_not_working'] += 1 | |
return "pvs6_production", fields, tags, others | |
def parse_consumption(record): | |
fields, tags, others = extract_fields_tags( | |
record, | |
"STATE,freq_hz,i1_a,i2_a,neg_ltea_3phsum_kwh,net_ltea_3phsum_kwh,p1_kw,p2_kw,p_3phsum_kw,pos_ltea_3phsum_kwh,q_3phsum_kvar,s_3phsum_kva,tot_pf_rto,v12_v,v1n_v,v2n_v", | |
"SERIAL", | |
"CAL0,CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,OPERATION,PORT,STATEDESCR,SWVER,TYPE,ct_scl_fctr,interface,origin,subtype" | |
) | |
if fields['STATE'] != 'working': | |
summary['count_not_working'] += 1 | |
return "pvs6_consumption", fields, tags, others | |
def parse_inverter(record): | |
fields, tags, others = extract_fields_tags( | |
record, | |
"STATE,freq_hz,i_3phsum_a,i_mppt1_a,ltea_3phsum_kwh,p_3phsum_kw,p_mppt1_kw,stat_ind,t_htsnk_degc,v_mppt1_v,vln_3phavg_v", | |
"SERIAL", | |
"CURTIME,DATATIME,DESCR,DEVICE_TYPE,ISDETAIL,MODEL,MOD_SN,NMPLT_SKU,OPERATION,PANEL,PORT,STATEDESCR,SWVER,TYPE,hw_version,interface,origin" | |
) | |
if fields['STATE'] != 'working': | |
summary['count_not_working'] += 1 | |
summary['count_not_working_inverters'] += 1 | |
inverter_kws.append(float(fields['p_3phsum_kw'])) | |
inverter_temps.append(float(fields['t_htsnk_degc'])) | |
return "pvs6_inverter", fields, tags, others | |
def load_from_file(fn): | |
ts = os.path.getmtime(fn) | |
try: | |
data = json.load(open(fn)) | |
except: | |
sys.stderr.write(f'Error parsing JSON file {fn}, skipping.\n') | |
return | |
parse(data, ts) | |
def load_from_url(url): | |
# PVS6 often takes 10-15 seconds to respond | |
fp = urllib.request.urlopen(url, timeout=50) | |
data = json.load(fp) | |
parse(data, time.time()) | |
if __name__ == '__main__': | |
if len(sys.argv) > 1: | |
for fn in sys.argv[1:]: | |
load_from_file(fn) | |
else: | |
load_from_url(_PVS6_URL) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment