Skip to content

Instantly share code, notes, and snippets.

@bitglue
Last active January 17, 2021 04:38
Show Gist options
  • Save bitglue/bf077095243405702367876af444b98a to your computer and use it in GitHub Desktop.
Save bitglue/bf077095243405702367876af444b98a to your computer and use it in GitHub Desktop.
"""Improved example for reading DHT22 sensors
adafruit-circuitpython-dht
<https://github.com/adafruit/Adafruit_CircuitPython_DHT/blob/master/adafruit_dht.py>
is very buggy, throwing a RuntimeError about 20% of the time when it's unable to
decode the result from the sensor. The most common problem seems to be that
it's unable to capture all the pulses. This is likely due to the pulseio
implementation in blinka:
https://github.com/adafruit/Adafruit_Blinka/tree/master/src/adafruit_blinka/microcontroller/bcm283x/pulseio

Maybe it's better on actual CircuitPython. I haven't tested.

Blinka has a separate program called libgpiod_pulsein which is linked with
pigpio, and the python pulsein module communicates with this program running as
a subprocess. I don't know the reasoning for this arrangement, but it seems
much more reliable results can be obtained by just using the Python interface
to pigpio directly (pigpiod must be running).

The example implementation here has yet to fail to read my DHT22. Perhaps this
approach should replace the blinka pulsein implementation.
"""
from __future__ import division, print_function
import time
import pigpio
# Number of the GPIO pin used to talk to the DHT22; this value is passed to
# every pigpio call in this script.
# NOTE(review): pigpio presumably expects Broadcom (BCM) numbering here --
# confirm against the actual wiring.
GPIO = 10
class DHT22Error(Exception):
    """Signal that a reading from the DHT22 could not be decoded."""
class Reader:
    """Record how long a GPIO pin stays high between edges.

    An edge callback is registered with pigpio on construction. Whenever the
    pin drops after a rising edge has been seen, the length of the high period
    (``pigpio.tickDiff`` of the two ticks) is appended to ``pulses``.

    Call :meth:`cancel` when the reader is no longer needed so the callback
    is unregistered.
    """

    def __init__(self, pi, gpio):
        """Begin watching ``gpio`` for edges.

        Args:
            pi: connected ``pigpio.pi`` instance used to register the callback.
            gpio: number of the pin to watch.
        """
        self.pi = pi
        self.gpio = gpio
        self.reset()
        self._cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cbf)

    def reset(self):
        """Discard all recorded pulses and any pending rising edge."""
        self.last_high = None
        self.pulses = []

    def cancel(self):
        """Unregister the edge callback.

        Safe to call repeatedly; calls after the first do nothing.
        """
        handle = getattr(self, "_cb", None)
        if handle is not None:
            handle.cancel()
            self._cb = None

    def _cbf(self, gpio, level, tick):
        """Edge callback invoked by pigpio with the pin, new level and tick."""
        if level == pigpio.HIGH:
            # Rising edge: remember when the high period started.
            self.last_high = tick
        elif self.last_high is not None:
            # Any other level after a rising edge closes the high period.
            # Edges seen before the first rising edge are ignored.
            pulse_time_us = pigpio.tickDiff(self.last_high, tick)
            self.pulses.append(pulse_time_us)
def bits_to_int(bits):
    """Interpret ``bits`` as an unsigned binary number, first bit most significant.

    Args:
        bits: iterable of truthy/falsy values.

    Returns:
        The integer value; 0 when ``bits`` is empty.

    The argument is only read, never modified, so callers may pass a list
    they still need afterwards.
    """
    value = 0
    for bit in bits:
        # Shift what we have so far and append the next bit at the low end.
        value = (value << 1) | (1 if bit else 0)
    return value
def trigger_measurement(pi, gpio=None):
    """Request a new reading by briefly driving the data pin low.

    Args:
        pi: connected ``pigpio.pi`` instance.
        gpio: number of the pin to pulse. When omitted, the module-level
            ``GPIO`` is used, which matches the previous behaviour.
    """
    if gpio is None:
        gpio = GPIO
    pi.set_mode(gpio, pigpio.OUTPUT)
    pi.write(gpio, 0)
    # Hold the line low for 1 ms before releasing it.
    time.sleep(0.001)
    # Back to input so the pin can be read again.
    pi.set_mode(gpio, pigpio.INPUT)
def _decode_pulses(pulses):
    """Convert recorded high-pulse lengths into ``(temperature, humidity)``.

    Args:
        pulses: sequence of pulse lengths as collected by ``Reader.pulses``.

    Returns:
        Tuple ``(temperature, humidity)``, each the raw field divided by 10.

    Raises:
        DHT22Error: if fewer than 40 pulses are available or the checksum
            does not match.
    """
    # A pulse longer than 49 counts as a 1 bit, anything else as a 0 bit.
    bits = [width > 49 for width in pulses]
    if len(bits) < 40:
        raise DHT22Error("not enough bits")
    # Only the last 40 pulses are decoded; anything before them is dropped.
    bits = bits[-40:]

    # The last 8 bits must equal the low byte of the sum of the four
    # preceding 8-bit groups.
    checksum = bits_to_int(bits[32:40])
    byte_sum = sum(
        bits_to_int(bits[start:start + 8]) for start in range(0, 32, 8)
    )
    if byte_sum & 0xFF != checksum:
        raise DHT22Error("invalid checksum")

    humidity = bits_to_int(bits[0:16]) / 10
    # Bit 16 is a sign flag; bits 17..31 hold the magnitude.
    temperature = bits_to_int(bits[17:32]) / 10
    if bits[16]:
        temperature = -temperature
    return temperature, humidity


def test_pigpio():
    """Poll the sensor on ``GPIO`` forever and print every decoded reading.

    The pigpio connection is always closed on the way out, including when an
    error or Ctrl-C interrupts the loop.

    Raises:
        DHT22Error: if the pigpio daemon cannot be reached, fewer than 40
            pulses were captured, or the checksum does not match.
    """
    pi = pigpio.pi()
    if not pi.connected:
        # Without a daemon connection every later call would fail obscurely.
        raise DHT22Error("cannot connect to pigpiod")
    try:
        reader = Reader(pi, GPIO)
        pi.set_pull_up_down(GPIO, pigpio.PUD_UP)
        while True:
            trigger_measurement(pi)
            # Wait before looking at what the callback collected.
            time.sleep(2)
            pulses = reader.pulses
            # reset() rebinds reader.pulses, so `pulses` keeps this capture.
            reader.reset()
            temperature, humidity = _decode_pulses(pulses)
            print(f"temp: {temperature}, humidity {humidity}")
    finally:
        pi.stop()
if __name__ == "__main__":
    # Executed as a script rather than imported: start the polling loop.
    test_pigpio()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment