Ginlong wifi payload decode and push
import sys
import struct
import asyncio
import aiohttp
import json
import time
import os
import glob
import bz2
import tarfile
from os import path, mkdir
from asyncio import StreamReader, StreamWriter
from collections import OrderedDict
from datetime import datetime, timedelta

_module = sys.modules[__name__]
_module.CONSTS = CONSTS = dict()
CONSTS['LOGFILE_DIRECTORY'] = LOGFILE_DIRECTORY = "./output"
CONSTS['PVOUTPUT_ENDPOINT'] = PVOUTPUT_ENDPOINT = "https://pvoutput.org/service/r2/addstatus.jsp"
CONSTS['HEADER_SIZE'] = HEADER_SIZE = 12  # each payload chunk starts with a 12-byte header
CONSTS['TAIL_SIZE'] = TAIL_SIZE = 2       # and ends with a checksum byte plus a 0x16 terminator
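# The code below reads from a SETTINGS dict ('pvoutput_apikey', 'pvoutput_system_id',
# 'pvoutput_interval') that is not defined anywhere in this gist. A minimal sketch with
# placeholder values -- the key, system id and interval here are assumptions; substitute
# your own PVOutput credentials and preferred upload interval.
SETTINGS = {
    "pvoutput_apikey": "YOUR_PVOUTPUT_API_KEY",        # placeholder, not a real key
    "pvoutput_system_id": "YOUR_PVOUTPUT_SYSTEM_ID",   # placeholder system id
    "pvoutput_interval": 300,                          # assumed upload interval in seconds (5 minutes)
}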
# Sensor variables in the command 0x81 body, in the order they are read.
# Each entry is (byte length, struct unpack format, multiplier, python type).
CONSTS['SENSOR_VARIABLES'] = SENSOR_VARIABLES = OrderedDict([
    ("temp", (2, b'>H', 0.1, float)),
    ("vpv1", (2, b'>H', 0.1, float)),
    ("vpv2", (2, b'>H', 0.1, float)),
    ("unknown1", (2, b'>H', 1, int)),
    ("ipv1", (2, b'>H', 0.1, float)),
    ("ipv2", (2, b'>H', 0.1, float)),
    ("unknown2", (2, b'>H', 1, int)),
    ("iac1", (2, b'>H', 0.1, float)),
    ("iac2", (2, b'>H', 0.1, float)),
    ("iac3", (2, b'>H', 0.1, float)),
    ("vac1", (2, b'>H', 0.1, float)),
    ("vac2", (2, b'>H', 0.1, float)),
    ("vac3", (2, b'>H', 0.1, float)),
    ("fac", (2, b'>H', 0.01, float)),
    ("pac1", (2, b'>H', 1, int)),
    ("pac2", (2, b'>H', 1, int)),
    ("pac3", (2, b'>H', 1, int)),
    ("unknown3", (2, b'>H', 1, int)),
    ("unknown4", (2, b'>H', 1, int)),
    ("e_today", (2, b'>H', 0.01, float)),
    ("e_total", (4, b'>I', 0.1, int)),
    ("h_total", (4, b'>I', 1, int)),
    ("model", (2, b'>H', 1, int))
])
# Captured sample payload: two chunks, a command 0x81 sensor-data chunk followed by a
# command 0x80 chunk carrying an ASCII version/info string. Useful for offline testing.
sample = (0x68, 0x7c, 0x51, 0xb0, 0xe2, 0xe2, 0x8a, 0x25, 0xe2, 0xe2, 0x8a, 0x25, 0x81, 0x03, 0x05, 0x30, 0x30, 0x30, 0x41, 0x43, 0x30, 0x30, 0x31, 0x37, 0x36, 0x32, 0x31, 0x36, 0x35, 0x33, 0x20, 0x00, 0xea, 0x0b, 0x01, 0x08, 0x89, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x09, 0x55, 0x00, 0x00, 0x00, 0x00, 0x13, 0x90, 0x00, 0xbf, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x0b, 0x0e, 0x0c, 0x62, 0x00, 0x00, 0x02, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0xac, 0x04, 0x04, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x27, 0x00, 0x00, 0x00, 0xbf, 0x00, 0x00, 0xbf, 0x00, 0x00, 0x00, 0x00, 0x55, 0x09, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x96, 0x16, 0x68, 0x29, 0x51, 0xb1, 0xe2, 0xe2, 0x8a, 0x25, 0xe2, 0xe2, 0x8a, 0x25, 0x80, 0x01, 0x48, 0x34, 0x2e, 0x30, 0x31, 0x2e, 0x35, 0x31, 0x59, 0x34, 0x2e, 0x30, 0x2e, 0x30, 0x32, 0x57, 0x31, 0x2e, 0x30, 0x2e, 0x35, 0x31, 0x28, 0x47, 0x4c, 0x31, 0x36, 0x2d, 0x30, 0x35, 0x2d, 0x30, 0x36, 0x31, 0x2d, 0x44, 0x29, 0x60, 0x00, 0x98, 0x16)
def first(iterable):
    """Util function"""
    assert isinstance(iterable, (bytes, str, set, frozenset, list, tuple))
    return next(iter(iterable))


# https://stackoverflow.com/a/1160227
def touch(fname, mode=0o666, **kwargs):
    """ensure the file exists"""
    flags = os.O_CREAT | os.O_APPEND
    with os.fdopen(os.open(fname, flags=flags, mode=mode)) as f:
        os.utime(f.fileno() if os.utime in os.supports_fd else fname, **kwargs)
def zip_log_files(logfile_dir: str, at=7):
    csv_glob_str = "/".join([logfile_dir, "*.csv"])
    globs = glob.glob(csv_glob_str)
    if len(globs) < 1:
        print("Found no log files. Not zipping.")
        return
    elif len(globs) < int(at):
        print("Found fewer than {} log files. Not zipping.".format(at))
        return
    globs = sorted(globs)  # sort by filename; log_YYYYMMDD.csv names sort chronologically
    if len(globs) == 1:
        base_zip_name = path.join(logfile_dir, ".".join([path.basename(globs[0]), "bz2"]))
        do_tar = False
    else:
        top_name = path.basename(globs[0])
        bottom_name = path.basename(globs[-1])
        base_zip_name = path.join(logfile_dir, ".".join([top_name, "to", bottom_name, "tar.bz2"]))
        do_tar = True
    zip_name = base_zip_name
    append_num = 0
    while path.exists(zip_name):
        append_num += 1
        if append_num == 10:
            raise RuntimeError("Something weird happened. Made too many log files for the same period.")
        zip_name = ".".join([base_zip_name, str(append_num)])
    do_delete = False
    if do_tar:
        try:
            with tarfile.open(zip_name, "w:bz2") as zipfile:  # type: tarfile.TarFile
                _ = [zipfile.add(g, arcname=path.basename(g), recursive=False) for g in globs]
            assert path.isfile(zip_name), "our new tar.bz2 file could not be found!"
            do_delete = True
        except Exception as e:
            print(e)
            do_delete = False
    else:
        try:
            with open(globs[0], mode="rb") as single_logfile:
                with bz2.open(zip_name, mode="wb") as zipfile:  # type: bz2.BZ2File
                    zipfile.write(single_logfile.read())
            assert path.isfile(zip_name), "our new .bz2 file could not be found!"
            do_delete = True
        except Exception as e:
            print(e)
            do_delete = False
    if do_delete:
        try:
            for g in globs:
                os.remove(g)
        except Exception as e:
            print(e)
def write_csv_headers(logfile_path):
    headers_to_use = ['time']
    for k in SENSOR_VARIABLES:
        if "unknown" in k:
            continue
        elif k == "h_total" or k == "model":
            continue
        elif k == "pac2" or k == "pac3" or k == "vac2" or k == "vac3":
            continue
        headers_to_use.append(k)
    line = ",".join(headers_to_use)
    with open(logfile_path, "w") as logfile:
        logfile.write(line)
        logfile.write("\n")
def write_csv_values(received_datetime, logfile_path, values_dict):
    vars_to_use = list()
    for k in SENSOR_VARIABLES:
        if "unknown" in k:
            continue
        elif k == "h_total" or k == "model":
            continue
        elif k == "pac2" or k == "pac3" or k == "vac2" or k == "vac3":
            continue
        vars_to_use.append(k)
    time_str = received_datetime.strftime("%H:%M:%S")
    vals = list()
    for k in vars_to_use:
        try:
            val = values_dict[k]
            if isinstance(val, float):
                val = "{0:.3f}".format(val)
                val = val.rstrip('0')  # strip trailing zeros (eg, "10.310" -> "10.31")
                val = val.rstrip('.')  # strip a trailing decimal point (eg, "10." -> "10")
            else:
                val = str(val)
        except Exception:
            val = "NULL"
        vals.append(val)
    line = ",".join(vals)
    with open(logfile_path, "a") as logfile:
        logfile.write("{},".format(time_str))
        logfile.write(line)
        logfile.write("\n")
def get_logfile_path():
    expanded_dir = path.expandvars(path.expanduser(LOGFILE_DIRECTORY))
    logfile_dir = path.abspath(expanded_dir)
    if not path.exists(logfile_dir):
        mkdir(logfile_dir)
    elif path.isfile(logfile_dir):
        raise RuntimeError("Output logfile directory \"{}\" is a file, not a directory!".format(logfile_dir))
    datetime_now = datetime.now()  # get local time
    datetime_string = datetime_now.strftime("%Y%m%d")
    file_name = "log_{}.csv".format(datetime_string)
    file_path = path.join(logfile_dir, file_name)
    if not path.exists(file_path):
        zip_log_files(logfile_dir, at=7)
        touch(file_path, mode=0o777)
        write_csv_headers(file_path)
    return file_path
async def push_to_pvoutput(date_time: datetime, energy_gen=None, power_gen=None, energy_consumption=None,
                           power_consumption=None, temperature=None, voltage=None):
    session = push_to_pvoutput.session
    if session is None:
        headers = {"X-Pvoutput-Apikey": SETTINGS['pvoutput_apikey'],
                   "X-Pvoutput-SystemId": SETTINGS['pvoutput_system_id']}
        session = aiohttp.ClientSession(headers=headers)
        push_to_pvoutput.session = session
    data = {"d": date_time.strftime("%Y%m%d"), "t": date_time.strftime("%H:%M")}
    if energy_gen is None and power_gen is None and energy_consumption is None and power_consumption is None:
        raise RuntimeError("Must have at least one v1-v4 variable to send to PVOutput")
    if energy_gen is not None:
        data['v1'] = float(energy_gen)
    if power_gen is not None:
        data['v2'] = float(power_gen)
    if energy_consumption is not None:
        data['v3'] = float(energy_consumption)
    if power_consumption is not None:
        data['v4'] = float(power_consumption)
    if temperature is not None:
        data['v5'] = float(temperature)
    if voltage is not None:
        data['v6'] = float(voltage)
    async with session.post(url=PVOUTPUT_ENDPOINT, data=data) as result:
        status = result.status
        body = await result.read()
        if 200 <= status < 300:
            return True
        else:
            print(body)
push_to_pvoutput.session = None  # cached aiohttp session, created lazily on first use
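# Rough chunk layout, as inferred from the decoding below (an assumption drawn from this
# code, not an official protocol description):
#   byte 0        : 0x68 start marker
#   byte 1        : body length
#   bytes 2-3     : flags
#   bytes 4-7     : target id (little-endian uint32)
#   bytes 8-11    : client id (little-endian uint32)
#   bytes 12..    : body (first byte is the command; 0x81 carries the sensor readings)
#   last 2 bytes  : checksum (sum of bytes 1..end-of-body, mod 256) and 0x16 end marker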
def decode_payload(received_datetime, payload_bytes):
    offset = 0
    payload_datas = list()
    while True:
        start_char = payload_bytes[offset]
        assert start_char == 0x68, "payload chunk must start with 0x68"
        len_char = payload_bytes[offset+1]
        len_int = int(len_char)
        # make sure the whole chunk (header + body + tail) fits inside the received buffer
        if (offset + len_int + HEADER_SIZE + TAIL_SIZE) > len(payload_bytes):
            raise ValueError("Did not receive the whole payload.")
        body_start = offset + HEADER_SIZE
        header = payload_bytes[offset:body_start]
        tail_start = body_start + len_int
        tail_end = tail_start + TAIL_SIZE
        tail = payload_bytes[tail_start:tail_end]
        assert tail[1] == 0x16, "payload chunk must end with 0x16"
        # checksum: sum of every byte between the start marker and the tail, modulo 256
        j = 0
        for i in range(offset+1, tail_start):
            j += (payload_bytes[i] & 0xFF)
            j &= 0xFF
        cs = tail[0]
        assert int(cs) == int(j), "payload checksum does not match"
        flag1 = first(bytes(header[1:2]))
        flag2 = first(bytes(header[2:3]))
        flag3 = first(bytes(header[3:4]))
        target_buffer = bytes(header[4:8])
        client_buffer = bytes(header[8:12])
        target_id = first(struct.unpack(b'<I', target_buffer))
        client_id = first(struct.unpack(b'<I', client_buffer))
        body = bytes(payload_bytes[body_start:body_start+len_int])
        command = first(body)
        if command == 1:
            arg = body[1]
            print("found command #{} with arg #{}".format(command, arg))
            message = bytes(body[2:tail_start])
            print("message: {}".format(message))
        if command == 128:
            arg = body[1]
            print("found command #{} with arg #{}".format(command, arg))
            message = bytes(body[2:tail_start])
            print("message: {}".format(message))
        elif command == 129:
            sensor = first(struct.unpack(b'<H', bytes(body[1:3])))
            print("found command #{} from sensor type #{}".format(command, sensor))
            cursor = 3
            sn_buffer = bytes(body[cursor:cursor+16])
            sn = sn_buffer.decode("ISO-8859-1")
            print("got sensor serial number: {}".format(sn))
            cursor += 16
            payload_data = dict()
            for k, v in SENSOR_VARIABLES.items():
                (bytes_size, bytes_format, multiplier, data_type) = v
                var_buffer = bytes(body[cursor:cursor+bytes_size])
                var_unpack = first(struct.unpack(bytes_format, var_buffer))
                if bytes_size == 2 and var_unpack == 65535:
                    var_unpack = 0  # treat an all-ones (0xFFFF) reading as zero
                if data_type == float:
                    if multiplier is not None:
                        var = float(var_unpack) * multiplier
                    else:
                        var = float(var_unpack)
                elif data_type == int:
                    if multiplier is not None:
                        var = round(float(var_unpack) * multiplier)
                    else:
                        var = int(var_unpack)
                else:
                    raise RuntimeError("Don't know what to do with data type: {}".format(data_type))
                print("var {}:\t{}".format(k, var))
                payload_data[k] = var
                cursor += bytes_size
            payload_data['pac'] = int(payload_data['pac1']) + int(payload_data['pac2']) + int(payload_data['pac3'])
            payload_datas.append(payload_data)
        offset = tail_end
        if offset >= len(payload_bytes):
            break
    return payload_datas
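# A minimal offline check of the decoder, using the captured `sample` tuple defined above
# (a sketch added for illustration, not part of the original gist):
#
#     chunks = decode_payload(datetime.now(), bytes(sample))
#     print(chunks[0]['e_today'], chunks[0]['pac'])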
async def handle_receive(reader: StreamReader, writer: StreamWriter):
    data = await reader.read()
    last_received = handle_receive.last_received
    last_sent = handle_receive.last_sent
    consecutive_zeros = handle_receive.consecutive_zeros
    last_energy_gen = handle_receive.last_energy_gen
    received_time = time.time()
    if last_received is None:
        last_received = 0
        print("first connection received.")
    else:
        delta_received = received_time - last_received
        print("last received {} seconds ago.".format(delta_received))
    if last_energy_gen is None:
        last_energy_gen = -1.0
    if consecutive_zeros is None:
        consecutive_zeros = 0
    hex_data = ''.join('{:02x}'.format(x) for x in data)
    addr = writer.get_extra_info('peername')
    print("Received {} from {}".format(hex_data, addr))
    handle_receive.last_received = received_time
    received_datetime = datetime.fromtimestamp(received_time)
    payload_datas = decode_payload(received_datetime, data)
    if len(payload_datas) < 1:
        print("Didn't get any payload variables?!")
        return
    payload_data = first(payload_datas)
    energy_day_accum = payload_data['e_today'] * 1000.0
    pac = payload_data['pac']
    idle = False
    if round(energy_day_accum, 1) == round(last_energy_gen, 1) and round(pac, 1) == 0.0:
        consecutive_zeros = consecutive_zeros + 1
        if consecutive_zeros >= 2:
            idle = True
    else:
        consecutive_zeros = 0
    handle_receive.consecutive_zeros = consecutive_zeros
    handle_receive.last_energy_gen = energy_day_accum
    if not idle:
        logfile_path = get_logfile_path()
        write_csv_values(received_datetime, logfile_path, payload_data)
        if last_sent is None:
            print("sending first packet to pvoutput.")
            delta_seconds = SETTINGS['pvoutput_interval']
        else:
            delta_sent = received_time - last_sent
            delta_seconds = delta_sent
        if delta_seconds >= SETTINGS['pvoutput_interval']:
            result = await push_to_pvoutput(received_datetime, energy_gen=energy_day_accum, power_gen=pac,
                                            temperature=payload_data['temp'], voltage=payload_data['vac1'])
            if result is True:
                handle_receive.last_sent = time.time()
        else:
            print("skipping send. Sent last one {} seconds ago.".format(delta_seconds))
    else:
        pass  # idle
    # print("Send: %r" % message)
    # writer.write(data)
    # await writer.drain()
    # print("Close the client socket")
    writer.close()
handle_receive.last_received = None
handle_receive.last_sent = None
handle_receive.last_energy_gen = None
handle_receive.consecutive_zeros = None
def run_server():
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_receive, '0.0.0.0', 8081, loop=loop)
    server = loop.run_until_complete(coro)
    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
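# The gist never invokes run_server(); an entry point along these lines is assumed when
# running the file directly (the listen address and port come from run_server() above).
if __name__ == "__main__":
    run_server()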