Last active
January 17, 2021 04:38
-
-
Save bitglue/bf077095243405702367876af444b98a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Improved example for reading DHT22 sensors | |
adafruit-circuitpython-dht | |
<https://github.com/adafruit/Adafruit_CircuitPython_DHT/blob/master/adafruit_dht.py> | |
is very buggy, throwing a RuntimeError about 20% of the time when its unable to | |
decode the result from the sensor. The most common problem seems to be it's | |
unable to capture all the pulses. This is likely due to the pulseio | |
implementation in blinka: | |
https://github.com/adafruit/Adafruit_Blinka/tree/master/src/adafruit_blinka/microcontroller/bcm283x/pulseio | |
Maybe it's better on actual CircuitPython. I haven't tested. | |
Blinka has a separate program called libgpiod_pulsein which is linked with | |
pigpio, and the python pulsein module communicates with this program running as | |
a subprocess. I don't know the reasoning for this arrangement, but it seems | |
much more reliable results can be obtained by just using the Python interface | |
to pigpio directly (pigpiod must be running). | |
The example implementation here has yet to fail to read my DHT22. Perhaps this | |
approach should replace the blinka pulsein implementation. | |
""" | |
from __future__ import division, print_function | |
import time | |
import pigpio | |
GPIO = 10 | |
class DHT22Error(Exception): | |
pass | |
class Reader: | |
def __init__(self, pi, gpio): | |
self.pi = pi | |
self.gpio = gpio | |
self.reset() | |
self._cb = pi.callback(gpio, pigpio.EITHER_EDGE, self._cbf) | |
def reset(self): | |
self.last_high = None | |
self.pulses = [] | |
def _cbf(self, gpio, level, tick): | |
if level == pigpio.HIGH: | |
self.last_high = tick | |
elif self.last_high is not None: | |
pulse_time_us = pigpio.tickDiff(self.last_high, tick) | |
self.pulses.append(pulse_time_us) | |
def bits_to_int(bits): | |
i = 0 | |
while bits: | |
i <<= 1 | |
if bits[0]: | |
i += 1 | |
del bits[0] | |
return i | |
def trigger_measurement(pi): | |
pi.set_mode(GPIO, pigpio.OUTPUT) | |
pi.write(GPIO, 0) | |
time.sleep(0.001) | |
pi.set_mode(GPIO, pigpio.INPUT) | |
def test_pigpio(): | |
pi = pigpio.pi() | |
reader = Reader(pi, GPIO) | |
pi.set_pull_up_down(GPIO, pigpio.PUD_UP) | |
while True: | |
trigger_measurement(pi) | |
time.sleep(2) | |
bits = [i > 49 for i in reader.pulses] | |
reader.reset() | |
if len(bits) < 40: | |
raise DHT22Error("not enough bits") | |
bits = bits[-40:] | |
humidity = bits_to_int(bits[0:16]) / 10 | |
temperature = bits_to_int(bits[17:32]) / 10 | |
if bits[16]: | |
temperature = -temperature | |
checksum = bits_to_int(bits[32:40]) | |
if ( | |
bits_to_int(bits[0:8]) | |
+ bits_to_int(bits[8:16]) | |
+ bits_to_int(bits[16:24]) | |
+ bits_to_int(bits[24:32]) | |
) & 0xFF != checksum: | |
raise DHT22Error("invalid checksum") | |
print(f"temp: {temperature}, humidity {humidity}") | |
if __name__ == "__main__": | |
test_pigpio() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment