Created
July 8, 2025 16:48
-
-
Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.
EO Mini Pro 2 Python Control
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Solar Charger Code | |
| # (C) Ryan Walmsley 2023 | |
| # This code monitors the house's solar export, calculates its rolling average, and charges the car from it. | |
| # It then instructs my charger (an EO Mini Pro) to charge at the set rate. | |
| from requests import get, post, exceptions | |
| import requests | |
| from pprint import pprint | |
| import json | |
| from time import sleep | |
| from math import floor | |
| import threading | |
| import datetime | |
| from serial import rs485 | |
| import datetime | |
| import sentry_sdk | |
# Error reporting via Sentry. The DSN below is a placeholder.
sentry_sdk.init(
    dsn="<SENTRY KEY>",
    # 1.0 samples every transaction for performance monitoring;
    # Sentry recommends lowering this value in production.
    traces_sample_rate=1.0,
)
# Recent (house consumption - charger consumption) samples in watts.
# calculate_charge() appends to this list and trims it as it grows.
history = []

# Home Assistant API settings. Each entity id below is appended directly to
# hassio_base_url when a request is made.
hassio_base_url = "<HASSIO>"
hassio_key = "<<HASSIOKEY>>"

power_consumption_url = "sensor.home_instant_electricity"
car_charger_url = "sensor.car_monitoring_car_charger_w"
average_url = "sensor.charger_average"
set_charge_rate_url = "sensor.set_car_charge_amps"
eo_target_current = "sensor.eomini_target_current"
charger_relay_url = "switch.chargerrelay_charger_relay"

headers = {
    "Authorization": f"Bearer {hassio_key}",
    "content-type": "application/json",
}

# Current switch settings: each letter / number printed on the charger's
# switch maps to the current (in amps) listed at the same position.
charge_switch_currents = dict(zip(
    "0123456789ABCDEF",
    (0, 6, 8, 10, 13, 15, 16, 18, 20, 22, 24, 25, 26, 28, 30, 32),
))
def convert_charge_hex_to_amps(charge_hex):
    """Convert a hexadecimal duty value reported by the charger into amps.

    The value is parsed as base 16, multiplied by 0.6, divided by 10 and
    truncated to a whole number of amps.

    The original note on this function says the extra maths needed for
    fast charging is not implemented because it is not required.

    charge_hex: hex digits as ``str`` or ``bytes`` (anything ``int(x, 16)``
        accepts).
    Returns: the current in whole amps (``int``).
    Raises: ValueError if ``charge_hex`` is not valid hexadecimal.
    """
    duty = int(charge_hex, 16)
    return int(duty * 0.6 / 10)
def convert_charge_amps_to_hex(charge_amps):
    """Convert a current in amps into the hexadecimal duty field for the charger.

    Anything below 5.5 A is treated as 0 (no charging). Otherwise the duty
    is ``int(charge_amps / 0.6) * 10``, rendered as upper-case hex.

    The original note on this function says the extra maths needed for
    fast charging is not implemented because it is not required.

    charge_amps: requested current in amps (``int`` or ``float``).
    Returns: the duty as an upper-case hex string, zero-padded to at least
        three characters.
    """
    if charge_amps < 5.5:
        charge_amps = 0
    charge_duty = int(charge_amps / 0.6) * 10
    # Pad to a fixed three characters. The previous code prepended a single
    # '0', so a duty of 0 came out as '00' while every other value was
    # three characters wide.
    return format(charge_duty, '03X')
def convert_charge_watts_to_hex(charge_amps):
    """Convert a current in amps into the hexadecimal duty field for the charger.

    Despite the function name, the argument is a current in amps, not watts.
    This variant prints the raw value and the value rounded down to the
    nearest 0.25 A for debugging, and treats anything below 6.0 A as 0.

    The original note on this function says the extra maths needed for
    rapid charging is not implemented because it is not required.

    charge_amps: requested current in amps (``int`` or ``float``).
    Returns: the duty as an upper-case hex string, zero-padded to at least
        three characters.
    """
    # Debug output only; the rounded value is not used in the calculation.
    pprint(charge_amps)
    pprint(floor(charge_amps * 4) / 4)
    if charge_amps < 6.0:
        charge_amps = 0
    charge_duty = int(charge_amps / 0.6) * 10
    # Pad to a fixed three characters. The previous code prepended a single
    # '0', so a duty of 0 came out as '00' instead of '000'.
    return format(charge_duty, '03X')
def _read_state_as_float(entity_id):
    """Fetch one Home Assistant entity and return its 'state' as a float.

    Returns 0.0 when the request fails, the body is not JSON, the 'state'
    key is missing, or the state is not numeric.
    """
    try:
        response = get(hassio_base_url + entity_id, headers=headers)
        return float(response.json()['state'])
    except (requests.ConnectionError, ValueError, KeyError, TypeError):
        return float(0)


def _post_state(entity_id, payload):
    """Post a state payload to Home Assistant, reporting connection failures."""
    try:
        post(hassio_base_url + entity_id, headers=headers, json=payload)
    except requests.ConnectionError:
        pprint("HASSIO Connection Issue, can't publish " + entity_id)


def calculate_charge():
    """Continuously work out how many amps the car may charge at.

    Every 5 seconds this reads the house consumption and the charger
    consumption from Home Assistant, stores their difference in the
    module-level ``history`` list, and averages the stored samples. The
    average (plus a 100 W margin) is published to ``average_url``. It is
    then converted into a charge rate, stored in the global
    ``charge_ampage`` and published to ``set_charge_rate_url``.

    Runs forever and never returns; intended to be a thread target.
    """
    global charge_ampage
    while True:
        # Each reading independently falls back to 0.0 on failure. The
        # previous code assigned the wrong variables in the charger's error
        # paths, leaving the charger reading undefined or stale.
        power_consumption = _read_state_as_float(power_consumption_url)
        charger_consumption = _read_state_as_float(car_charger_url)

        history.append(power_consumption - charger_consumption)
        average = round(sum(history) / len(history))
        pprint("Averages")
        pprint(average)
        # 100 W safety margin so the car does not consume all of the export.
        average = average + 100
        pprint(average)
        _post_state(
            average_url,
            {"state": float(average), "attributes": {"unit_of_measurement": "W"}},
        )

        # A negative average means the house is exporting. Divide by 240
        # (presumably the nominal mains voltage) to get amps, round down to
        # the nearest 0.25 A, then keep a further 0.5 A in reserve.
        charge_rate = floor((-average / 240) * 4) / 4
        charge_rate = charge_rate - 0.5
        pprint(charge_rate)
        if charge_rate < 5.5:
            charge_rate = 0
        charge_ampage = charge_rate
        _post_state(
            set_charge_rate_url,
            {"state": float(charge_rate), "attributes": {"unit_of_measurement": "A"}},
        )

        # Drop the oldest sample once 15 have accumulated.
        if len(history) >= 15:
            history.pop(0)
        sleep(5)
def _parse_status_packet(packet, target_current):
    """Decode one status reply from the charger into a dict of named fields.

    packet: the raw ``bytes`` line read from the serial port.
    target_current: the current (amps) that was just requested; stored
        unchanged under 'target_current'.
    Returns: dict mapping field name to decoded value. Fields wrapped in
        ``str()`` hold the repr of the raw bytes slice (e.g. "b'01'").
    Raises: ValueError, KeyError or IndexError when the packet is empty,
        truncated or otherwise malformed.
    """
    variables = {}
    variables['target_current'] = target_current
    variables['version'] = str(packet[1:3])
    # Responds with Hex 7 which is 18A if correct.
    variables['current_switch_setting'] = charge_switch_currents[packet[3:4].decode('utf-8')]
    variables['control_pilot_voltage'] = int(packet[4:7], 16)
    variables['charge_duty'] = convert_charge_hex_to_amps(packet[7:10])
    variables['plug_present_voltage'] = int(packet[10:13], 16)
    variables['live_voltage'] = int(packet[13:16], 16)
    variables['neutral_voltage'] = int(packet[16:19], 16)
    variables['daylight_detection'] = str(packet[19:22])
    variables['mains_frequency'] = int(packet[22:25], 16)
    variables['charger_state'] = str(packet[25:27])
    variables['relay_state'] = int(packet[27:28], 16)
    variables['plug_state'] = int(packet[28:29], 16)
    variables['HUB_duty_limit'] = convert_charge_hex_to_amps(packet[29:32])
    variables['charge_duty_timer'] = str(packet[32:36])
    variables['station_uptime'] = str(datetime.timedelta(minutes=int(packet[36:40], 16)))
    variables['charge_time'] = str(datetime.timedelta(minutes=int(packet[40:44], 16)))
    variables['state_of_mains'] = str(packet[44:46])
    variables['cp_line_state'] = str(packet[46])
    variables['station_ID'] = int(packet[47:48], 16)
    variables['random_value'] = str(packet[48:50])
    variables['max_current'] = convert_charge_hex_to_amps(packet[50:53])
    variables['persistant_ID'] = int(packet[53:61], 16)
    # Modified to 53-55 as these are the last chars. Looks correct.
    # NOTE(review): this overlaps the persistant_ID slice (53:61) above;
    # confirm the field layout against the charger's protocol.
    variables['checksum'] = str(packet[53:55])
    return variables


def manage_charger():
    """Continuously command the charger and report its status.

    Every 10 seconds this:
      1. Reads the manual-override switch from Home Assistant. When it is
         'off', the solar-derived global ``charge_ampage`` is used;
         otherwise the manually configured current is used. Any failure to
         read or interpret these values sets the current to 0.
      2. Sends the resulting current to the charger over the module-level
         serial port ``ser`` and reads one reply line.
      3. Decodes and prints the reply, then posts the target current back
         to Home Assistant.

    Runs forever and never returns; intended to be a thread target.
    """
    global charge_ampage
    global actual_charge_ampage
    while True:
        try:
            override = get(hassio_base_url + "input_boolean.charge_car_override", headers=headers).json()
            if override['state'] == 'off':
                pprint("Solar Charging Enabled")
                pprint("Solar Export Amps: " + str(charge_ampage))
                actual_charge_ampage = charge_ampage
            else:
                pprint("Manual Charge Override")
                manual = get(hassio_base_url + "input_number.car_charge_amps", headers=headers).json()
                actual_charge_ampage = int(float(manual['state']))
                pprint("Manual Current Amps: " + str(actual_charge_ampage))
        except (requests.ConnectionError, KeyError, ValueError):
            # ValueError covers a non-JSON response body and a non-numeric
            # manual current; both previously escaped and killed this thread.
            pprint("HASSIO Override Connection Issue, Disable charging for now.")
            actual_charge_ampage = 0

        built_command = '>0000' + convert_charge_amps_to_hex(actual_charge_ampage) + "\r"
        pprint("Charge Current Set To: " + str(actual_charge_ampage))
        pprint(built_command)
        ser.write(built_command.encode("ascii"))
        packet = ser.readline()
        pprint(packet)

        try:
            variables = _parse_status_packet(packet, actual_charge_ampage)
        except (ValueError, KeyError, IndexError):
            # readline() can return an empty or partial line when the read
            # times out; skip the feedback for this cycle instead of crashing.
            pprint("Malformed or empty reply from charger, skipping feedback.")
        else:
            for variable in variables.items():
                pprint(variable)
            target_current_json = {
                "state": float(variables['target_current']),
                "attributes": {"unit_of_measurement": "A"},
            }
            try:
                post(hassio_base_url + eo_target_current, headers=headers, json=target_current_json)
            except requests.ConnectionError:
                pprint("HASSIO Override Connection Issue, can't feed back.")

        # NOTE: code that switched charger_relay_url on/off depending on
        # whether target_current > 0 was commented out here; it remains
        # disabled.
        sleep(10)
if __name__ == "__main__":
    # RS485 serial link to the charger. Settings are applied before open().
    # The earlier duplicate baudrate assignment and the timeout of 0 that
    # was immediately replaced by 0.01 have been removed; the values in
    # effect when the port opens are unchanged.
    ser = rs485.RS485()
    ser.port = "/dev/ttyUSB0"
    ser.baudrate = 115200
    ser.rs485_mode = rs485.RS485Settings(rts_level_for_tx=False, rts_level_for_rx=True, delay_before_rx=0)
    ser.timeout = 0.01
    ser.open()

    # Shared state between the two worker threads; start with charging off.
    charge_ampage = 0
    actual_charge_ampage = 0

    calculate_thread = threading.Thread(target=calculate_charge)
    charger_thread = threading.Thread(target=manage_charger)
    calculate_thread.start()
    charger_thread.start()

    # Keep the main thread alive while the workers run.
    while True:
        sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment