Created
October 13, 2021 05:13
-
-
Save arrdem/39836a0a0e0c297dd0be34f332dd2e8e to your computer and use it in GitHub Desktop.
An approachable implementation of the ClusterCTRL/ClusterHAT i2c controller
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""An I2C driver for the ClusterCTRL/ClusterHAT device(s).""" | |
from enum import Enum | |
from itertools import chain, repeat | |
from time import sleep | |
from typing import Union | |
import smbus | |
def once(f): | |
"""Decorator. Defer to f once and only once, caching the result forever. | |
Users with a functional background may recognize the concept of a `thunk`. | |
""" | |
unset = val = object() | |
def _helper(*args, **kwargs): | |
nonlocal val | |
if val is unset: | |
val = f(*args, **kwargs) | |
return val | |
return _helper | |
# I2C address of ClusterCTRL device | |
I2C_ADDRESS = 0x20 | |
class Reg(Enum): | |
"""The registers supported by an Cluster device.""" | |
VERSION = 0x00 # Register layout version | |
MAXPI = 0x01 # Maximum number of Pi | |
ORDER = 0x02 # Order - used to sort multiple ClusterCTRL devices | |
MODE = 0x03 # N/A | |
TYPE = 0x04 # 0=DA, 1=pHAT | |
DATA7 = 0x05 # | |
DATA6 = 0x06 # | |
DATA5 = 0x07 # | |
DATA4 = 0x08 # | |
DATA3 = 0x09 # | |
DATA2 = 0x0A # | |
DATA1 = 0x0B # | |
DATA0 = 0x0C # | |
CMD = 0x0D # Command | |
STATUS = 0x0E # Status | |
class Cmd(Enum): | |
"""Commands supported by various Cluster devices.""" | |
ON = 0x03 # Turn on Px (data0=x) | |
OFF = 0x04 # Turn off Px (data0=x) | |
ALERT_ON = 0x05 # Turn on Alert LED | |
ALERT_OFF = 0x06 # Turn off Alert LED | |
HUB_CYCLE = 0x07 # Reset USB HUB (turn off for data0*10ms, then back on) | |
LED_EN = 0x0A # Enable Px LED (data0=x) | |
LED_DIS = 0x0B # Disable Px LED (data0=x) | |
PWR_ON = 0x0C # Turn off PWR LED | |
PWR_OFF = 0x0D # Turn off PWR LED | |
RESET = 0x0E # Resets ClusterCTRL (does not keep power state) | |
GET_PSTATUS = 0x0F # Get Px power status (data0=x) | |
FAN = 0x10 # Turn fan on (data0=1) or off (data0=0) | |
GETPATH = 0x11 # Get USB path to Px (data0=x 0=controller) returned in data7-data0 | |
USBBOOT_EN = 0x12 # Turn on USBBOOT | |
USBBOOT_DIS = 0x13 # Turn off USBBOOT | |
GET_USTATUS = 0x14 # Get Px USBBOOT status (data0=x) | |
SET_ORDER = 0x15 # Set order (data0=order) | |
SAVE = 0xF0 # Save current PWR/P1-LED/P2-LED/P1/P2/Order/Mode to EEPROM | |
SAVEDEFAULTS = 0xF1 # Save factory defaults | |
GET_DATA = 0xF2 # Get DATA (Temps/ADC/etc.) | |
SAVE_ORDER = 0xF3 # Save order to EEPROM | |
SAVE_USBBOOT = 0xF4 # Save usbboot status to EEPROM | |
SAVE_POS = 0xF5 # Save Power On State to EEPROM | |
SAVE_LED = 0xF6 # Save LED to EEPROM | |
NOP = 0x90 # Do nothing | |
class Data(Enum): | |
"""Datum that can be read back from the Cluster device via Cmd.GET_DATA""" | |
# Get arbitrary data from ClusterCTRL | |
VERSION = 0x00 # Get firmware version | |
ADC_CNT = 0x01 # Returns number of ADC ClusterCTRL supports | |
ADC_READ = 0x02 # Read ADC data for ADC number 'data0' | |
ADC_TEMP = 0x03 # Read Temperature ADC | |
FANSTATUS = 0x04 # Read fan status | |
class ClusterDriver(object): | |
def __init__(self, bus: smbus.SMBus, address: int = I2C_ADDRESS, delay: int = 0, clear = False): | |
"""Initialize a ClusterCTRL/ClusterHAT driver instance for a given bus device.""" | |
self._bus = bus | |
self._address = address | |
self._delay = delay | |
self._clear = clear | |
def _read(self, id: Union[Reg, Data], len: int = 1): | |
"""A convenient abstraction for reading data back.""" | |
# Performing a "fundamental" read | |
if isinstance(id, Reg): | |
if len == 1: | |
return self._bus.read_byte_data(self._address, id.value) | |
else: | |
return self._bus.read_block_data(self._address, id.value, len) | |
# Performing a "command" read | |
elif isinstance(id, Data): | |
return self._call(Cmd.GET_DATA, id.value) | |
def _write(self, id: Reg, val: int): | |
"""A convenient abstraction for writing a register.""" | |
return self._bus.write_byte_data(self._address, id.value, val) | |
def _call(self, op: Cmd, *args, clear = False): | |
"""A convenient abstraction over the 'calling' convention for ops. | |
Operations are "called" when Reg.CMD is written to. | |
Operations consume parameters from Reg.DATA0-Reg.DATA7. | |
If `clear=` is truthy, any registers not defined by parameters will be cleared (zeroed) as a safety measure. | |
Note that the caller is responsible for reading any returned data, for which the protocol is less clear. | |
Most operations "just" return via reg.DATA0, but some don't. | |
""" | |
if self._clear or clear: | |
args = chain(args, repeat(0)) | |
args = zip([Reg.DATA0, Reg.DATA1, Reg.DATA2, Reg.DATA3, Reg.DATA4, Reg.DATA5, Reg.DATA6, Reg.DATA7], args) | |
for r, v in args: | |
self._write(r, v) | |
# Execute the call | |
self._write(Reg.CMD, op.value) | |
# If the user has specified a delay, sleep | |
if self._delay: | |
sleep(self._delay) | |
# Return the (mostly) meaningful return code | |
return self._read(Reg.DATA0) | |
@property | |
def min_pi(self): | |
"""Get the minimum supported Pi ID on this controller.""" | |
return 1 | |
@property | |
@once | |
def max_pi(self): | |
"""Get the maximum supported Pi ID on this controller.""" | |
return self._read(Reg.MAXPI) | |
@property | |
def pi_ids(self): | |
"""Iterate over the IDs of Pis which could be connected to this controller.""" | |
return range(self.min_pi, self.max_pi + 1) | |
@property | |
def type(self): | |
return self._read(Reg.TYPE) | |
def reset_hub(self, delay: int = 0): | |
"""[Power] cycle the Controller hub for `delay` x 10ms.""" | |
return self._call(Cmd.HUB_CYCLE, delay) | |
def reset_all(self): | |
"""[Power] cycle the entire Controller.""" | |
return self._call(Cmd.RESET) | |
def power_on(self, id: int): | |
"""Power on a given slot by ID.""" | |
return self._call(Cmd.ON, id) | |
def power_off(self, id: int): | |
"""Power off a given slot by ID.""" | |
return self._call(Cmd.OFF, id) | |
def power_status(self, id: int): | |
"""Read the status of a given slot by ID.""" | |
return self._call(Cmd.GET_PSTATUS, id) | |
def led_on(self, id: int): | |
"""Turn on an LED by ID.""" | |
return self._call(Cmd.LED_EN, id) | |
def led_off(self, id: int): | |
"""Turn off an LED by ID.""" | |
return self._call(Cmd.LED_DIS, id) | |
def alert_on(self): | |
"""Turn on the alert LED on the Controller.""" | |
return self._call(Cmd.ALERT_ON) | |
def alert_off(self): | |
"""Turn off the alert LED on the Controller.""" | |
return self._call(Cmd.ALERT_OFF) | |
def power_all_on(self): | |
"""Power on all slots in this Controller.""" | |
for id in self.pi_ids: | |
if not self.power_status(id): | |
self.power_on(id) | |
def power_all_off(self): | |
"""Power off all slots in this Controller.""" | |
for id in self.pi_ids: | |
self.power_off(id) | |
def fan_on(self): | |
"""Turn on the fan(s) attached to this Controller.""" | |
return self._call(Cmd.FAN, 1) | |
def fan_off(self): | |
"""Turn off the fan(s) attached to this Controller.""" | |
return self._call(Cmd.FAN, 0) | |
def fan_status(self): | |
"""Get the status of the fan(s) attached to this Controller.""" | |
return self._call(Cmd.GET_DATA, Data.FANSTATUS) | |
def eeprom_reset(self): | |
"""Reset device EEPROM to factory defaults, erasing saved configurations.""" | |
return self._call(Cmd.RESET) | |
def eeprom_save_powerstate(self): | |
"""Persist the current power state to EEPROM.""" | |
return self._call(Cmd.SAVE_POS) | |
def get_order(self): | |
"""Get the 'order' value of this device. Can be updated via """ | |
return self._read(Reg.ORDER) | |
def set_order(self, order: int): | |
"""Set an 'order' (Controller ID) value.""" | |
return self._call(Cmd.SET_ORDER, order) | |
def eeprom_save_order(self): | |
"""Persist the current order value to EEPROM.""" | |
return self._call(Cmd.SAVE_ORDER) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment