Last active
August 24, 2024 13:03
-
-
Save Jan200101/ba0545e5670084547a8b9d6a077f32df to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from micropython import const | |
from machine import Pin, RTC | |
from rp2 import bootsel_button | |
import network | |
import aiohttp | |
import asyncio | |
import logging | |
import sys | |
import ntptime | |
import socket | |
import json | |
_level = logging.DEBUG | |
_formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S") | |
class LogHandler(logging.Handler): | |
def __init__(self, stream, filepath, level): | |
super().__init__(level) | |
self.stream = stream | |
self.file = open(filepath, mode="a") | |
print(self.file) | |
self.terminator = "\n" | |
self.formatter = _formatter | |
def close(self): | |
if hasattr(self.stream, "flush"): | |
self.stream.flush() | |
if hasattr(self.file, "flush"): | |
self.file.flush() | |
self.file.close() | |
def emit(self, record): | |
if record.levelno >= self.level: | |
m = self.format(record) + self.terminator | |
self.stream.write(m) | |
#self.file.write(m) | |
#self.file.flush() | |
root_log = logging.getLogger() | |
root_log.handlers.clear() | |
root_log.addHandler(LogHandler(sys.stderr, "/log", logging.DEBUG)) | |
root_log.setLevel(logging.DEBUG) | |
log = logging.getLogger("TempSens") | |
log.info("Program start") | |
led = Pin("LED", Pin.OUT) | |
led.off() | |
switch_output = Pin(16, Pin.OUT, value=0) | |
rtc = RTC() | |
# STAT_NO_IP isn't exposed via MicroPython | |
STATUS = {getattr(network, x):x for x in dir(network) if x.startswith("STAT")} | |
STAT_NO_IP = 2 | |
STATUS.update({ | |
STAT_NO_IP: "STAT_NO_IP" | |
}) | |
async def connect_network(sta_if, ssid, psk): | |
if not sta_if.active(): | |
# only enable if its not active | |
log.info("Enabling sta") | |
sta_if.active(True) | |
if not sta_if.isconnected() or sta_if.status() != network.STAT_GOT_IP: | |
log.info(f"Connecting to {ssid}") | |
sta_if.connect(ssid, psk) | |
status = sta_if.status() | |
while status == network.STAT_CONNECTING or status == network.STAT_IDLE: | |
# We are still connecting, sleep a bit | |
led.toggle() | |
await asyncio.sleep(0.5) | |
status = sta_if.status() | |
c = 0 | |
while status == STAT_NO_IP: | |
# Okay, maybe the DHCP server is taking a bit longer... | |
if c > 60: | |
# Yeah no, its been over a minute | |
# We aren't getting an IP | |
raise Exception("Unable to connect to Network:", "Failed to get IP") | |
elif c == 0: | |
log.info("Waiting for IP...") | |
led.toggle() | |
await asyncio.sleep(0.5) | |
c += 1 | |
status = sta_if.status() | |
if sta_if.isconnected(): | |
log.info(f"Connected ({sta_if.ifconfig()[0]})") | |
else: | |
raise Exception("Unable to connect to Network:", STATUS[sta_if.status()]) | |
else: | |
log.info(f"Already connected {sta_if.ifconfig()[0]}") | |
log.info("Setting time") | |
ntptime.settime() | |
def switch_on(): | |
led.on() | |
switch_output.on() | |
def switch_off(): | |
led.off() | |
switch_output.off() | |
def switch_toggle(): | |
if switch_output.value(): | |
switch_off() | |
else: | |
switch_on() | |
battery_level = -1 | |
expected_state = None | |
async def parse_data(result_data): | |
global battery_level | |
global expected_state | |
LEVEL_START = 95 | |
LEVEL_END = 85 | |
for l in result_data["list"]: | |
if l["data_name"] == "I18N_COMMON_BATTERY_SOC": | |
battery_level = float(l["data_value"]) | |
break | |
if expected_state != True and battery_level >= LEVEL_START: | |
switch_on() | |
expected_state = True | |
elif expected_state != False and battery_level <= LEVEL_END: | |
switch_off() | |
expected_state = False | |
log.info(f"Battery level: {battery_level}",) | |
async def button_handler(): | |
last_state = 0 | |
while True: | |
state = bootsel_button() | |
if state != last_state: | |
if state: | |
switch_toggle() | |
log.info("Button Press: " + str(switch_output.value())) | |
last_state = state | |
await asyncio.sleep(0) | |
async def http_callback(reader, writer): | |
global expected_state | |
global battery_level | |
method, path, *_ = (await reader.readline()).split(b" ") | |
# Ignore headers | |
while await reader.readline() != b"\r\n": | |
pass | |
query = {} | |
try: | |
log.info(f"Received HTTP Request: {method.decode()} {path.decode()}") | |
except: | |
log.info(f"Received HTTP Request") | |
query_start = path.find(b"?") | |
if query_start >= 0: | |
for query_item in path[query_start+1:].split(b"&"): | |
k, v = query_item.split(b"=") | |
query[k] = v | |
path = path[:query_start] | |
try: | |
turn = query[b"turn"].decode() | |
if turn == "on": | |
log.info("Turned relay on") | |
switch_on() | |
elif turn == "off": | |
log.info("Turned relay off") | |
switch_off() | |
elif turn == "auto": | |
log.info("Turned relay to auto") | |
if expected_state: | |
switch_on() | |
else: | |
switch_off() | |
else: | |
log.warn(f"Invalid `turn` state: {turn}") | |
except KeyError: | |
pass | |
def formatted_datetime(): | |
rtc_dt = rtc.datetime() | |
return '{year:04d}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}:{second:02d}'.format( | |
year=rtc_dt[0], | |
month=rtc_dt[1], | |
day=rtc_dt[2], | |
hour=rtc_dt[4], | |
minute=rtc_dt[5], | |
second=rtc_dt[6] | |
) | |
data = { | |
"datetime": formatted_datetime(), | |
"battery_level": battery_level, | |
"state": bool(switch_output.value()), | |
"auto_state": expected_state, | |
} | |
body = json.dumps(data).encode() | |
await writer.awrite(b"HTTP/1.1 200 OK\r\n") | |
await writer.awrite(b"Content-type: application/json\r\n") | |
await writer.awrite(b"Content-Length: ") | |
await writer.awrite(str(len(body)).encode()) | |
await writer.awrite(b"\r\n") | |
await writer.awrite(b"\r\n") | |
await writer.awrite(body) | |
await writer.wait_closed() | |
async def http_server(): | |
server = asyncio.start_server(http_callback, "0.0.0.0", 8080) | |
log.info("Started HTTP Server") | |
await server | |
async def temp_handler(): | |
ws_url = const("ws://192.168.178.65:8082/ws/home/overview") | |
ws_request = {"lang":"en_us","dev_id":"1","service":"real_battery"} | |
sta_if = network.WLAN(network.STA_IF) | |
while True: | |
try: | |
led.on() | |
if not sta_if.isconnected() or sta_if.status() != network.STAT_GOT_IP: | |
await connect_network(sta_if, "easybox1", "Zwiebelkuchen2001") | |
except Exception as e: | |
log.exception(e) | |
await asyncio.sleep(1) | |
continue | |
led.value(switch_output.value()) | |
try: | |
async with aiohttp.ClientSession() as session: | |
log.info("Client Session started") | |
while True: | |
data = None | |
async with session.ws_connect(ws_url) as ws: | |
log.info("Connection to socket established") | |
await ws.send_json(ws_request) | |
await asyncio.sleep(1) # sleep to ensure the data will arrive | |
# Toggle led twice to signal we got something | |
led.toggle() | |
data = await ws.receive_json() | |
led.toggle() | |
log.info("Closing connection") | |
if data: | |
log.info("Parsing data") | |
await parse_data(data["result_data"]) | |
await asyncio.sleep(120) | |
log.error("Session Interrupted, restarting...") | |
continue | |
except Exception as e: | |
log.exception(e) | |
await asyncio.sleep(0) | |
continue | |
async def main(): | |
asyncio.create_task(button_handler()) | |
asyncio.create_task(http_server()) | |
await temp_handler() | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment