Last active
April 20, 2025 09:26
-
-
Save enkiusz/6408645efd622b8a638a14957cd37f47 to your computer and use it in GitHub Desktop.
ZKETECH EBC-A20 Control code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
A simple CLI tool to control and monitor the ZKETECH EBC-A20 battery | |
tester from https://www.zketech.com/en/ | |
It can trigger charge or discharge as well as perform charge-discharge | |
cycles to measure battery capacity. | |
Currently only the CC (Constant Current) load type is implemented, the | |
CP (Constant Power) load type is a simple fixme. Additionally, some | |
small improvement opportunities are documented in the code as FIXME | |
comments. | |
Miscellaneous features include monitoring packets being sent out by the | |
tester without triggering any actions and decoding of arbitrary status | |
packets. | |
""" | |
#pylint: disable=too-many-lines | |
import argparse | |
import csv | |
import json | |
import logging | |
import operator | |
import re | |
import time | |
from binascii import hexlify, unhexlify | |
from collections import namedtuple | |
from contextlib import nullcontext, AbstractContextManager | |
from enum import IntEnum | |
from functools import reduce | |
from pathlib import Path | |
from statistics import mean | |
from struct import pack, unpack | |
from typing import Tuple | |
import pint | |
import serial | |
import structlog | |
# Reference: https://stackoverflow.com/a/49724281
# Names of the registered logging levels ordered by numeric severity.
# Levels whose numeric value is falsy (NOTSET == 0) are left out.
LOG_LEVEL_NAMES = list(map(
    logging.getLevelName,
    filter(
        lambda level: getattr(level, "real", 0),
        sorted(getattr(logging, '_levelToName', None) or logging._levelNames),
    ),
))
# Module-wide structured logger used by every function below
log = structlog.get_logger()
# Shared pint unit registry; all quantities in this file are created from it
ureg = pint.UnitRegistry()
class ZKETechEBCModels(IntEnum):
    """
    ZKETECH EBC model codes.

    decode_status_pkt() reads this code from the status packet byte that
    directly precedes the CRC and uses it, together with the status code,
    to select the field definitions in pkt_field_defs.
    """
    EBC_A05 = 0x05
    EBC_A10H = 0x06
    EBC_A20 = 0x09
# Packet start & end markers: every frame on the wire starts with SOF
# (Start Of Frame) and ends with EOF (End Of Frame)
SOF = 0xfa
EOF = 0xf8
class CommandCode(IntEnum):
    """
    Command codes for building packets sent from the PC
    to the battery tester.

    The code is the first payload byte of a control packet, see ctl_packet().
    """
    CONNECT = 0x05
    DISCONNECT = 0x06
    STOP = 0x02
    CHRG_CCCV_START = 0x21
    # Send the number of minutes of charging time elapsed to the charger ???
    # Sent every 60 seconds after CHRG_CCCV is sent
    CHRG_TIME = 0x0a
    DISCH_CC_START = 0x01
    DISCH_CC_ADJUST = 0x07
    DISCH_CC_STOP = 0x08
class StatusCode(IntEnum):
    """
    Status codes for decoding packets sent from the
    battery tester to the PC.

    The code is the first byte after SOF in a status packet, see
    decode_status_pkt().
    """
    # ???
    # Sent when charger is idle
    IDLE = 0x02
    IDLE_MONITOR = 0x66
    CHRG_CCCV = 0x0c
    CHRG_CCCV_MONITOR = 0x70
    CHRG_CCCV_END = 0x16
    # Open question: why are there two status codes during CC discharge?
    DISC_CC_IDLE = 0x00
    DISC_CC = 0x0a
    DISC_CC_END = 0x14
    DISC_CC_MONITOR = 0x64
    # Meaning unknown
    UNK1 = 0x6e
# This encoding is used to prevent bytes > 240 from appearing in the byte stream.
# This feature allows the bytestream to safely use 0xfa and 0xf8 as SOF (Start Of Frame)
# and EOF (End Of Frame)
def encode_base240(value: int) -> Tuple[int, int]:
    """
    Encode a value into base240 encoded 2 byte sequence.

    Returns a (high, low) tuple such that value == 240 * high + low.

    Raises ValueError when the value is negative or too large to be
    represented. An explicit exception is used instead of an assert so
    that the check is still performed when Python runs with -O.
    """
    if not 0 <= value < 0xf0 * 0xf0 + 0xf0:
        raise ValueError(f'value {value} cannot be base240 encoded into 2 bytes')
    return divmod(value, 0xf0)
def decode_base240(h: int, l: int) -> int:
    """
    Combine the high (h) and low (l) base240 digits of a 2 byte
    sequence back into the integer they encode.
    """
    high_part = h * 0xf0
    return high_part + l
#pylint: disable=line-too-long | |
# Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/EbcA20.cpp | |
# bool EbcA20::Decode(uint16_t source, double& value, ParameterPacking pp) const | |
# { | |
# switch (pp) // PP_None, PP_V, PP_V_set, PP_A, PP_A_set, PP_P, PP_T, PP_Ah | |
# { | |
# case PP_V: | |
# // TODO: is PP_V the same way decoded as PP_Ah? (don't know, because I never had a voltage above 10V) | |
# value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0); | |
# break; | |
# case PP_Ah: | |
# if (source & 0x8000) { | |
# // capacity >= 10.0 Ah | |
# if ((source & 0xE000) == 0xE000) { | |
# // capacity >= 200.0 Ah | |
# value = ((static_cast<double> ((((source >> 8) & 0x3F) * 240) + (source & 0xFF) - 0x1C00)) / 10.0); | |
# } else { | |
# // capacity < 200.0 Ah | |
# value = ((static_cast<double> ((((source >> 8) & 0x7F) * 240) + (source & 0xFF) - 0x0800)) / 100.0); | |
# } | |
# } else { | |
# // capacity < 10.0 Ah | |
# value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0); | |
# } | |
#pylint: enable=line-too-long | |
def decode_ranged(h: int, l: int) -> float:
    """
    Decode a base240 value to a float taking into account different offsets and scaling
    factors for multiple ranges.

    The top bits of the high byte select the range:

    * 0b111xxxxx -> 0.1 resolution, offset 0x1c00 (values from 200.0 up)
    * 0b1xxxxxxx -> 0.01 resolution, offset 0x800 (values from 10.00 up)
    * otherwise  -> 0.001 resolution, no offset (values below 10.00)

    The raw count is divided by an integer (10/100/1000) instead of being
    multiplied by 0.1/0.01/0.001 so the result is the float closest to the
    decimal reading (231.1 and not 231.10000000000002).
    """
    if h & 0xe0 == 0xe0:
        # Two range bits plus one extra bit are used up, 6 bits of data remain
        raw = 240 * (h & 0x3f) + l
        offset = 0x1c00
        divisor = 10
    elif h & 0x80 == 0x80:
        # Only the top bit is a range marker, 7 bits of data remain
        raw = 240 * (h & 0x7f) + l
        offset = 0x800
        divisor = 100
    else:
        raw = 240 * h + l
        offset = 0
        divisor = 1000
    return (raw - offset) / divisor
# A single 2 byte field of a status packet: 'key' is the name under which the
# decoded value is stored, 'value' is a callable taking the high and low byte
# and returning the decoded value.
Field = namedtuple('Field', 'key value')

#pylint: disable=unnecessary-lambda

# Measurement fields which open the payload of every known status packet.
_MEASUREMENT_FIELDS = (
    Field(key='current',
          value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.ampere),
    Field(key='voltage',
          value=lambda h, l: decode_base240(h, l) * 0.001 * ureg.volt),
    Field(key='charge',
          value=lambda h, l: decode_ranged(h, l) * ureg.ampere_hour),
    # Does not seem to work, always 0
    Field(key='energy',
          value=lambda h, l: decode_base240(h, l) * 0.001 * ureg.watt_hour),
)

# Device information fields which close the payload of the *_MONITOR and
# UNK1 status packets.
_DEVICE_FIELDS = (
    # Displayed in the ZKETECH application UI as 'V3.02'
    Field(key='fw_ver',
          value=lambda h, l: str(decode_base240(h, l) / 100)),
    Field(key='unk2',
          value=lambda h, l: decode_base240(h, l)),  # Always 3023
    Field(key='unk3',
          value=lambda h, l: decode_base240(h, l)),  # Always 2165
)

# Setpoint fields which close the payload of the CC discharge status packets.
_DISCHARGE_SETPOINT_FIELDS = (
    Field(key='disc_cc_current',
          value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.ampere),
    Field(key='disc_cc_cutoff_voltage',
          value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.volt),
    Field(key='disc_max_time',
          value=lambda h, l: decode_base240(h, l) * ureg.minute),
)


def _charge_setpoint_fields(current_key: str, voltage_key: str, cutoff_key: str) -> tuple:
    """
    Build the three charge setpoint fields (current, voltage, cutoff current).

    The decoding is the same for every charge related status packet, only
    the key names under which the values are reported differ.
    """
    return (
        Field(key=current_key,
              value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.ampere),
        Field(key=voltage_key,
              value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.volt),
        Field(key=cutoff_key,
              value=lambda h, l: decode_base240(h, l) * 0.01 * ureg.ampere),
    )


# Field layout of the status packet payload for each known
# (device model, status code) pair. Fields are listed in wire order and
# each one occupies 2 bytes.
pkt_field_defs = {
    # Sent when device is idle ('OFF' on display)
    # Example packet:
    # fa 02 0000 0a13 0014 0000 0032 010a 000a 09 35 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.IDLE): [
        *_MEASUREMENT_FIELDS,
        *_charge_setpoint_fields('chrg_cc_current',
                                 'chrg_cv_voltage',
                                 'chrg_cutoff_current'),
    ],

    # Sent when device is idle ('OFF' on display)
    # Contains FW and/or hardware information
    # Example packet:
    # fa 66 0000 0888 0014 0000 013e 0c8f 0905 09 4b f8
    (ZKETechEBCModels.EBC_A20, StatusCode.IDLE_MONITOR): [
        *_MEASUREMENT_FIELDS,
        *_DEVICE_FIELDS,
    ],

    # Mode: CHRG-CCCV
    # Sent in both CC and CV phases of charging
    # Example packets:
    # fa 0c 0032 07de 0000 0000 0032 01b4 000a 09 63 f8
    # fa 0c 0214 1059 299e 0000 0214 01b4 000a 09 44 f8
    #    Charge = 9998 mAh
    # fa 0c 0214 1059 8ca8 0000 0214 01b4 000a 09 d7 f8
    #    Charge > 10000 mAh = 10 Ah display will be 10.01 Ah
    # fa 0c 0214 1078 9124 0000 0214 01b4 000a 09 67 f8
    #    charge = 20.69 Ah
    # fa 0c 000b 1178 c478 0000 0214 01b4 000a 09 72 f8
    #    charge = 143.9 Ah
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV): [
        *_MEASUREMENT_FIELDS,
        *_charge_setpoint_fields('chrg_cccv_current',
                                 'chrg_cccv_voltage',
                                 'chrg_cutoff_current'),
    ],

    # Mode: CHRG-CCCV
    # Sent during the first few seconds of charging
    # Example packet:
    # fa 70 0000 0453 0000 0000 013e 0c8f 0905 09 9e f8
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_MONITOR): [
        *_MEASUREMENT_FIELDS,
        *_DEVICE_FIELDS,
    ],

    # Mode: CHRG-CCCV
    # Sent when charging stops due to current cutoff reached
    # Example packet:
    # fa 16 000a 0a64 0014 0000 0032 010a 000a 09 5c f8
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_END): [
        *_MEASUREMENT_FIELDS,
        *_charge_setpoint_fields('cccv_current',
                                 'cccv_voltage',
                                 'cutoff_current'),
    ],

    # Mode: DISC-CC
    # Example packet:
    # fa 00 0000 1049 0000 0000 0032 013c 0078 09 27 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_IDLE): [
        *_MEASUREMENT_FIELDS,
        *_DISCHARGE_SETPOINT_FIELDS,
    ],

    # Mode: DISC-CC
    # Example packets:
    # fa 0a 0032 0f41 0002 0000 0032 013c 003c 09 4e f8
    # fa 0a 0214 0f22 bbcf 0000 0214 013c 0000 09 67 f8
    #    here the charge is > 100 Ah
    #    charge == 123.2Ah
    # fa 0a 0214 0e80 dbd0 0000 0214 013c 0000 09 bb f8
    #    charge = 200.0 (decoded)
    # fa 0a 0214 0e80 e630 0000 0214 013c 0000 09 66 f8
    #    charge = 200.0Ah
    # fa 00 0000 0dbf e777 0000 0214 013c 0000 09 00 f8
    #    charge = 231.1Ah
    # fa 0a 0214 0e4f e68d 0000 0214 013c 0000 09 14 f8
    #    charge = 209.3Ah
    #
    # The log excerpt below was recorded as a BUG (negative charge while the
    # display showed 170.5Ah):
    # 2025-01-22 18:46:37 [debug ] decode status pkt
    #    pkt=b'fa0a02140f01cf8800000214013c00000977f8'
    #    {'.calculated_crc': 119, '.crc_check': True, 'ptype': <StatusCode.DISC_CC: 10>,
    #    'dev_model': <ZKETechEBCModels.EBC_A20: 9>,
    #    'current': <Quantity(5.0, 'ampere')>, 'voltage': <Quantity(3.601, 'volt')>,
    #    'charge': <Quantity(-343.2, 'ampere_hour')>,
    #    'energy': <Quantity(0.0, 'watt_hour')>, 'disc_cc_current': <Quantity(5.0, 'ampere')>,
    #    'disc_cc_cutoff_voltage': <Quantity(3.0, 'volt')>, 'disc_max_time': <Quantity(0, 'minute')>}
    # NOTE: decode_ranged() as written in this file decodes the charge bytes
    # cf88 of that packet to about 170.48 Ah, which matches the display.
    #
    # This key used to be defined twice with identical contents (the second
    # definition silently replaced the first); it is now defined once.
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC): [
        *_MEASUREMENT_FIELDS,
        *_DISCHARGE_SETPOINT_FIELDS,
    ],

    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_MONITOR): [
        *_MEASUREMENT_FIELDS,
        *_DEVICE_FIELDS,
    ],

    # Mode: D-CC
    # Example packet:
    # fa 14 0032 0c77 0159 0000 0032 013c 0078 09 7b f8
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_END): [
        *_MEASUREMENT_FIELDS,
        *_DISCHARGE_SETPOINT_FIELDS,
    ],

    # Example packet:
    # fa 6e 0032 0f41 0002 0000 013e 0c8f 0905 09 a9 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.UNK1): [
        *_MEASUREMENT_FIELDS,
        *_DEVICE_FIELDS,
    ],
}
#pylint: enable=unnecessary-lambda
def decode_status_pkt(pkt: bytes) -> dict:
    """
    Decode a status packet received from the battery tester into
    a dictionary of fields.

    Some of the fields are metadata fields which contain information
    about the decoding process itself. For example, the '.crc_check'
    key contains a boolean indicating whether the CRC is valid for the
    status packet.

    The expected frame layout is:

        SOF, status code, 2 byte fields..., device model, CRC, EOF

    Returns None (after logging the reason) when the packet is too short,
    is not delimited by SOF/EOF, has an unknown status code or device
    model, or has no matching entry in pkt_field_defs. A failed CRC check
    is only reported through '.crc_check', the packet is still decoded.
    """
    log.debug('decode status pkt', pkt=hexlify(pkt))

    pktb = bytearray(pkt)

    # The shortest valid frame is SOF + status code + device model + CRC + EOF
    if len(pktb) < 5:
        log.error('packet too short', pkt=hexlify(pkt))
        return None

    if pktb[0] != SOF or pktb[-1] != EOF:
        log.error('unknown packet format', pkt=hexlify(pkt))
        return None

    crc = pktb[-2]
    # Everything between SOF and the CRC byte is covered by the CRC
    body = pktb[1:-2]

    pkt_fields = {}
    calculated_crc = reduce(operator.xor, body)
    pkt_fields['.calculated_crc'] = calculated_crc

    # CRC values which have the highest 4 bits set are transmitted
    # with those bits turned off to not insert a SOF or EOF byte
    # by chance and confuse the framing decoder
    # Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/message.cpp
    if (calculated_crc & 0xf0) == 0xf0:
        calculated_crc = calculated_crc & 0x0f

    crc_ok = calculated_crc == crc
    if not crc_ok:
        log.warn('crc verification failed', pkt=hexlify(pkt))
    pkt_fields['.crc_check'] = crc_ok

    raw_ptype = body[0]
    raw_dev_model = body[-1]
    data = body[1:-1]

    # Convert through the enum constructors instead of using
    # "value in EnumClass": before Python 3.12 that containment check
    # raises TypeError for plain integers.
    try:
        ptype = StatusCode(raw_ptype)
        dev_model = ZKETechEBCModels(raw_dev_model)
    except ValueError:
        log.warn('cannot recognize packet',
                 packet_type=raw_ptype, device=raw_dev_model, pkt=hexlify(pkt))
        return None

    pkt_fields['ptype'] = ptype
    pkt_fields['dev_model'] = dev_model

    field_defs = pkt_field_defs.get((dev_model, ptype))
    if not field_defs:
        log.error('no field definitions', dev_model=dev_model, ptype=ptype, pkt=hexlify(pkt))
        return None

    # Each field takes 2 bytes, refuse truncated payloads instead of crashing
    if len(data) < 2 * len(field_defs):
        log.error('packet too short', pkt=hexlify(pkt))
        return None

    for index, field_def in enumerate(field_defs):
        h = data[2 * index]
        l = data[2 * index + 1]
        pkt_fields[field_def.key] = field_def.value(h, l)

    return pkt_fields
def wrap_pkt(payload: bytes) -> bytes:
    """
    Wrap the control packet payload with a CRC and start/stop frame indicators.

    The CRC is the XOR of all payload bytes. When its upper 4 bits are all
    set, they are cleared so that the CRC byte can never be mistaken for
    SOF or EOF. This is the same rule decode_status_pkt() applies to the
    CRC of received packets.
    NOTE(review): confirm on hardware that the tester expects the masked
    CRC for commands as well.
    """
    # The initial value makes an empty payload yield a CRC of 0 instead of
    # raising TypeError
    crc = reduce(operator.xor, payload, 0)
    if (crc & 0xf0) == 0xf0:
        crc = crc & 0x0f
    return bytes([SOF]) + payload + bytes([crc, EOF])
def ctl_packet(command: CommandCode, p1: int = 0, p2: int = 0, p3: int = 0) -> bytes:
    """
    Assemble a complete control packet: the command code followed by
    three base240 encoded parameters, framed by wrap_pkt().
    """
    log.debug('building control packet', command=command, p1=p1, p2=p2, p3=p3)

    command_byte = int(command)
    param_bytes = []
    for param in (p1, p2, p3):
        param_bytes.extend(encode_base240(param))

    # 1 command byte + 3 parameters of 2 bytes each
    frame = wrap_pkt(pack('7B', command_byte, *param_bytes))
    log.debug('control packet bytes', pkt=hexlify(frame))
    return frame
def read_packet(s: serial.Serial) -> bytes:
    """
    Read data from the serial port and return a complete packet.

    It uses the SOF and EOF tags as delimiters: bytes received before a SOF
    are discarded, then everything up to and including the next EOF is
    returned.

    A read which returns no data (the port was opened with a timeout and it
    expired) is retried instead of raising IndexError, so the call blocks
    until a complete packet has arrived.
    """
    pkt = bytearray()
    while True:
        chunk = s.read(1)
        if not chunk:
            # Read timeout, nothing received yet
            continue

        b = chunk[0]
        if not pkt:
            # Still hunting for the start of a frame
            if b == SOF:
                pkt.append(b)
            continue

        pkt.append(b)
        if b == EOF:
            return bytes(pkt)
def packet_rx(s: serial.Serial, terminate_on: list[StatusCode]):
    """
    Generator yielding (raw packet, decoded fields) tuples read from the
    serial port.

    Packets which cannot be decoded are yielded with None as the fields.
    The generator finishes right after yielding a packet whose status
    code is listed in terminate_on, for example a "Charging End" status
    packet.
    """
    # Compare as plain integers
    stop_codes = [int(code) for code in terminate_on]

    while True:
        raw = read_packet(s)
        decoded = decode_status_pkt(raw)
        if not decoded:
            log.warn('unknown status packet', pkt=hexlify(raw))

        yield (raw, decoded)

        if decoded is None:
            continue
        if decoded['ptype'] in stop_codes:
            return
def curve_plot_filename(args: argparse.Namespace, name: str = None) -> Tuple[Path, AbstractContextManager]: #pylint: disable=line-too-long
    """
    Build the filename for the charge/discharge curve CSV plot and a file context
    based on argument flags and base CSV filename.

    args needs to provide 'output_prefix' (a Path or None) and 'save_curves'.
    When name is given it replaces the file name part of the output prefix.

    Returns a (filename, context manager) tuple. When curves are not to be
    saved the filename is None and the context manager yields None.
    Otherwise the file is already opened for writing and is closed when the
    context manager exits.
    """
    if not (args.output_prefix and args.save_curves):
        return (None, nullcontext(None))

    if name is None:
        csv_filename = args.output_prefix.with_suffix('.csv')
    else:
        # Append the extension instead of using with_suffix(): a name which
        # contains a dot (for example 'run.v1-precharge') would otherwise
        # lose everything after that dot and all files derived from the same
        # prefix would end up overwriting each other.
        csv_filename = args.output_prefix.with_name(f'{name}.csv')

    # The csv module requires files to be opened with newline=''
    return (csv_filename, csv_filename.open("w", newline=''))
def charge(s: serial.Serial, settings: dict, **kwargs):
    """
    Start charging a battery using the provided settings.

    Monitors the charging progress and stores the charge curve points
    using a specified CSV writer.

    settings needs to contain the 'current', 'voltage' and 'cutoff_current'
    pint quantities.

    Recognized keyword arguments:

    * transmit - when true the charge start command is sent to the tester,
      otherwise the tester is only monitored
    * plot_filename - copied into the summary data
    * csv_writer - a csv.DictWriter receiving one row per decoded packet

    Returns a dictionary with summary data about the charging process.
    """
    log.info('charge begin', serial=s, settings=settings, kwargs=kwargs)

    summary_data = {
        'action': 'charge',
        'algorithm': 'CC-CV',
        # pint.Quantity cannot be generically serialized to JSON
        'settings': { k: str(v) for (k,v) in settings.items() },
        'flags': {},
    }

    if kwargs.get('plot_filename'):
        summary_data['plot_filename'] = kwargs['plot_filename']

    if kwargs.get('transmit', False):
        # The parameters are sent in steps of 10 mA / 10 mV. round() is used
        # instead of int() because the unit conversion is done on floats and
        # a result like 28.999999999999996 would be truncated to 28.
        pkt = ctl_packet(CommandCode.CHRG_CCCV_START,
                         round(settings['current'].to(ureg.milliampere).m / 10),
                         # pint defines mA but does not define mV
                         round((settings['voltage'] * 1000).m / 10),
                         round(settings['cutoff_current'].to(ureg.milliampere).m / 10))
        log.info('transmit control packet', pkt=hexlify(pkt))
        s.write(pkt)

        # CHRG_TIME
        # p1 -> current time [min]
        # p2 -> always 0
        # p3 -> always 0
        #chrg_time = ctl_packet(CommandCode.CHRG_TIME, 50, 420, 2)
        #s.write(chrg_time)

    summary_data['start_ts'] = time.time()
    last_status_update = time.time()

    for (pkt, pkt_fields) in packet_rx(s, [ StatusCode.CHRG_CCCV_END ]):
        # In case packet was not decoded
        if not pkt_fields:
            continue

        # Report the progress in the log every 30 seconds
        if time.time() - last_status_update > 30:
            log.info('charging', voltage=str(pkt_fields['voltage'].to(ureg.volt)),
                     current=str(pkt_fields['current'].to(ureg.ampere)),
                     charge=str(pkt_fields['charge'].to(ureg.ampere_hour)))
            last_status_update = time.time()

        if kwargs.get('csv_writer'):
            kwargs['csv_writer'].writerow({
                # TODO: Make precision depend on the reading like on device LCD
                # For example 100mAh is "100mAh" but 40Ah is "40.00Ah" and 111Ah is
                # "111.0Ah"
                't': time.time(),
                'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                'chrg_current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
            })

        if pkt_fields['ptype'] == int(StatusCode.CHRG_CCCV_END):
            log.info('charging finished')
            summary_data['finish_ts'] = time.time()
            summary_data['charge'] = str(pkt_fields['charge'])
            summary_data['flags']['current_cutoff'] = True
            break

        time.sleep(1)

    log.info('charge end', summary=summary_data)
    return summary_data
def cmd_charge(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'charge' CLI command.

    Charges using the current, voltage and cutoff current given on the
    command line, optionally saving the charge curve to a CSV file.
    Returns the summary data produced by charge().
    """
    kwargs = { 'transmit': args.transmit }

    (csv_filename, csv_file_ctx) = curve_plot_filename(args)
    # Without this check the summary would contain the string 'None' as the
    # plot filename when no curve is being saved
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)

    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'chrg_current'])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer

        summary_data = charge(s, settings={
            'current': args.current,
            'voltage': args.voltage,
            'cutoff_current': args.cutoff_current
        }, **kwargs)

    return summary_data
def discharge(s: serial.Serial, settings: dict, **kwargs):
    """
    Start discharging a battery using the provided settings.

    Currently supports only the Constant Current (CC) load mode.

    Monitors the discharge progress and stores the discharge curve points
    using a specified CSV writer.

    settings needs to contain the 'current', 'cutoff_voltage' and
    'time_limit' pint quantities.

    Recognized keyword arguments:

    * transmit - when true the discharge start command is sent to the
      tester, otherwise the tester is only monitored
    * plot_filename - copied into the summary data
    * csv_writer - a csv.DictWriter receiving one row per decoded packet

    Returns a dictionary with summary data about the discharge process.
    """
    log.info('discharge start', serial=s, settings=settings, kwargs=kwargs)

    # TODO: Add support for Constant Power discharge
    summary_data = {
        'action': 'discharge',
        # pint.Quantity cannot be generically serialized to JSON
        'settings': { k: str(v) for (k,v) in settings.items() },
        'flags': {},
    }
    summary_data['settings']['load'] = 'CC'

    if kwargs.get('plot_filename'):
        summary_data['plot_filename'] = kwargs['plot_filename']

    if kwargs.get('transmit', False):
        # DISC-CC start
        # p1 -> current, unit is 10 mA
        # p2 -> cutoff voltage, unit is 10 mV
        # p3 -> maximum time, unit is min
        # round() is used instead of int() because the unit conversion is
        # done on floats and a result like 28.999999999999996 would be
        # truncated to 28.
        pkt = ctl_packet(CommandCode.DISCH_CC_START,
                         round(settings['current'].to(ureg.milliampere).m / 10),
                         # pint defines mA but does not define mV
                         round((settings['cutoff_voltage'] * 1000).m / 10),
                         round(settings['time_limit'].to(ureg.minute).m))
        log.info('transmit control packet', pkt=hexlify(pkt))
        s.write(pkt)

    summary_data['start_ts'] = time.time()
    last_status_update = time.time()

    for (pkt, pkt_fields) in packet_rx(s, [ StatusCode.DISC_CC_END ]):
        # In case packet was not decoded
        if not pkt_fields:
            continue

        # Report the progress in the log every 30 seconds
        if time.time() - last_status_update > 30:
            log.info('discharging', voltage=str(pkt_fields['voltage'].to(ureg.volt)),
                     current=str(pkt_fields['current'].to(ureg.ampere)),
                     charge=str(pkt_fields['charge'].to(ureg.ampere_hour)))
            last_status_update = time.time()

        if kwargs.get('csv_writer'):
            kwargs['csv_writer'].writerow({
                't': time.time(),
                # TODO: Make precision configurable
                'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                'disch_current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
            })

        # TODO: Handle time limit properly
        if pkt_fields['ptype'] == int(StatusCode.DISC_CC_END):
            log.info('discharge finished')
            summary_data['finish_ts'] = time.time()
            summary_data['charge'] = str(pkt_fields['charge'])
            summary_data['flags']['voltage_cutoff'] = True
            break

        time.sleep(1)

    log.info('discharge end', summary=summary_data)
    return summary_data
def cmd_discharge(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'discharge' CLI command.

    Discharges using the current, cutoff voltage and time limit given on
    the command line, optionally saving the discharge curve to a CSV file.
    Returns the summary data produced by discharge().
    """
    kwargs = { 'transmit' : args.transmit }

    (csv_filename, csv_file_ctx) = curve_plot_filename(args)
    # Without this check the summary would contain the string 'None' as the
    # plot filename when no curve is being saved
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)

    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'disch_current'])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer

        summary_data = discharge(s, settings={
            'current': args.current,
            'cutoff_voltage': args.cutoff_voltage,
            'time_limit': args.time_limit
        }, **kwargs)

    return summary_data
def _run_cycle_step(args: argparse.Namespace, s: serial.Serial, step_name: str,
                    action, current_column: str, settings: dict) -> dict:
    """
    Run a single charge() or discharge() step of a cycle, saving its curve
    to '<output prefix name>-<step_name>.csv' when curves are to be saved.
    """
    kwargs = { 'transmit': args.transmit }

    # The output prefix may be unset, in which case no curve is saved
    plot_name = f'{args.output_prefix.name}-{step_name}' if args.output_prefix else None
    (csv_filename, csv_file_ctx) = curve_plot_filename(args, name=plot_name)
    # Without this check the summary would contain the string 'None' as the
    # plot filename when no curve is being saved
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)

    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', current_column])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer

        return action(s, settings=settings, **kwargs)


def cmd_cycle(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'cycle' CLI command.

    Pre-charges the battery, then performs args.num_cycles cycles, each
    consisting of a test discharge followed by a post-charge. The mean of
    the charge measured by the test discharges is reported as the capacity.

    Returns the summary data of all steps, or None when transmitting is
    not enabled.
    """
    log.info('cycle', serial=s, num_cycles=args.num_cycles)

    if not args.transmit:
        log.error('cycle requires tx enabled')
        return None

    main_summary_data = {
        'action': 'cycle',
        'settings': {
            'num_cycles': args.num_cycles
        },
        'cycles': [],
        # NOTE(review): the results are stored under 'results' below, this
        # key stays empty. Both keys are kept to not change the output.
        'result': {}
    }

    # Store charge measurements for each cycle
    charge_measurements = []

    # Pre-charge
    precharge_summary_data = _run_cycle_step(
        args, s, 'precharge', charge, 'chrg_current',
        settings={
            'current': args.pre_charge_current,
            'voltage': args.pre_charge_voltage,
            'cutoff_current': args.pre_cutoff_current
        })
    main_summary_data['cycles'].append(precharge_summary_data)

    for cycle_num in range(1, args.num_cycles + 1):
        # Test discharge
        discharge_summary_data = _run_cycle_step(
            args, s, f'cycle-{cycle_num}-discharge', discharge, 'disch_current',
            settings={
                'current': args.discharge_current,
                'cutoff_voltage': args.discharge_cutoff_voltage,
                'time_limit': args.discharge_time_limit
            })
        charge_measurements.append(discharge_summary_data['charge'])
        main_summary_data['cycles'].append(discharge_summary_data)

        # Post-charge
        postcharge_summary_data = _run_cycle_step(
            args, s, f'cycle-{cycle_num}-postcharge', charge, 'chrg_current',
            settings={
                'current': args.post_charge_current,
                'voltage': args.post_charge_voltage,
                'cutoff_current': args.post_cutoff_current
            })
        main_summary_data['cycles'].append(postcharge_summary_data)

    if not charge_measurements:
        # No test discharge was performed, there is nothing to average
        log.warn('cycle finished without capacity measurements')
        main_summary_data['results'] = {}
        return main_summary_data

    # We need to parse as pint expressions again as charge measurements
    # are presented as strings in the summary_data. The mean is calculated
    # on plain magnitudes.
    capacities = [ureg.parse_expression(c).to(ureg.ampere_hour).m for c in charge_measurements]
    mean_capacity = pint.Quantity(float(mean(capacities)), ureg.ampere_hour)

    main_summary_data['results'] = {
        'mean_capacity': str( round(mean_capacity, 3) )
    }

    log.info('cycle finished', mean_capacity=round(mean_capacity, 3))
    return main_summary_data
def cmd_monitor(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'monitor' CLI command.

    Prints every status packet sent by the tester without transmitting
    anything and optionally stores the voltage and current readings in a
    CSV file. Runs until interrupted.
    """
    log.info('monitor', serial=s)

    if args.output_prefix and args.save_curves:
        # The csv module requires files to be opened with newline=''
        csv_file_ctx = args.output_prefix.with_suffix('.csv').open("w", newline='')
    else:
        csv_file_ctx = nullcontext(None)

    with csv_file_ctx as csv_file:
        csv_writer = None
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'current'])
            csv_writer.writeheader()

        while True:
            pkt = read_packet(s)
            pkt_fields = decode_status_pkt(pkt)
            print(pkt_fields)

            if not pkt_fields:
                log.warn('unknown status packet', pkt=hexlify(pkt))
            elif csv_writer is not None:
                csv_writer.writerow({
                    't': time.time(),
                    # TODO: Make precision configurable
                    'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                    'current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
                })

            time.sleep(1)
def cmd_decode(args: argparse.Namespace,
               s: serial.Serial # pylint: disable=unused-argument
               ):
    """
    Handle the 'decode' CLI command.

    Each entry of args.packets is a hex dump of a status packet, whitespace
    in it is ignored. The decoded fields of every packet are printed.
    """
    log.info('decode')

    for hex_dump in args.packets:
        compact_hex = re.sub(r'\s', '', hex_dump)
        raw_packet = unhexlify(compact_hex)
        print(decode_status_pkt(raw_packet))
def pint_bind_unit(unit: pint.Unit) -> type[argparse.Action]:
    """
    Build an argparse action class which parses a CLI argument as a pint
    expression and converts it to the expected unit (for example
    milliamperes will be converted to amperes).

    :param unit: unit the parsed quantity is converted to
    :return: an ``argparse.Action`` subclass to be passed as ``action=``
    """

    class PintArgument(argparse.Action):
        """
        argparse action using pint.
        """

        def __call__(self, parser, args, value, option_string=None):
            try:
                quantity = ureg.parse_expression(value).to(unit)
            except (pint.errors.PintError, AttributeError) as e:
                # AttributeError is caught as well because a value without
                # any unit presumably parses to a plain number which has
                # no .to() method -- TODO confirm against the pint version
                # in use.
                log.error('cannot parse expression',
                          dest=self.dest, expected_unit=unit, value=value, msg=str(e))
                # Abort argument parsing instead of silently leaving the
                # destination unset: argparse turns ArgumentError into a
                # usage message and a non-zero exit.
                raise argparse.ArgumentError(
                    self, f'cannot parse {value!r} as a quantity in {unit}: {e}') from e

            setattr(args, self.dest, quantity)

    return PintArgument
def output_data_path(value: str) -> Path:
    """
    Validate a path given as the prefix for the output summary data.

    The path itself does not have to exist, but its parent directory
    does.

    :param value: path prefix as given on the command line
    :return: the prefix as a ``Path``
    :raises argparse.ArgumentTypeError: when the parent directory is missing
    """
    prefix = Path(value)
    parent_dir = prefix.parent

    if parent_dir.exists():
        return prefix

    raise argparse.ArgumentTypeError(f'path {parent_dir} needs to exist')
def add_charge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the options of the 'charge' CLI command on the parser.

    Besides the options, the parser defaults are set so that the command
    is dispatched to cmd_charge and a serial port is requested.
    """
    option_specs = (
        ('--current', {
            'metavar': 'A',
            'required': True,
            'action': pint_bind_unit(ureg.ampere),
            'help': 'Charging current',
        }),
        ('--voltage', {
            'metavar': 'V',
            'required': True,
            'action': pint_bind_unit(ureg.volt),
            'help': 'Charging voltage',
        }),
        ('--cutoff-current', {
            'metavar': 'A',
            'action': pint_bind_unit(ureg.ampere),
            # This is the lowest cutoff current for EBC-A20
            'default': ureg.parse_expression("100 mA"),
            'help': 'Charge termination current',
        }),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    parser.set_defaults(cmd=cmd_charge, need_serial=True)
    return parser
def add_discharge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the options of the 'discharge' CLI command on the parser.

    Besides the options, the parser defaults are set so that the command
    is dispatched to cmd_discharge and a serial port is requested.
    """
    # Constant Power discharge is not yet implemented; once it is, a
    # '--power' option bound to ureg.watt ('Constant Power discharge
    # power') belongs between '--current' and '--cutoff-voltage'.
    option_specs = (
        ('--current', {
            'metavar': 'A',
            'required': True,
            'action': pint_bind_unit(ureg.ampere),
            'help': 'Constant Current discharge current',
        }),
        ('--cutoff-voltage', {
            'metavar': 'V',
            'required': True,
            'action': pint_bind_unit(ureg.volt),
            'help': 'Discharge cutoff voltage',
        }),
        ('--time-limit', {
            'metavar': 'T',
            'action': pint_bind_unit(ureg.minute),
            'default': 0 * ureg.second,
            'help': 'Time limit',
        }),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    parser.set_defaults(cmd=cmd_discharge, need_serial=True)
    return parser
def add_cycle_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Add arguments for the 'cycle' CLI command.

    The options are split into three argument groups, one per phase of
    the cycle: the preparing charge, the test discharge and the post
    charge. The parser defaults are set so that the command is
    dispatched to cmd_cycle and a serial port is requested.

    :param parser: the sub-parser of the 'cycle' command
    :return: the same parser, for chaining
    """
    parser.add_argument('--num-cycles',
                        type=int, default=1, help='Number of measurement test cycles')

    prepare_charge_group = parser.add_argument_group('Prepare Charge')
    prepare_charge_group.add_argument('--pre-charge-current',
                                      metavar='A', required=True,
                                      action=pint_bind_unit(ureg.ampere),
                                      help='Charging current')
    prepare_charge_group.add_argument('--pre-charge-voltage',
                                      metavar='V', required=True,
                                      action=pint_bind_unit(ureg.volt),
                                      help='Charging voltage')
    prepare_charge_group.add_argument('--pre-cutoff-current',
                                      metavar='A',
                                      action=pint_bind_unit(ureg.ampere),
                                      default=ureg.parse_expression('100 mA'),
                                      help='Charge termination current')

    test_discharge_group = parser.add_argument_group('Test discharge')
    test_discharge_group.add_argument('--discharge-current',
                                      metavar='A', required=True,
                                      action=pint_bind_unit(ureg.ampere),
                                      help='Discharging current')
    test_discharge_group.add_argument('--discharge-cutoff-voltage',
                                      metavar='V', required=True,
                                      action=pint_bind_unit(ureg.volt),
                                      help='Discharge cutoff voltage')
    test_discharge_group.add_argument('--discharge-time-limit',
                                      metavar='T',
                                      action=pint_bind_unit(ureg.minute),
                                      default=0 * ureg.second,
                                      help='Discharge time limit')

    post_charge_group = parser.add_argument_group('Post charge')
    post_charge_group.add_argument('--post-charge-current',
                                   metavar='A', required=True,
                                   action=pint_bind_unit(ureg.ampere),
                                   help='Charging current')
    post_charge_group.add_argument('--post-charge-voltage',
                                   metavar='V', required=True,
                                   action=pint_bind_unit(ureg.volt),
                                   help='Charging voltage')
    post_charge_group.add_argument('--post-cutoff-current',
                                   # A current, so the placeholder is 'A'
                                   # (it used to be a copy-pasted 'T').
                                   metavar='A',
                                   action=pint_bind_unit(ureg.ampere),
                                   # Same default as --pre-cutoff-current;
                                   # without it an omitted option left the
                                   # cutoff current as None.
                                   default=ureg.parse_expression('100 mA'),
                                   help='Charging termination current')

    parser.set_defaults(cmd=cmd_cycle)
    parser.set_defaults(need_serial=True)
    return parser
def add_monitor_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Configure the parser of the 'monitor' CLI command.

    The command takes no options of its own; only the parser defaults
    are set so that it is dispatched to cmd_monitor and a serial port is
    requested.
    """
    parser.set_defaults(cmd=cmd_monitor, need_serial=True)
    return parser
def add_decode_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the arguments of the 'decode' CLI command on the parser.

    The command takes one or more hexlified packets as positional
    arguments. The parser defaults dispatch it to cmd_decode and switch
    off both the serial port request and the transmit flag.
    """
    parser.add_argument('packets',
                        nargs='+',
                        metavar='PKT',
                        help='Hexlified packets to decode')

    parser.set_defaults(cmd=cmd_decode, need_serial=False, transmit=False)
    return parser
def main():
    """
    Main CLI function.

    Parses the command line, configures logging, opens the serial port
    when the selected sub-command needs one and runs the sub-command.
    When an output prefix is given, the value returned by the
    sub-command is written as JSON to the prefix path with its suffix
    replaced by ``.json``.

    On the way out (normal return, Ctrl-C or an error) an opened port is
    sent STOP and DISCONNECT when transmitting is enabled, and is then
    closed.
    """
    parser = argparse.ArgumentParser('ebc-a20',
                                     description='CLI for the ZKETECH EBC-A20 Charger/Dummy Load')
    parser.add_argument('--port', type=str, default='/dev/ttyUSB0',
                        help='Serial port to use')
    parser.add_argument('--loglevel', choices=LOG_LEVEL_NAMES, default='INFO',
                        help='Change log level')
    parser.add_argument('--transmit', action=argparse.BooleanOptionalAction, default=True,
                        help='Transmit commands to the charger')
    parser.add_argument('--output-prefix', metavar='PATH', default=None,
                        type=output_data_path, help='Output path prefix for charge/discharge data')
    parser.add_argument('--save-curves', action=argparse.BooleanOptionalAction, default=True,
                        help='Save charge/discharge curves as .csv')
    parser.set_defaults(need_serial=False)

    subparsers = parser.add_subparsers(help='sub-commands help')

    charge_parser = subparsers.add_parser('charge', help='Charge a cell')
    add_charge_args(charge_parser)

    discharge_parser = subparsers.add_parser('discharge', help='Discharge a cell')
    add_discharge_args(discharge_parser)

    cycle_parser = subparsers.add_parser('cycle',
                                         help='Charge-Discharge-Charge capacity measurement cycle')
    add_cycle_args(cycle_parser)

    monitor_parser = subparsers.add_parser('monitor',
                                           help='Monitor and log the charger state')
    add_monitor_args(monitor_parser)

    decode_parser = subparsers.add_parser('decode', help='Test packet decoder')
    add_decode_args(decode_parser)

    args = parser.parse_args()

    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, args.loglevel))
    )
    log.debug('arguments', args=args)

    def send_ctl(port, code):
        """Build a control packet for the command code, log it and send it."""
        pkt = ctl_packet(code)
        log.info('transmit control packet', pkt=hexlify(pkt))
        port.write(pkt)

    # Bound before the try block so the cleanup below can tell whether a
    # port was actually opened (it stays None when no port is needed or
    # when opening it fails).
    s = None
    try:
        if args.need_serial:
            s = serial.Serial(port=args.port,
                              baudrate=9600,
                              bytesize=serial.EIGHTBITS,
                              parity=serial.PARITY_ODD)

        if args.transmit and s is not None:
            send_ctl(s, CommandCode.CONNECT)

        if hasattr(args, 'cmd'):
            summary_data = args.cmd(args, s=s)
            if args.output_prefix:
                args.output_prefix.with_suffix('.json').write_text(json.dumps(summary_data))
        else:
            log.fatal('command needs to be specified')
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to end a running command.
        pass
    finally:
        # Only talk to the tester when there is a port to talk through;
        # an unconditional write here used to fail on a missing port and
        # hide the original error.
        if s is not None:
            try:
                if args.transmit:
                    send_ctl(s, CommandCode.STOP)
                    send_ctl(s, CommandCode.DISCONNECT)
            finally:
                s.close()
# Run the CLI only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment