Skip to content

Instantly share code, notes, and snippets.

@ryanteck
Created July 8, 2025 16:48
Show Gist options
  • Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.
Save ryanteck/255f4890cc141ff372821662895d6c25 to your computer and use it in GitHub Desktop.
EO Mini Pro 2 Python Control
# Solar Charger Code
# (C) Ryan Walmsley 2023
# This code will monitor and calculate the average solar export from the house, then charge from it.
# It then instructs my charger (an EO Mini Pro) to charge at the calculated rate.
from requests import get, post, exceptions
import requests
from pprint import pprint
import json
from time import sleep
from math import floor
import threading
import datetime
from serial import rs485
import datetime
import sentry_sdk
# Initialise the Sentry SDK for this process.
sentry_sdk.init(
    dsn="<SENTRY KEY>",
    # A traces_sample_rate of 1.0 captures 100% of transactions for
    # performance monitoring; consider lowering it in production.
    traces_sample_rate=1.0,
)
# Rolling window of recent net-power samples (house reading minus charger
# reading); calculate_charge appends to it and trims the oldest entry.
history = list()

# Home Assistant API settings. Each "*_url" value is the entity id that gets
# appended to hassio_base_url when a request is made.
hassio_base_url = "<HASSIO>"
hassio_key = "<<HASSIOKEY>>"

power_consumption_url = "sensor.home_instant_electricity"
car_charger_url = "sensor.car_monitoring_car_charger_w"
average_url = "sensor.charger_average"
set_charge_rate_url = "sensor.set_car_charge_amps"
eo_target_current = "sensor.eomini_target_current"
charger_relay_url = "switch.chargerrelay_charger_relay"

# Headers sent with every Home Assistant request (bearer-token auth, JSON body).
headers = {
    "Authorization": f"Bearer {hassio_key}",
    "content-type": "application/json",
}
# Current switch settings: each character printed on the charger's switch
# (0-9, A-F) maps to the current limit in amps listed at the same position.
charge_switch_currents = dict(zip(
    "0123456789ABCDEF",
    (0, 6, 8, 10, 13, 15, 16, 18, 20, 22, 24, 25, 26, 28, 30, 32),
))
# Convert a charge duty reported by the charger into amps (duty * 0.6 / 10).
# The extra maths for fast charging is not implemented as it is not required.
def convert_charge_hex_to_amps(charge_hex):
    """Return the whole number of amps encoded by a hexadecimal duty value.

    ``charge_hex`` is parsed with ``int(value, 16)``; the scaled result is
    truncated towards zero.
    """
    duty = int(charge_hex, 16)
    return int(duty * 0.6 / 10)
# Convert a charge current in amps into the charger's duty value, as hex.
# The extra maths for fast charging is not implemented as it is not required.
def convert_charge_amps_to_hex(charge_amps):
    """Return the duty for ``charge_amps`` as a 3-character upper-case hex string.

    Requests below 5.5 A are treated as 0 (charging off). The duty is
    ``int(amps / 0.6) * 10``. The result is always zero-padded to three hex
    digits so the command frame built from it has a constant length; the
    previous implementation added at most one '0', so a duty of 0 came out
    as the two-character string '00'.
    """
    if charge_amps < 5.5:
        charge_amps = 0
    charge_duty = int(charge_amps / 0.6) * 10
    return format(charge_duty, '03X')
# Convert a charge current into the charger's duty value, as hex.
# The extra maths for rapid charging is not implemented as it is not required.
# NOTE: despite the "watts" in the name, the argument is compared and scaled
# as amps, exactly like convert_charge_amps_to_hex but with a 6.0 A cut-off.
def convert_charge_watts_to_hex(charge_amps):
    """Return the duty for ``charge_amps`` as a 3-character upper-case hex string.

    Prints the incoming value and that value rounded down to a quarter for
    debugging. Requests below 6.0 are treated as 0. The result is always
    zero-padded to three hex digits; previously only a single '0' was
    prepended, so a duty of 0 produced the two-character string '00'.
    """
    pprint(charge_amps)
    pprint(floor(charge_amps * 4) / 4)
    if charge_amps < 6.0:
        charge_amps = 0
    charge_duty = int(charge_amps / 0.6) * 10
    return format(charge_duty, '03X')
# Main function to get the current export from the house and the consumption of
# the charger, and calculate how many amps the car can be charged at.
def _read_sensor_watts(entity_id):
    """Fetch one Home Assistant entity and return its state as a float.

    Returns 0.0 when the request fails or the state is missing or not
    numeric, which is the fallback value the loop has always used.
    """
    try:
        # A timeout stops a hung connection from freezing the calculation
        # loop while the charger keeps using the last published rate.
        response = get(hassio_base_url + entity_id, headers=headers, timeout=10)
        return float(response.json()['state'])
    except (requests.RequestException, ValueError, KeyError, TypeError):
        return float(0)


def _publish_sensor_state(entity_id, value, unit):
    """Best-effort push of a numeric state (with its unit) to Home Assistant."""
    payload = {"state": float(value), "attributes": {"unit_of_measurement": unit}}
    try:
        post(hassio_base_url + entity_id, headers=headers, json=payload, timeout=10)
    except requests.RequestException:
        # Losing one update must not kill the thread; the next cycle retries.
        pprint("HASSIO connection issue, could not publish " + entity_id)


def calculate_charge():
    """Continuously derive the solar charge rate and store it in ``charge_ampage``.

    Every 5 seconds this reads the house power sensor and the car-charger
    power sensor, appends their difference to ``history`` and averages the
    window. It publishes that average and the resulting charge rate to Home
    Assistant, then updates the global ``charge_ampage`` read by
    ``manage_charger``. Never returns.
    """
    global charge_ampage
    while True:
        # Each sensor is read independently so a failure of one can no
        # longer clobber (or leave unbound) the value of the other.
        power_consumption = _read_sensor_watts(power_consumption_url)
        charger_consumption = _read_sensor_watts(car_charger_url)

        history.append(power_consumption - charger_consumption)
        average = round(sum(history) / len(history))
        pprint("Averages")
        pprint(average)
        # Offset the average by 100 before converting
        # (presumably a safety margin in watts - TODO confirm).
        average = average + 100
        pprint(average)
        _publish_sensor_state(average_url, average, "W")

        # A negative average becomes a positive charge rate: divide by 240
        # (presumably the mains voltage - TODO confirm), round down to the
        # nearest quarter amp, then back off by a further half amp.
        charge_rate = floor((-average / 240) * 4) / 4
        charge_rate = charge_rate - 0.5
        pprint(charge_rate)
        if charge_rate < 5.5:
            charge_rate = 0
        charge_ampage = charge_rate
        _publish_sensor_state(set_charge_rate_url, charge_rate, "A")

        # Cap the averaging window by dropping the oldest sample.
        if len(history) >= 15:
            history.pop(0)
        sleep(5)
def _resolve_target_amps():
    """Return the current (in amps) the charger should be told to use.

    When the ``input_boolean.charge_car_override`` entity is 'off' the
    solar-derived global ``charge_ampage`` is used; otherwise the value of
    ``input_number.car_charge_amps`` is used. Any failure talking to Home
    Assistant, or a missing / non-numeric state, yields 0 so charging is
    disabled rather than the thread crashing.
    """
    try:
        override = get(hassio_base_url + "input_boolean.charge_car_override",
                       headers=headers, timeout=10).json()
        if override['state'] == 'off':
            pprint("Solar Charging Enabled")
            pprint("Solar Export Amps: " + str(charge_ampage))
            return charge_ampage
        pprint("Manual Charge Override")
        manual = get(hassio_base_url + "input_number.car_charge_amps",
                     headers=headers, timeout=10).json()
        manual_amps = int(float(manual['state']))
        pprint("Manual Current Amps: " + str(manual_amps))
        return manual_amps
    except (requests.RequestException, KeyError, ValueError, TypeError):
        pprint("HASSIO Override Connection Issue, Disable charging for now.")
        return 0


def _parse_status_packet(packet, target_current):
    """Decode the charger's fixed-position reply into a dict of fields.

    Raises ValueError or KeyError when the packet is too short or contains
    characters that cannot be decoded.
    """
    variables = {}
    variables['target_current'] = target_current
    variables['version'] = str(packet[1:3])
    # Responds with Hex 7 which is 18A if correct.
    variables['current_switch_setting'] = charge_switch_currents[packet[3:4].decode('utf-8')]
    variables['control_pilot_voltage'] = int(packet[4:7], 16)
    variables['charge_duty'] = convert_charge_hex_to_amps(packet[7:10])
    variables['plug_present_voltage'] = int(packet[10:13], 16)
    variables['live_voltage'] = int(packet[13:16], 16)
    variables['neutral_voltage'] = int(packet[16:19], 16)
    variables['daylight_detection'] = str(packet[19:22])
    variables['mains_frequency'] = int(packet[22:25], 16)
    variables['charger_state'] = str(packet[25:27])
    variables['relay_state'] = int(packet[27:28], 16)
    variables['plug_state'] = int(packet[28:29], 16)
    variables['HUB_duty_limit'] = convert_charge_hex_to_amps(packet[29:32])
    variables['charge_duty_timer'] = str(packet[32:36])
    variables['station_uptime'] = str(datetime.timedelta(minutes=int(packet[36:40], 16)))
    variables['charge_time'] = str(datetime.timedelta(minutes=int(packet[40:44], 16)))
    variables['state_of_mains'] = str(packet[44:46])
    # Sliced (not indexed) so this is a byte string like the other raw
    # fields instead of an integer code point, and cannot raise IndexError.
    variables['cp_line_state'] = str(packet[46:47])
    variables['station_ID'] = int(packet[47:48], 16)
    variables['random_value'] = str(packet[48:50])
    variables['max_current'] = convert_charge_hex_to_amps(packet[50:53])
    variables['persistant_ID'] = int(packet[53:61], 16)
    # Modified to 53-55 as these are the last chars. Looks correct.
    # NOTE(review): this overlaps the persistant_ID slice above - confirm offsets.
    variables['checksum'] = str(packet[53:55])
    return variables


def manage_charger():
    """Continuously send the target current to the charger and report back.

    Every 10 seconds this resolves the target current, writes a
    '>0000<duty>\\r' command to the serial port ``ser``, reads and decodes
    the reply for logging, and posts the target current to Home Assistant.
    Never returns.
    """
    global actual_charge_ampage
    while True:
        actual_charge_ampage = _resolve_target_amps()

        built_command = '>0000' + convert_charge_amps_to_hex(actual_charge_ampage) + "\r"
        pprint("Charge Current Set To: " + str(actual_charge_ampage))
        pprint(built_command)
        ser.write(built_command.encode("ascii"))
        packet = ser.readline()
        pprint(packet)

        try:
            variables = _parse_status_packet(packet, actual_charge_ampage)
        except (ValueError, KeyError):
            # An empty or truncated reply must not take the control loop down.
            pprint("Could not decode charger response, skipping status output.")
        else:
            for variable in variables.items():
                pprint(variable)

        target_current_json = {"state": float(actual_charge_ampage), "attributes": {"unit_of_measurement": "A"}}
        try:
            post(hassio_base_url + eo_target_current, headers=headers, json=target_current_json, timeout=10)
        except requests.RequestException:
            pprint("HASSIO Override Connection Issue, can't feed back.")
        # NOTE: switching the charger relay (charger_relay_url) on/off based on
        # the target current used to live here and is currently disabled.
        sleep(10)
if __name__ == "__main__":
    # Serial link to the charger. Each setting is applied once; the earlier
    # duplicate baudrate and the timeout of 0 that was immediately replaced
    # by 0.01 have been dropped.
    ser = rs485.RS485()
    ser.port = "/dev/ttyUSB0"
    ser.baudrate = 115200
    ser.rs485_mode = rs485.RS485Settings(rts_level_for_tx=False, rts_level_for_rx=True, delay_before_rx=0)
    ser.timeout = 0.01
    ser.open()

    # Shared state: calculate_charge writes charge_ampage, manage_charger
    # reads it and writes actual_charge_ampage.
    charge_ampage = 0
    actual_charge_ampage = 0

    # Daemon threads so that Ctrl-C in the main thread actually ends the
    # process instead of leaving it waiting on two endless worker loops.
    calculate_thread = threading.Thread(target=calculate_charge, daemon=True)
    charger_thread = threading.Thread(target=manage_charger, daemon=True)
    calculate_thread.start()
    charger_thread.start()
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        pprint("Stopping solar charger controller.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment