Last active
September 28, 2021 20:38
-
-
Save evindunn/082b7e22aba2e5104de3ed19e87f38ba to your computer and use it in GitHub Desktop.
DHT11 Temperature Sensor as HTTP Service
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from argparse import ArgumentParser | |
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler | |
from json import dumps as json_dumps | |
from threading import Thread, RLock | |
from time import sleep | |
import Adafruit_DHT | |
# GPIO pin the sensor is read from when --gpio-pin is not given on the command line.
GPIO_PIN_DEFAULT = 4
class Handler(BaseHTTPRequestHandler):
    """Request handler that answers every GET with the server's sensor data as JSON.

    The handler expects ``self.server`` to expose a ``sensor_data`` attribute
    that can be serialised with ``json.dumps``.
    """

    # HTTP status codes used by this service.
    CODE_INTERNAL_SERVER_ERROR = 500
    CODE_OK = 200

    # Media type used for both regular and error responses.
    CONTENT_TYPE = "application/json;charset=utf-8"

    # Overrides of BaseHTTPRequestHandler settings: speak HTTP/1.1 and render
    # error responses as a small JSON document instead of the default HTML.
    protocol_version = "HTTP/1.1"
    error_content_type = CONTENT_TYPE
    error_message_format = json_dumps({"message": "%(explain)s"})

    def do_GET(self):
        """Write a 200 response whose body is the JSON-encoded sensor data."""
        self.send_response(Handler.CODE_OK)

        payload = json_dumps(self.server.sensor_data).encode("utf-8")

        # Content-Length is the encoded byte count; HTTP/1.1 clients need it
        # to find the end of the body.
        headers = (
            ("Content-Type", Handler.CONTENT_TYPE),
            ("Content-Length", len(payload)),
        )
        for header_name, header_value in headers:
            self.send_header(header_name, header_value)
        self.end_headers()

        self.wfile.write(payload)
class Server(ThreadingHTTPServer):
    """Threaded HTTP server that keeps the most recent DHT11 reading in memory.

    The constructor starts a daemon thread that polls the sensor in an
    endless loop and stores the humidity/temperature values in a dict guarded
    by a lock. Request handlers obtain the values through ``sensor_data``.
    """

    # Seconds to sleep between two consecutive sensor reads.
    SENSOR_POLL_INTERVAL = 1

    allow_reuse_address = True

    def __init__(self, address, handler, gpio_pin):
        """Bind the HTTP server and start the background sensor poller.

        Args:
            address: ``(host, port)`` tuple passed to ``ThreadingHTTPServer``.
            handler: Request handler class passed to ``ThreadingHTTPServer``.
            gpio_pin: GPIO pin number handed to ``Adafruit_DHT.read_retry``.
        """
        super().__init__(address, handler)
        # Both values are 0 until the first sensor read completes.
        self._sensor_data = {"humidity": 0, "temperature": 0}
        self._sensor_lock = RLock()
        self._sensor_thread = Thread(
            target=Server.poll_sensor,
            args=(self._sensor_lock, gpio_pin, self._sensor_data),
            daemon=True,  # do not keep the process alive on shutdown
        )
        self._sensor_thread.start()

    @property
    def sensor_data(self):
        """Return a snapshot (copy) of the latest reading.

        The copy is taken while holding the lock, so the humidity and
        temperature in the returned dict always come from the same poll.
        Returning the internal dict itself would let the poller mutate it
        while the caller is still serialising it.
        """
        with self._sensor_lock:
            return dict(self._sensor_data)

    @staticmethod
    def poll_sensor(lock, gpio_pin, data):
        """Poll the DHT11 forever, storing each reading in ``data``.

        Args:
            lock: Lock held while ``data`` is updated.
            gpio_pin: GPIO pin number handed to ``Adafruit_DHT.read_retry``.
            data: Dict whose ``"humidity"`` and ``"temperature"`` keys are
                overwritten with each reading.

        This function never returns; it is meant to run in a daemon thread.
        """
        while True:
            # The (slow) hardware read happens outside the lock so readers
            # are never blocked by it.
            # NOTE(review): read_retry reportedly yields (None, None) when all
            # retries fail; such values are stored as-is -- confirm that
            # clients are fine with JSON null.
            humidity, temperature = Adafruit_DHT.read_retry(Adafruit_DHT.DHT11, gpio_pin)
            with lock:
                # Update both fields under one lock acquisition so readers
                # never observe a half-updated pair.
                data["humidity"] = humidity
                data["temperature"] = temperature
            # Sleep without holding the lock so request handlers stay responsive.
            sleep(Server.SENSOR_POLL_INTERVAL)
if __name__ == "__main__":
    # Command line: a mandatory listening port and an optional sensor GPIO pin.
    parser = ArgumentParser(description="Temperature and Humidity Server")
    parser.add_argument("port", type=int, help="The port to run the server on")
    parser.add_argument(
        "--gpio-pin",
        type=int,
        help="The GPIO pin that the sensor is connected to",
        default=GPIO_PIN_DEFAULT,
    )
    args = parser.parse_args()

    # Listen on all interfaces; leaving the ``with`` block closes the socket.
    with Server(("0.0.0.0", args.port), Handler, args.gpio_pin) as httpd:
        print("Server running on port {}...".format(args.port))
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C has already broken out of serve_forever(). shutdown() is
            # only meant to be called from another thread while the loop is
            # still running, so nothing more is needed here.
            pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment