Skip to content

Instantly share code, notes, and snippets.

@enkiusz
Last active April 20, 2025 09:26
Show Gist options
  • Save enkiusz/6408645efd622b8a638a14957cd37f47 to your computer and use it in GitHub Desktop.
Save enkiusz/6408645efd622b8a638a14957cd37f47 to your computer and use it in GitHub Desktop.
ZKETECH EBC-A20 Control code
#!/usr/bin/env python3
"""
A simple CLI tool to control and monitor the ZKETECH EBC-A20 battery
tester from https://www.zketech.com/en/
It can trigger charge or discharge as well as perform charge-discharge
cycles to measure battery capacity.
Currently only the CC (Constant Current) load type is implemented, the
CP (Constant Power) load type is a simple fixme. Additionally, some
small improvement opportunities are documented in the code as FIXME
comments.
Miscellaneous features include monitoring packets being sent out by the
tester without triggering any actions and decoding of arbitrary status
packets.
"""
#pylint: disable=too-many-lines
import argparse
import csv
import json
import logging
import operator
import re
import time
from binascii import hexlify, unhexlify
from collections import namedtuple
from contextlib import nullcontext, AbstractContextManager
from enum import IntEnum
from functools import reduce
from pathlib import Path
from statistics import mean
from struct import pack, unpack
from typing import Optional, Tuple

import pint
import serial
import structlog
# Reference: https://stackoverflow.com/a/49724281
# Names of every logging level with a non-zero numeric value, ordered from the
# least to the most severe. Used as the choices of the --loglevel option.
LOG_LEVEL_NAMES = list(map(
    logging.getLevelName,
    filter(
        lambda level: getattr(level, "real", 0),
        sorted(getattr(logging, '_levelToName', None) or logging._levelNames),
    ),
))
# Module-wide structured logger used by every function below
log = structlog.get_logger()
# Unit registry used to build and parse all physical quantities in this module
ureg = pint.UnitRegistry()
class ZKETechEBCModels(IntEnum):
    """
    Device model identifiers of the ZKETECH EBC battery testers.

    The status packet decoder compares the model byte of a received
    packet against these values.
    """
    EBC_A05 = 5
    EBC_A10H = 6
    EBC_A20 = 9
# Framing bytes: SOF opens every packet on the wire and EOF closes it
SOF, EOF = 0xfa, 0xf8
class CommandCode(IntEnum):
    """
    Opcodes of the control packets the PC sends to the battery tester.
    """
    # Session handling
    CONNECT = 0x05
    DISCONNECT = 0x06
    STOP = 0x02

    # CC-CV charging
    CHRG_CCCV_START = 0x21
    # Presumably reports the number of minutes of elapsed charging time to
    # the charger (unconfirmed).
    # Sent every 60 seconds after CHRG_CCCV is sent
    CHRG_TIME = 0x0A

    # Constant current discharging
    DISCH_CC_START = 0x01
    DISCH_CC_ADJUST = 0x07
    DISCH_CC_STOP = 0x08
class StatusCode(IntEnum):
    """
    Packet type codes of the status packets the battery tester
    sends to the PC.
    """
    # Meaning not fully understood.
    # Sent when charger is idle
    IDLE = 0x02
    IDLE_MONITOR = 0x66

    # CC-CV charging
    CHRG_CCCV = 0x0C
    CHRG_CCCV_MONITOR = 0x70
    CHRG_CCCV_END = 0x16

    # Constant current discharging.
    # Open question: why are there two status codes during CC discharge?
    DISC_CC_IDLE = 0x00
    DISC_CC = 0x0A
    DISC_CC_END = 0x14
    DISC_CC_MONITOR = 0x64

    # Purpose unknown
    UNK1 = 0x6E
# This encoding is used to prevent bytes > 240 from appearing in the byte stream.
# This allows the byte stream to safely use 0xfa and 0xf8 as the SOF (Start Of Frame)
# and EOF (End Of Frame) markers.
def encode_base240(value: int) -> Tuple[int, int]:
    """
    Encode a value into a base240 encoded 2 byte sequence.

    Returns a ``(high, low)`` tuple for which ``240 * high + low == value``.

    Raises ValueError when the value is negative or too large to be
    encoded. An explicit exception is used instead of an assert so that
    the check is still performed when Python runs with -O.
    """
    limit = 0xf0 * 0xf0 + 0xf0
    if not 0 <= value < limit:
        raise ValueError(
            f'cannot base240 encode {value}, expected 0 <= value < {limit}')
    return divmod(value, 0xf0)
def decode_base240(h: int, l: int) -> int:
    """
    Combine the high (h) and low (l) byte of a base240 encoded
    2 byte sequence into the integer they represent.
    """
    high_part = h * 0xf0
    return high_part + l
#pylint: disable=line-too-long
# Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/EbcA20.cpp
# bool EbcA20::Decode(uint16_t source, double& value, ParameterPacking pp) const
# {
# switch (pp) // PP_None, PP_V, PP_V_set, PP_A, PP_A_set, PP_P, PP_T, PP_Ah
# {
# case PP_V:
# // TODO: is PP_V the same way decoded as PP_Ah? (don't know, because I never had a voltage above 10V)
# value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0);
# break;
# case PP_Ah:
# if (source & 0x8000) {
# // capacity >= 10.0 Ah
# if ((source & 0xE000) == 0xE000) {
# // capacity >= 200.0 Ah
# value = ((static_cast<double> ((((source >> 8) & 0x3F) * 240) + (source & 0xFF) - 0x1C00)) / 10.0);
# } else {
# // capacity < 200.0 Ah
# value = ((static_cast<double> ((((source >> 8) & 0x7F) * 240) + (source & 0xFF) - 0x0800)) / 100.0);
# }
# } else {
# // capacity < 10.0 Ah
# value = (source == 0x0000) ? 0.0 : ((static_cast<double> (((source >> 8) * 240) + (source & 0xFF))) / 1000.0);
# }
#pylint: enable=line-too-long
def decode_ranged(h: int, l: int) -> float:
    """
    Decode a 2 byte base240 value whose top bits of the high byte select
    one of three ranges, each with its own resolution and offset, into a
    float.
    """
    if not h & 0x80:
        # Top bit clear: values below 10.00, plain base240 in steps of 0.001
        raw, scale, bias = 240 * h + l, 0.001, 0
    elif (h & 0xe0) == 0xe0:
        # Top three bits set: values above 200.0 in steps of 0.1
        raw, scale, bias = 240 * (h & 0x3f) + l, 0.1, 0x1c00
    else:
        # Top bit set only: values between 10.00 and 200.0 in steps of 0.01
        raw, scale, bias = 240 * (h & 0x7f) + l, 0.01, 0x800
    return (raw - bias) * scale
# One payload field of a status packet: the key it is stored under and the
# callable turning its two raw bytes into a value
Field = namedtuple('Field', ['key', 'value'])
# Decoders for the 2 byte payload fields of a status packet. Each one takes
# the high and the low byte of a field and returns the decoded value.
def _decode_current(h: int, l: int):
    """Decode a current transmitted in units of 10 mA."""
    return decode_base240(h, l) * 0.01 * ureg.ampere


def _decode_measured_voltage(h: int, l: int):
    """Decode a measured voltage transmitted in units of 1 mV."""
    return decode_base240(h, l) * 0.001 * ureg.volt


def _decode_setpoint_voltage(h: int, l: int):
    """Decode a voltage setpoint transmitted in units of 10 mV."""
    return decode_base240(h, l) * 0.01 * ureg.volt


def _decode_charge(h: int, l: int):
    """Decode the range encoded charge counter into ampere hours."""
    return decode_ranged(h, l) * ureg.ampere_hour


def _decode_energy(h: int, l: int):
    """Decode the energy counter transmitted in units of 1 mWh."""
    return decode_base240(h, l) * 0.001 * ureg.watt_hour


def _decode_minutes(h: int, l: int):
    """Decode a duration transmitted in minutes."""
    return decode_base240(h, l) * ureg.minute


def _decode_fw_version(h: int, l: int) -> str:
    """Decode the firmware version, e.g. raw value 302 becomes '3.02'."""
    return str(decode_base240(h, l) / 100)


def _measurement_fields() -> list:
    """Build the 'current data' fields every status packet starts with."""
    return [
        Field(key='current', value=_decode_current),
        Field(key='voltage', value=_decode_measured_voltage),
        Field(key='charge', value=_decode_charge),
        # Does not seem to work, always 0
        Field(key='energy', value=_decode_energy),
    ]


def _device_info_fields() -> list:
    """Build the 'device' fields carried by the *_MONITOR and UNK1 packets."""
    return [
        # Displayed in the ZKETECH application UI as 'V3.02'
        Field(key='fw_ver', value=_decode_fw_version),
        Field(key='unk2', value=decode_base240), # Always 3023
        Field(key='unk3', value=decode_base240), # Always 2165
    ]


def _charge_setpoint_fields(current_key: str, voltage_key: str, cutoff_key: str) -> list:
    """
    Build the charge setpoint fields. The key names differ between packet
    types, so they are passed in by the caller.
    """
    return [
        Field(key=current_key, value=_decode_current),
        Field(key=voltage_key, value=_decode_setpoint_voltage),
        Field(key=cutoff_key, value=_decode_current),
    ]


def _discharge_setpoint_fields() -> list:
    """Build the constant current discharge setpoint fields."""
    return [
        Field(key='disc_cc_current', value=_decode_current),
        Field(key='disc_cc_cutoff_voltage', value=_decode_setpoint_voltage),
        Field(key='disc_max_time', value=_decode_minutes),
    ]


# Maps (device model, status code) to the ordered list of 2 byte fields that
# make up the payload of that status packet. Every key appears exactly once.
pkt_field_defs = {
    # Sent when device is idle ('OFF' on display)
    # Example packet:
    # fa 02 0000 0a13 0014 0000 0032 010a 000a 09 35 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.IDLE):
        _measurement_fields()
        + _charge_setpoint_fields('chrg_cc_current', 'chrg_cv_voltage',
                                  'chrg_cutoff_current'),

    # Sent when device is idle ('OFF' on display)
    # Contains FW and/or hardware information
    # Example packet:
    # fa 66 0000 0888 0014 0000 013e 0c8f 0905 09 4b f8
    (ZKETechEBCModels.EBC_A20, StatusCode.IDLE_MONITOR):
        _measurement_fields() + _device_info_fields(),

    # Mode: CHRG-CCCV
    # Sent in both CC and CV phases of charging
    # Example packets:
    # fa 0c 0032 07de 0000 0000 0032 01b4 000a 09 63 f8
    # fa 0c 0214 1059 299e 0000 0214 01b4 000a 09 44 f8
    # Charge = 9998 mAh
    # fa 0c 0214 1059 8ca8 0000 0214 01b4 000a 09 d7 f8
    # Charge > 10000 mAh = 10 Ah display will be 10.01 Ah
    # fa 0c 0214 1078 9124 0000 0214 01b4 000a 09 67 f8
    # charge = 20.69 Ah
    # fa 0c 000b 1178 c478 0000 0214 01b4 000a 09 72 f8
    # charge = 143.9 Ah
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV):
        _measurement_fields()
        + _charge_setpoint_fields('chrg_cccv_current', 'chrg_cccv_voltage',
                                  'chrg_cutoff_current'),

    # Mode: CHRG-CCCV
    # Sent during the first few seconds of charging
    # Example packet:
    # fa 70 0000 0453 0000 0000 013e 0c8f 0905 09 9e f8
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_MONITOR):
        _measurement_fields() + _device_info_fields(),

    # Mode: CHRG-CCCV
    # Sent when charging stops due to current cutoff reached
    # Example packet:
    # fa 16 000a 0a64 0014 0000 0032 010a 000a 09 5c f8
    (ZKETechEBCModels.EBC_A20, StatusCode.CHRG_CCCV_END):
        _measurement_fields()
        + _charge_setpoint_fields('cccv_current', 'cccv_voltage',
                                  'cutoff_current'),

    # Mode: DISC-CC
    # Example packet:
    # fa 00 0000 1049 0000 0000 0032 013c 0078 09 27 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_IDLE):
        _measurement_fields() + _discharge_setpoint_fields(),

    # Mode: DISC-CC
    # Example packets:
    # fa 0a 0032 0f41 0002 0000 0032 013c 003c 09 4e f8
    # fa 0a 0214 0f22 bbcf 0000 0214 013c 0000 09 67 f8
    # here the charge is > 100 Ah
    # charge == 123.2Ah
    # fa 0a 0214 0e80 dbd0 0000 0214 013c 0000 09 bb f8
    # charge = 200.0 (decoded)
    # fa 0a 0214 0e80 e630 0000 0214 013c 0000 09 66 f8
    # charge = 200.0Ah
    # fa 00 0000 0dbf e777 0000 0214 013c 0000 09 00 f8
    # charge = 231.1Ah
    # fa 0a 0214 0e4f e68d 0000 0214 013c 0000 09 14 f8
    # charge = 209.3Ah
    #
    # Recorded decoding bug:
    # 2025-01-22 18:46:37 [debug ] decode status pkt
    # pkt=b'fa0a02140f01cf8800000214013c00000977f8'
    # was decoded with 'charge': <Quantity(-343.2, 'ampere_hour')>
    # while the charge is 170.5Ah on display.
    # NOTE(review): with the range masks decode_ranged() uses (0x80 and 0xe0
    # on the high byte) the charge bytes cf 88 decode to 170.48 Ah, so this
    # capture appears to predate that logic - confirm and drop if so.
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC):
        _measurement_fields() + _discharge_setpoint_fields(),

    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_MONITOR):
        _measurement_fields() + _device_info_fields(),

    # Mode: D-CC
    # Example packet:
    # fa 14 0032 0c77 0159 0000 0032 013c 0078 09 7b f8
    (ZKETechEBCModels.EBC_A20, StatusCode.DISC_CC_END):
        _measurement_fields() + _discharge_setpoint_fields(),

    # Example packet:
    # fa 6e 0032 0f41 0002 0000 013e 0c8f 0905 09 a9 f8
    (ZKETechEBCModels.EBC_A20, StatusCode.UNK1):
        _measurement_fields() + _device_info_fields(),
}
def decode_status_pkt(pkt: bytes) -> Optional[dict]:
    """
    Decode a status packet received from the battery tester into
    a dictionary of fields.

    Some of the fields are metadata fields which contain information
    about the decoding process itself. For example, the '.crc_check'
    key contains a boolean indicating whether the CRC is valid for the
    status packet. A packet failing the CRC check is still decoded.

    Returns None when the packet is too short, is not delimited by the
    SOF/EOF markers, has an unknown packet type or device model or has
    no field definitions.
    """
    log.debug('decode status pkt', pkt=hexlify(pkt))
    frame = bytes(pkt)
    # SOF, packet type, device model, CRC and EOF are always taken apart
    # below, so anything shorter cannot be a status packet.
    if len(frame) < 5:
        log.error('packet too short', pkt=hexlify(pkt))
        return None
    if frame[0] != SOF or frame[-1] != EOF:
        log.error('unknown packet format', pkt=hexlify(pkt))
        return None
    crc = frame[-2]
    # Everything between the SOF marker and the CRC byte is checksummed
    body = frame[1:-2]
    pkt_fields = {}
    calculated_crc = reduce(operator.xor, body)
    pkt_fields['.calculated_crc'] = calculated_crc
    # CRC values which have the highest 4 bits set are transmitted
    # with those bits turned off to not insert a SOF or EOF byte
    # by chance and confuse the framing decoder
    # Reference: https://github.com/dev-strom/esp-ebc-mqtt/blob/main/lib/commands/message.cpp
    if (calculated_crc & 0xf0) == 0xf0:
        calculated_crc = calculated_crc & 0x0f
    crc_ok = calculated_crc == crc
    if not crc_ok:
        log.warning('crc verification failed', pkt=hexlify(pkt))
    pkt_fields['.crc_check'] = crc_ok

    raw_ptype = body[0]
    raw_model = body[-1]
    payload = body[1:-1]
    # Convert through the enum constructors instead of using
    # 'value in EnumClass', which raises TypeError for plain integers
    # on Python versions before 3.12.
    try:
        ptype = StatusCode(raw_ptype)
        dev_model = ZKETechEBCModels(raw_model)
    except ValueError:
        log.warning('cannot recognize packet',
                    packet_type=raw_ptype, device=raw_model, pkt=hexlify(pkt))
        return None
    pkt_fields['ptype'] = ptype
    pkt_fields['dev_model'] = dev_model

    field_defs = pkt_field_defs.get((dev_model, ptype))
    if not field_defs:
        log.error('no field definitions', dev_model=dev_model, ptype=ptype, pkt=hexlify(pkt))
        return None
    # Every field occupies two bytes, refuse truncated payloads instead
    # of failing halfway through the field list.
    if len(payload) < 2 * len(field_defs):
        log.error('packet payload too short', dev_model=dev_model, ptype=ptype,
                  pkt=hexlify(pkt))
        return None
    for index, field_def in enumerate(field_defs):
        h = payload[2 * index]
        l = payload[2 * index + 1]
        pkt_fields[field_def.key] = field_def.value(h, l)
    return pkt_fields
def wrap_pkt(payload: bytes) -> bytes:
    """
    Wrap the control packet payload with a CRC and start/stop frame indicators.

    The CRC is the XOR of all payload bytes. An empty payload yields a
    CRC of 0.
    """
    crc = reduce(operator.xor, payload, 0)
    # A checksum with the highest 4 bits set is sent with those bits cleared
    # so that it can never be mistaken for a SOF or EOF marker. This mirrors
    # the rule the status packet decoder applies to received checksums.
    # NOTE(review): assumes the tester applies the same rule to the commands
    # it receives - confirm against the device.
    if (crc & 0xf0) == 0xf0:
        crc &= 0x0f
    return bytes([SOF]) + payload + bytes([crc, EOF])
def ctl_packet(command: CommandCode, p1: int = 0, p2: int = 0, p3: int = 0) -> bytes:
    """
    Assemble a complete control packet: the command code followed by
    the three parameters, each base240 encoded into two bytes, wrapped
    with the CRC and the frame markers.
    """
    log.debug('building control packet', command=command, p1=p1, p2=p2, p3=p3)
    opcode = int(command)
    param_bytes = []
    for param in (p1, p2, p3):
        param_bytes.extend(encode_base240(param))
    # One opcode byte plus three 2 byte parameters
    framed = wrap_pkt(pack('7B', opcode, *param_bytes))
    log.debug('control packet bytes', pkt=hexlify(framed))
    return framed
def read_packet(s: serial.Serial) -> bytes:
    """
    Read data from the serial port and return a complete packet.
    It uses the SOF and EOF tags as delimiters.

    Bytes received before a SOF marker are discarded. A SOF marker seen
    while a packet is being collected restarts the collection, as the
    partial packet before it can no longer be completed.

    Raises TimeoutError when the port returns no data, which can only
    happen when a read timeout is configured on the port.
    """
    def next_byte() -> int:
        chunk = s.read(1)
        if not chunk:
            raise TimeoutError('timed out waiting for data from the battery tester')
        return chunk[0]

    # Hunt for the start of a frame
    while next_byte() != SOF:
        pass
    pkt = bytearray([SOF])
    while True:
        b = next_byte()
        if b == SOF:
            # A new frame started before the current one was closed,
            # drop the truncated frame and resynchronize on the new one
            pkt = bytearray([SOF])
            continue
        pkt.append(b)
        if b == EOF:
            return bytes(pkt)
def packet_rx(s: serial.Serial, terminate_on: list[StatusCode]):
    """
    Generator yielding (raw packet, decoded fields) tuples read from the
    serial port. The decoded fields are None for packets which could
    not be decoded.

    The generator finishes after yielding a packet whose type is one of
    the termination status codes, for example a "Charging End" status
    packet.
    """
    # Compare on plain integers
    stop_codes = [int(code) for code in terminate_on]
    finished = False
    while not finished:
        pkt = read_packet(s)
        pkt_fields = decode_status_pkt(pkt)
        if not pkt_fields:
            log.warn('unknown status packet', pkt=hexlify(pkt))
        yield (pkt, pkt_fields)
        finished = pkt_fields is not None and pkt_fields['ptype'] in stop_codes
def curve_plot_filename(args: argparse.Namespace, name: Optional[str] = None) -> Tuple[Optional[Path], AbstractContextManager]: #pylint: disable=line-too-long
    """
    Build the filename for the charge/discharge curve CSV plot and a file context
    based on argument flags and base CSV filename.

    When args.output_prefix is set and args.save_curves is enabled the
    file is opened for writing and returned together with its path. The
    optional name replaces the last component of the output prefix. The
    '.csv' suffix replaces any suffix the resulting name already has.

    Otherwise (None, nullcontext(None)) is returned so that callers can
    use the same 'with' statement in both cases.
    """
    if not (args.output_prefix and args.save_curves):
        return (None, nullcontext(None))
    csv_filename = args.output_prefix
    if name is not None:
        csv_filename = csv_filename.with_name(name)
    csv_filename = csv_filename.with_suffix('.csv')
    # The csv module requires files to be opened with newline='', otherwise
    # the line endings it writes are translated a second time on platforms
    # using \r\n line endings.
    csv_file_ctx = csv_filename.open("w", newline='')
    return (csv_filename, csv_file_ctx)
def charge(s: serial.Serial, settings: dict, **kwargs):
    """
    Start charging a battery using the provided settings.

    settings holds the pint quantities 'current', 'voltage' and
    'cutoff_current'.

    Recognized keyword arguments:
    transmit      -- send the start command to the tester (default False)
    csv_writer    -- csv.DictWriter receiving the charge curve points
    plot_filename -- name of the curve file, copied into the summary

    Monitors the charging progress and stores the charge curve points
    using a specified CSV writer.
    Returns a dictionary with summary data about the charging process.
    """
    log.info('charge begin', serial=s, settings=settings, kwargs=kwargs)
    summary_data = {
        'action': 'charge',
        'algorithm': 'CC-CV',
        # pint.Quantity cannot be generically serialized to JSON
        'settings': { k: str(v) for (k,v) in settings.items() },
        'flags': {},
    }
    if kwargs.get('plot_filename'):
        summary_data['plot_filename'] = kwargs['plot_filename']
    if kwargs.get('transmit', False):
        # The setpoints are sent in units of 10 mA / 10 mV. They are rounded
        # to the nearest step, truncating with int() could end up one step
        # low because of floating point representation errors.
        pkt = ctl_packet(CommandCode.CHRG_CCCV_START,
            round(settings['current'].to(ureg.milliampere).m / 10),
            # pint defines mA but does not define mV
            round((settings['voltage'] * 1000).m / 10),
            round(settings['cutoff_current'].to(ureg.milliampere).m / 10)
        )
        log.info('transmit control packet', pkt=hexlify(pkt))
        s.write(pkt)
    # CHRG_TIME (not sent at the moment)
    # p1 -> current time [min]
    # p2 -> always 0
    # p3 -> always 0
    #chrg_time = ctl_packet(CommandCode.CHRG_TIME, 50, 420, 2)
    #s.write(chrg_time)
    summary_data['start_ts'] = time.time()
    last_status_update = time.time()
    for (_pkt, pkt_fields) in packet_rx(s, [ StatusCode.CHRG_CCCV_END ]):
        # In case packet was not decoded
        if not pkt_fields:
            continue
        # Report the progress every 30 seconds at most
        if time.time() - last_status_update > 30:
            log.info('charging', voltage=str(pkt_fields['voltage'].to(ureg.volt)),
                current=str(pkt_fields['current'].to(ureg.ampere)),
                charge=str(pkt_fields['charge'].to(ureg.ampere_hour)))
            last_status_update = time.time()
        if kwargs.get('csv_writer'):
            kwargs['csv_writer'].writerow({
                # TODO: Make precision depend on the reading like on device LCD
                # For example 100mAh is "100mAh" but 40Ah is "40.00Ah" and 111Ah is
                # "111.0Ah"
                't': time.time(),
                'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                'chrg_current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
            })
        if pkt_fields['ptype'] == int(StatusCode.CHRG_CCCV_END):
            log.info('charging finished')
            summary_data['finish_ts'] = time.time()
            summary_data['charge'] = str(pkt_fields['charge'])
            summary_data['flags']['current_cutoff'] = True
            break
        time.sleep(1)
    log.info('charge end', summary=summary_data)
    return summary_data
def cmd_charge(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'charge' CLI command.

    Returns the summary data of the charging process.
    """
    kwargs = { 'transmit': args.transmit }
    (csv_filename, csv_file_ctx) = curve_plot_filename(args)
    # Only record a plot filename when a curve file is actually written,
    # str(None) would otherwise end up in the summary as 'None'
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)
    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'chrg_current'])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer
        summary_data = charge(s, settings={
            'current': args.current,
            'voltage': args.voltage,
            'cutoff_current': args.cutoff_current
        }, **kwargs)
    return summary_data
def discharge(s: serial.Serial, settings: dict, **kwargs):
    """
    Start discharging a battery using the provided settings.
    Currently supports only the Constant Current (CC) load mode.

    settings holds the pint quantities 'current', 'cutoff_voltage' and
    'time_limit'.

    Recognized keyword arguments:
    transmit      -- send the start command to the tester (default False)
    csv_writer    -- csv.DictWriter receiving the discharge curve points
    plot_filename -- name of the curve file, copied into the summary

    Monitors the discharge progress and stores the discharge curve points
    using a specified CSV writer.
    Returns a dictionary with summary data about the discharge process.
    """
    log.info('discharge start', serial=s, settings=settings, kwargs=kwargs)
    # TODO: Add support for Constant Power discharge
    summary_data = {
        'action': 'discharge',
        # pint.Quantity cannot be generically serialized to JSON
        'settings': { k: str(v) for (k,v) in settings.items() },
        'flags': {},
    }
    summary_data['settings']['load'] = 'CC'
    if kwargs.get('plot_filename'):
        summary_data['plot_filename'] = kwargs['plot_filename']
    if kwargs.get('transmit', False):
        # DISC-CC start
        # p1 -> current, unit is 10 mA
        # p2 -> cutoff voltage, unit is 10 mV
        # p3 -> maximum time, unit is min
        # Current and voltage are rounded to the nearest step, truncating
        # with int() could end up one step low because of floating point
        # representation errors. The time limit is cut down to whole minutes.
        pkt = ctl_packet(CommandCode.DISCH_CC_START,
            round(settings['current'].to(ureg.milliampere).m / 10),
            # pint defines mA but does not define mV
            round((settings['cutoff_voltage'] * 1000).m / 10),
            int( settings['time_limit'].to(ureg.minute).m )
        )
        log.info('transmit control packet', pkt=hexlify(pkt))
        s.write(pkt)
    summary_data['start_ts'] = time.time()
    last_status_update = time.time()
    for (_pkt, pkt_fields) in packet_rx(s, [ StatusCode.DISC_CC_END ]):
        # In case packet was not decoded
        if not pkt_fields:
            continue
        # Report the progress every 30 seconds at most
        if time.time() - last_status_update > 30:
            log.info('discharging', voltage=str(pkt_fields['voltage'].to(ureg.volt)),
                current=str(pkt_fields['current'].to(ureg.ampere)),
                charge=str(pkt_fields['charge'].to(ureg.ampere_hour)))
            last_status_update = time.time()
        if kwargs.get('csv_writer'):
            kwargs['csv_writer'].writerow({
                't': time.time(),
                # TODO: Make precision configurable
                'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                'disch_current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
            })
        # TODO: Handle time limit properly
        if pkt_fields['ptype'] == int(StatusCode.DISC_CC_END):
            log.info('discharge finished')
            summary_data['finish_ts'] = time.time()
            summary_data['charge'] = str(pkt_fields['charge'])
            summary_data['flags']['voltage_cutoff'] = True
            break
        time.sleep(1)
    log.info('discharge end', summary=summary_data)
    return summary_data
def cmd_discharge(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'discharge' CLI command.

    Returns the summary data of the discharge process.
    """
    kwargs = { 'transmit' : args.transmit }
    (csv_filename, csv_file_ctx) = curve_plot_filename(args)
    # Only record a plot filename when a curve file is actually written,
    # str(None) would otherwise end up in the summary as 'None'
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)
    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'disch_current'])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer
        summary_data = discharge(s, settings={
            'current': args.current,
            'cutoff_voltage': args.cutoff_voltage,
            'time_limit': args.time_limit
        }, **kwargs)
    return summary_data
def _run_cycle_phase(args: argparse.Namespace, s: serial.Serial, phase, phase_name: str,
                     current_column: str, settings: dict) -> dict:
    """
    Run one charge or discharge phase of a cycle, storing its curve in a
    CSV file of its own, and return the summary data of the phase.
    """
    kwargs = { 'transmit': args.transmit }
    # The per-phase file name is derived from the output prefix, which is
    # optional. Without a prefix no curve file is written at all.
    name = f'{args.output_prefix.name}-{phase_name}' if args.output_prefix else None
    (csv_filename, csv_file_ctx) = curve_plot_filename(args, name=name)
    # str(None) would otherwise end up in the summary as 'None'
    if csv_filename is not None:
        kwargs['plot_filename'] = str(csv_filename)
    with csv_file_ctx as csv_file:
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', current_column])
            csv_writer.writeheader()
            kwargs['csv_writer'] = csv_writer
        return phase(s, settings=settings, **kwargs)


def cmd_cycle(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'cycle' CLI command.

    Performs a pre-charge followed by args.num_cycles discharge and
    post-charge cycles. Returns the summary data holding the summary of
    every phase and, under 'results', the mean of the charge measured
    by the discharges. Returns None when transmitting is disabled.
    """
    log.info('cycle', serial=s, num_cycles=args.num_cycles)
    if not args.transmit:
        log.error('cycle requires tx enabled')
        return None
    main_summary_data = {
        'action': 'cycle',
        'settings': {
            'num_cycles': args.num_cycles
        },
        'cycles': [],
        'results': {}
    }
    # Store charge measurements for each cycle
    charge_measurements = []

    # Pre-charge
    precharge_summary_data = _run_cycle_phase(
        args, s, charge, 'precharge', 'chrg_current',
        settings={
            'current': args.pre_charge_current,
            'voltage': args.pre_charge_voltage,
            'cutoff_current': args.pre_cutoff_current
        })
    main_summary_data['cycles'].append(precharge_summary_data)

    for cycle_num in range(1, args.num_cycles + 1):
        # Test discharge
        discharge_summary_data = _run_cycle_phase(
            args, s, discharge, f'cycle-{cycle_num}-discharge', 'disch_current',
            settings={
                'current': args.discharge_current,
                'cutoff_voltage': args.discharge_cutoff_voltage,
                'time_limit': args.discharge_time_limit
            })
        charge_measurements.append(discharge_summary_data['charge'])
        main_summary_data['cycles'].append(discharge_summary_data)

        # Post-charge
        postcharge_summary_data = _run_cycle_phase(
            args, s, charge, f'cycle-{cycle_num}-postcharge', 'chrg_current',
            settings={
                'current': args.post_charge_current,
                'voltage': args.post_charge_voltage,
                'cutoff_current': args.post_cutoff_current
            })
        main_summary_data['cycles'].append(postcharge_summary_data)

    if not charge_measurements:
        # Nothing to average when no cycle was requested
        log.warning('cycle finished without measurements', num_cycles=args.num_cycles)
        return main_summary_data

    # We need to parse as pint expressions again as charge measurements
    # are presented as strings in the summary_data
    mean_capacity = mean([ureg.parse_expression(c) for c in charge_measurements])
    # mean is often returned as a Fraction
    mean_capacity = pint.Quantity(float(mean_capacity.m), ureg.ampere_hour)
    main_summary_data['results'] = {
        'mean_capacity': str( round(mean_capacity, 3) )
    }
    log.info('cycle finished', mean_capacity=round(mean_capacity, 3))
    return main_summary_data
def cmd_monitor(args: argparse.Namespace, s: serial.Serial):
    """
    Handle the 'monitor' CLI command.

    Prints every status packet received from the tester and, when an
    output prefix is set and curve saving is enabled, stores the voltage
    and current readings in a CSV file. Runs until interrupted.
    """
    log.info('monitor', serial=s)
    if args.output_prefix and args.save_curves:
        # The csv module requires files to be opened with newline=''
        csv_file_ctx = args.output_prefix.with_suffix('.csv').open("w", newline='')
    else:
        csv_file_ctx = nullcontext(None)
    with csv_file_ctx as csv_file:
        csv_writer = None
        if csv_file:
            csv_writer = csv.DictWriter(csv_file, fieldnames=['t', 'voltage', 'current'])
            csv_writer.writeheader()
        while True:
            pkt = read_packet(s)
            pkt_fields = decode_status_pkt(pkt)
            print(pkt_fields)
            if not pkt_fields:
                log.warning('unknown status packet', pkt=hexlify(pkt))
            elif csv_writer is not None:
                csv_writer.writerow({
                    't': time.time(),
                    # TODO: Make precision configurable
                    'voltage': round(pkt_fields['voltage'].to(ureg.volt).m, 3),
                    'current': round(pkt_fields['current'].to(ureg.ampere).m, 2),
                })
            time.sleep(1)
def cmd_decode(args: argparse.Namespace,
               s: serial.Serial # pylint: disable=unused-argument
               ):
    """
    Handle the 'decode' CLI command: decode each hexlified packet given
    on the command line, ignoring any whitespace inside it, and print
    the decoded fields.
    """
    log.info('decode')
    for hex_text in args.packets:
        compact_hex = re.sub(r'\s', '', hex_text)
        raw_pkt = unhexlify(compact_hex)
        print(decode_status_pkt(raw_pkt))
def pint_bind_unit(unit: pint.Unit) -> type[argparse.Action]:
    """
    Handle a CLI argument parsing it as a pint expression and
    converting to the expected unit (for example milliamperes will be
    converted to amperes).

    Returns an argparse.Action class bound to the unit. A value which
    cannot be parsed or converted is reported as a usage error and
    terminates the argument parsing, instead of silently leaving the
    argument unset or at its default.
    """
    class PintArgument(argparse.Action):
        """
        argparse action using pint.
        """
        def __call__(self, parser, args, value, option_string=None):
            try:
                quantity = ureg.parse_expression(value).to(unit)
            except pint.errors.PintError as e:
                log.error('cannot parse expression',
                    dest=self.dest, expected_unit=unit, value=value, msg=str(e))
                # parser.error() prints the usage message and exits
                parser.error(f'argument {option_string or self.dest}: '
                             f'cannot use {value!r} as {unit}: {e}')
            else:
                setattr(args, self.dest, quantity)
    return PintArgument
def output_data_path(value: str) -> Path:
    """
    argparse type converter for the output path prefix. The prefix is
    accepted when the directory it points into exists, otherwise an
    argparse.ArgumentTypeError is raised.
    """
    prefix = Path(value)
    parent_dir = prefix.parent
    if parent_dir.exists():
        return prefix
    raise argparse.ArgumentTypeError(f'path {parent_dir} needs to exist')
def add_charge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the options of the 'charge' CLI command on the parser and
    bind the command handler to it.
    """
    # The mandatory setpoints share everything but name, unit and help text
    mandatory_setpoints = (
        ('--current', 'A', ureg.ampere, 'Charging current'),
        ('--voltage', 'V', ureg.volt, 'Charging voltage'),
    )
    for (flag, metavar, unit, help_text) in mandatory_setpoints:
        parser.add_argument(flag,
            metavar=metavar, required=True,
            action=pint_bind_unit(unit),
            help=help_text)
    # This is the lowest cutoff current for EBC-A20
    lowest_cutoff = ureg.parse_expression("100 mA")
    parser.add_argument('--cutoff-current', metavar='A',
        action=pint_bind_unit(ureg.ampere),
        default=lowest_cutoff,
        help='Charge termination current')
    parser.set_defaults(cmd=cmd_charge, need_serial=True)
    return parser
def add_discharge_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the options of the 'discharge' CLI command on the parser and
    bind the command handler to it.
    """
    # Constant Power discharge not yet implemented
    #discharge_parser.add_argument('--power',
    #    action=pint_bind_unit(ureg.watt),
    #    help='Constant Power discharge power')
    mandatory_setpoints = (
        ('--current', 'A', ureg.ampere, 'Constant Current discharge current'),
        ('--cutoff-voltage', 'V', ureg.volt, 'Discharge cutoff voltage'),
    )
    for (flag, metavar, unit, help_text) in mandatory_setpoints:
        parser.add_argument(flag, metavar=metavar, required=True,
            action=pint_bind_unit(unit),
            help=help_text)
    no_time_limit = 0 * ureg.second
    parser.add_argument('--time-limit', metavar='T',
        action=pint_bind_unit(ureg.minute),
        default=no_time_limit,
        help='Time limit')
    parser.set_defaults(cmd=cmd_discharge, need_serial=True)
    return parser
def add_cycle_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Add arguments for the 'cycle' CLI command.

    The arguments are grouped by the phase of the cycle they configure:
    the pre-charge, the test discharge and the post-charge. Both charge
    termination currents default to 100 mA.
    """
    parser.add_argument('--num-cycles',
        type=int, default=1, help='Number of measurement test cycles')
    prepare_charge_group = parser.add_argument_group('Prepare Charge')
    prepare_charge_group.add_argument('--pre-charge-current',
        metavar='A', required=True,
        action=pint_bind_unit(ureg.ampere),
        help='Charging current')
    prepare_charge_group.add_argument('--pre-charge-voltage',
        metavar='V', required=True,
        action=pint_bind_unit(ureg.volt),
        help='Charging voltage')
    prepare_charge_group.add_argument('--pre-cutoff-current',
        metavar='A',
        action=pint_bind_unit(ureg.ampere),
        default=ureg.parse_expression('100 mA'),
        help='Charge termination current')
    test_discharge_group = parser.add_argument_group('Test discharge')
    test_discharge_group.add_argument('--discharge-current',
        metavar='A', required=True,
        action=pint_bind_unit(ureg.ampere),
        help='Discharging current')
    test_discharge_group.add_argument('--discharge-cutoff-voltage',
        metavar='V', required=True,
        action=pint_bind_unit(ureg.volt),
        help='Discharge cutoff voltage')
    test_discharge_group.add_argument('--discharge-time-limit',
        metavar='T',
        action=pint_bind_unit(ureg.minute),
        default=0 * ureg.second,
        help='Discharge time limit')
    post_charge_group = parser.add_argument_group('Post charge')
    post_charge_group.add_argument('--post-charge-current',
        metavar='A', required=True,
        action=pint_bind_unit(ureg.ampere),
        help='Charging current')
    post_charge_group.add_argument('--post-charge-voltage',
        metavar='V', required=True,
        action=pint_bind_unit(ureg.volt),
        help='Charging voltage')
    # Without a default this optional argument is None when omitted and the
    # post-charge fails converting it, so use the same default as the
    # pre-charge cutoff current.
    post_charge_group.add_argument('--post-cutoff-current',
        metavar='A',
        action=pint_bind_unit(ureg.ampere),
        default=ureg.parse_expression('100 mA'),
        help='Charging termination current')
    parser.set_defaults(cmd=cmd_cycle)
    parser.set_defaults(need_serial=True)
    return parser
def add_monitor_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Bind the 'monitor' CLI command handler to the parser. The command
    takes no options of its own and needs the serial port.
    """
    parser.set_defaults(cmd=cmd_monitor, need_serial=True)
    return parser
def add_decode_args(parser: argparse.ArgumentParser) -> argparse.ArgumentParser:
    """
    Register the packets to decode as positional arguments of the
    'decode' CLI command and bind the command handler to the parser.
    The command neither opens the serial port nor transmits.
    """
    parser.add_argument('packets', metavar='PKT', nargs='+',
        help='Hexlified packets to decode')
    parser.set_defaults(cmd=cmd_decode, need_serial=False, transmit=False)
    return parser
def main():
    """
    Main CLI function.

    Parses the command line, opens the serial port when the selected
    command needs it, runs the command and stores its summary data as
    JSON when an output prefix is given. When the port was opened it is
    always closed again, after sending the STOP and DISCONNECT commands
    if transmitting is enabled.
    """
    parser = argparse.ArgumentParser('ebc-a20',
        description='CLI for the ZKETECH EBC-A20 Charger/Dummy Load')
    parser.add_argument('--port', type=str, default='/dev/ttyUSB0',
        help='Serial port to use')
    parser.add_argument('--loglevel', choices=LOG_LEVEL_NAMES, default='INFO',
        help='Change log level')
    parser.add_argument('--transmit', action=argparse.BooleanOptionalAction, default=True,
        help='Transmit commands to the charger')
    parser.add_argument('--output-prefix', metavar='PATH', default=None,
        type=output_data_path, help='Output path prefix for charge/discharge data')
    parser.add_argument('--save-curves', action=argparse.BooleanOptionalAction, default=True,
        help='Save charge/discharge curves as .csv')
    parser.set_defaults(need_serial=False)
    subparsers = parser.add_subparsers(help='sub-commands help')
    charge_parser = subparsers.add_parser('charge', help='Charge a cell')
    add_charge_args(charge_parser)
    discharge_parser = subparsers.add_parser('discharge', help='Discharge a cell')
    add_discharge_args(discharge_parser)
    cycle_parser = subparsers.add_parser('cycle',
        help='Charge-Discharge-Charge capacity measurement cycle')
    add_cycle_args(cycle_parser)
    monitor_parser = subparsers.add_parser('monitor',
        help='Monitor and log the charger state')
    add_monitor_args(monitor_parser)
    decode_parser = subparsers.add_parser('decode', help='Test packet decoder')
    add_decode_args(decode_parser)
    args = parser.parse_args()
    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, args.loglevel))
    )
    log.debug('arguments', args=args)
    # Bound before the try block so that the cleanup below can tell whether
    # the port was opened, even when opening it failed.
    s = None
    try:
        if args.need_serial:
            s = serial.Serial(port=args.port,
                baudrate=9600,
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_ODD)
        if args.transmit and s is not None:
            pkt = ctl_packet(CommandCode.CONNECT)
            log.info('transmit control packet', pkt=hexlify(pkt))
            s.write(pkt)
        if hasattr(args, 'cmd'):
            summary_data = args.cmd(args, s=s)
            # Commands without summary data have nothing to store
            if args.output_prefix and summary_data is not None:
                args.output_prefix.with_suffix('.json').write_text(json.dumps(summary_data))
        else:
            log.fatal('command needs to be specified')
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to stop, fall through to the cleanup
        pass
    finally:
        if s is not None:
            try:
                if args.transmit:
                    pkt = ctl_packet(CommandCode.STOP)
                    log.info('transmit control packet', pkt=hexlify(pkt))
                    s.write(pkt)
                    pkt = ctl_packet(CommandCode.DISCONNECT)
                    log.info('transmit control packet', pkt=hexlify(pkt))
                    s.write(pkt)
            finally:
                s.close()
# Run the CLI only when executed as a script, not when imported as a module
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment