Last active
March 7, 2025 13:03
-
-
Save rambo/0f6cb1719fc7b3970b93168d111e6c93 to your computer and use it in GitHub Desktop.
MicroPython SHT40, SHT41, SHT43, SHT45 asyncio driver
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
SHT40, SHT41, SHT43, SHT45 | |
MicroPython driver for micro:bit | |
https://www.fredscave.com/drivers/008-sht4x.html | |
AUTHOR: fredscave.com | |
DATE : 2024/10 | |
VERSION : 1.00 | |
Converted to asyncio by [email protected] on 2025-03-07 | |
""" | |
import micropython | |
import asyncio | |
from micropython import const | |
from machine import I2C | |
import logging | |
LOGGER = logging.getLogger(__name__) | |
ADDR = const(0x44) | |
CMD_HI_PREC = const(0xFD) | |
TIME_HI_PREC = const(10) | |
CMD_MED_PREC = const(0xF6) | |
TIME_MED_PREC = const(5) | |
CMD_LO_PREC = const(0xE0) | |
TIME_LO_PREC = const(2) | |
HIGH = const(0) | |
MEDIUM = const(1) | |
LOW = const(2) | |
prec_cmd_list = [CMD_HI_PREC, CMD_MED_PREC, CMD_LO_PREC] | |
prec_wait_list = [TIME_HI_PREC, TIME_MED_PREC, TIME_LO_PREC] | |
CMD_HEAT0 = const(0x39) # 200mw, 1s | |
CMD_HEAT1 = const(0x32) # 200mw, 0.1s | |
CMD_HEAT2 = const(0x2F) # 110mw, 1s | |
CMD_HEAT3 = const(0x24) # 110mw, 0.1s | |
CMD_HEAT4 = const(0x1E) # 20mw, 1s | |
CMD_HEAT5 = const(0x15) # 20mw, 0.1s | |
heat_cmd_list = [CMD_HEAT0, CMD_HEAT1, CMD_HEAT2, CMD_HEAT3, CMD_HEAT4, CMD_HEAT5] | |
heat_wait_list = [ | |
1000 + TIME_HI_PREC + 2, | |
100 + TIME_HI_PREC + 2, | |
1000 + TIME_HI_PREC + 2, | |
100 + TIME_HI_PREC + 2, | |
1000 + TIME_HI_PREC + 2, | |
100 + TIME_HI_PREC + 2, | |
] | |
CMD_SERIAL = const(0x89) | |
CMD_RESET = const(0x94) | |
CRC_INITIAL = const(0xFF) | |
CRC_POLY = const(0x31) | |
ResultType = (float, float, float, bool, bool) # C, F, rh, t_ok, rh_ok | |
class SHT4X: | |
"""SHT40, SHT41, SHT43, SHT45 driver""" | |
@micropython.native | |
def __init__(self, i2c: I2C, addr: int = ADDR) -> None: | |
"""init with bus object and address""" | |
self.i2c = i2c | |
self.addr = addr | |
@micropython.native | |
async def read(self, precision: int = HIGH) -> ResultType: | |
"""Read temperature and humidity data from the sensor""" | |
if precision not in [HIGH, MEDIUM, LOW]: | |
precision = HIGH | |
cmd = prec_cmd_list[precision] | |
wait = prec_wait_list[precision] | |
self.i2c.writeto(self.addr, bytes([cmd])) | |
await asyncio.sleep_ms(wait) | |
buf = self.i2c.readfrom(self.addr, 6) | |
return SHT4X.convert(buf) | |
@staticmethod | |
@micropython.native | |
def normalize_temp(temp: float) -> float: | |
"""Nomalize temperature result""" | |
return round(temp, 1) | |
@micropython.native | |
async def temp(self, precision: int = HIGH) -> float | None: | |
"""call read and return the temperature in C part of result rounded to one decimal""" | |
results = await self.read(precision) | |
if not results[3]: | |
LOGGER.error("Checksum failure for temperature") | |
return None | |
return SHT4X.normalize_temp(results[0]) | |
@staticmethod | |
@micropython.native | |
def normalize_rh(rh: float) -> float: | |
"""Normalize the humidity to 0-100""" | |
if rh > 100: | |
return 100.0 | |
elif rh < 0: | |
return 0.0 | |
return round(rh + 0.5, 1) | |
@micropython.native | |
async def humidity(self, precision: int = HIGH) -> int | None: | |
"""call read and return the humidity part of result in percent""" | |
results = await self.read(precision) | |
if not results[4]: | |
LOGGER.error("Checksum failure for humidity") | |
return None | |
return SHT4X.normalize_rh(results[2]) | |
@micropython.native | |
async def temp_and_rh(self, precision: int = HIGH) -> (float, int) | None: | |
"""Get temperatur and relative humidity normalized with one read""" | |
results = await self.read(precision) | |
if not (results[3] and results[4]): | |
LOGGER.error("Checksum failure for temperature and/or relative humidity") | |
return None | |
return SHT4X.normalize_temp(results[0]), SHT4X.normalize_rh(results[2]) | |
@micropython.native | |
async def heater(self, setting: int = 5) -> ResultType: | |
"""Turn on the heater and read results after (for self-test)""" | |
if setting not in range(6): | |
setting = 5 | |
cmd = heat_cmd_list[setting] | |
wait = heat_wait_list[setting] | |
self.i2c.writeto(self.addr, bytes([cmd])) | |
await asyncio.sleep_ms(wait) | |
buf = self.i2c.readfrom(self.addr, 6) | |
return SHT4X.convert(buf) | |
@micropython.native | |
async def serialno(self) -> str: | |
"""Read the serial number""" | |
self.i2c.writeto(self.addr, bytes([CMD_SERIAL])) | |
await asyncio.sleep_ms(5) | |
buf = self.i2c.readfrom(self.addr, 6) | |
upper = buf[0] * 256 + buf[1] | |
lower = buf[3] * 256 + buf[4] | |
return str(upper) + "-" + str(lower) | |
@micropython.native | |
async def reset(self) -> None: | |
"""Send SW reset command""" | |
self.i2c.writeto(self.addr, bytes([CMD_RESET])) | |
await asyncio.sleep_ms(1) | |
@staticmethod | |
@micropython.native | |
def checksum(buf: bytes) -> int: | |
"""Calculate buffer checksum""" | |
crc = CRC_INITIAL | |
for byte in buf: | |
crc ^= byte | |
for bit in range(8): | |
if crc & 0x80: | |
crc = (crc << 1) ^ CRC_POLY | |
else: | |
crc = crc << 1 | |
return crc & 0xFF | |
@staticmethod | |
@micropython.native | |
def convert(buf) -> ResultType: | |
"""Convert the buffer into results""" | |
t_ticks = buf[0] * 256 + buf[1] | |
t_c = -45 + 175 * t_ticks / 65535 | |
t_f = -49 + 315 * t_ticks / 65535 | |
t_buf = bytearray([buf[0], buf[1]]) | |
t_check = SHT4X.checksum(t_buf) | |
rh_ticks = buf[3] * 256 + buf[4] | |
rh_p = -6 + 125 * rh_ticks / 65535 | |
rh_buf = bytearray([buf[3], buf[4]]) | |
rh_check = SHT4X.checksum(rh_buf) | |
return t_c, t_f, rh_p, t_check == buf[2], rh_check == buf[5] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment