Last active
December 6, 2022 08:04
-
-
Save bphermansson/82aa42fd59fc0874133b2849634a4544 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# Script that reads various I2C sensors. | |
# CCS811, BH1750, HTU21. (Air quality, light, temp & humidity). | |
# CCS811. "This part will measure eCO2 (equivalent calculated carbon-dioxide) concentration | |
# within a range of 400 to 8192 parts per million (ppm), and TVOC (Total Volatile Organic | |
# Compound) concentration within a range of 0 to 1187 parts per billion (ppb)." | |
# "The BH1750 provides 16-bit light measurements in lux" | |
# The script sends these values in this order by MQTT:
# TVOC, CO2, light, temperature, humidity | |
# Connections, pin numbers on RPI's Gpio | |
# SCL pin 5 Gpio3 | |
# Gnd pin 9 Gnd | |
# Vcc pin 1 3V3 | |
# SDA pin 3 Gpio2 | |
import sys | |
from smbus import SMBus | |
from time import sleep | |
from datetime import datetime | |
from htu21 import HTU21 | |
import time | |
import json | |
import paho.mqtt.client as paho | |
# MQTT settings: broker address and the topics this script publishes to.
broker = "192.168.1.190"
mqtt_sensors_topic = "magicmirror/sensors"  # JSON sensor samples
mqtt_error_topic = "magicmirror/errors"  # messages from errorRoutine()
mqtt_info_topic = "magicmirror/info"  # start / shutdown notices

# paho-mqtt 2.x requires the callback API version as the first constructor
# argument and rejects a bare client id there; paho-mqtt 1.x has no
# CallbackAPIVersion and takes the client id first. Support both.
_callback_api = getattr(paho, "CallbackAPIVersion", None)
if _callback_api is None:
    client = paho.Client("MagicMirrorSensors")
else:
    client = paho.Client(_callback_api.VERSION2, "MagicMirrorSensors")
# CCS811
SLAVE_ADDR = 0x5B  # Can be 0x5A

# Slave registers to read and write
DEVICE_REG_STATUS = 0x00
DEVICE_REG_MEAS_MODE = 0x01
DEVICE_REG_ALG_RESULT_DATA = 0x02
DEVICE_REG_ERROR_ID = 0xE0
DEVICE_REG_APP_START = 0xF4

# Slave register read values (compared against the STATUS register)
DEVICE_STATE_BOOT = 0x10
DEVICE_STATE_APP = 0x90
DEVICE_STATE_APP_WITH_DATA = 0x98

# Slave register write values
# NOTE(review): per the CCS811 datasheet, MEAS_MODE 0x10 appears to select
# drive mode 1 (one measurement per second); the 10-second mode would be
# 0x20. The name suggests 10 s -- confirm which one is intended.
DEVICE_SET_MODE_10S = [0x10]
DEVICE_SET_SW_RESET = [0x11, 0xE5, 0x72, 0x8A]
# BH1750
BH1750_DEVICE = 0x23  # Default device I2C address
DEVICE = BH1750_DEVICE  # Alias of the default device I2C address

POWER_DOWN = 0x00  # No active state
POWER_ON = 0x01  # Power on
RESET = 0x07  # Reset data register value

# Start measurement at 4lx resolution. Time typically 16ms.
CONTINUOUS_LOW_RES_MODE = 0x13
# Start measurement at 1lx resolution. Time typically 120ms
CONTINUOUS_HIGH_RES_MODE_1 = 0x10
# Start measurement at 0.5lx resolution. Time typically 120ms
CONTINUOUS_HIGH_RES_MODE_2 = 0x11
# Start measurement at 1lx resolution. Time typically 120ms
# Device is automatically set to Power Down after measurement.
ONE_TIME_HIGH_RES_MODE_1 = 0x20
# Start measurement at 0.5lx resolution. Time typically 120ms
# Device is automatically set to Power Down after measurement.
ONE_TIME_HIGH_RES_MODE_2 = 0x21
# Start measurement at 4lx resolution. Time typically 16ms.
# Device is automatically set to Power Down after measurement.
ONE_TIME_LOW_RES_MODE = 0x23
# 0 = /dev/i2c-0 (port I2C0), 1 = /dev/i2c-1 (port I2C1)
try:
    i2c_bus = SMBus(1)
except Exception as exc:
    # errorRoutine() is defined further down the file and the MQTT client is
    # not connected yet, so neither can be used here. Every sensor read needs
    # the bus, so stop with the same "Error: <source> <exception type>"
    # message instead of failing later with a NameError.
    sys.exit("Error: I2C " + str(type(exc)))
# CCS811 | |
def read_i2c_bus_dev(dev_reg, read_len):
    """Read a block of bytes from a register of the device at SLAVE_ADDR.

    dev_reg is the register to read and read_len the number of bytes
    requested. Returns the result of i2c_bus.read_i2c_block_data().
    """
    block = i2c_bus.read_i2c_block_data(SLAVE_ADDR, dev_reg, read_len)
    return block
def write_i2c_bus_dev(dev_reg, write_data):
    """Write a block of bytes to a register of the device at SLAVE_ADDR.

    dev_reg is the register to write and write_data the list of byte values
    handed to i2c_bus.write_i2c_block_data(). Returns nothing.
    """
    i2c_bus.write_i2c_block_data(
        SLAVE_ADDR,
        dev_reg,
        write_data,
    )
def readCCS811():
    """Read one air-quality sample from the CCS811.

    If the status register reports the boot state, the application is started
    and the measurement mode is written first. The function then polls the
    status register every 5 seconds, for up to 30 seconds, until it reports
    that data is available.

    Returns:
        ``[eTVOC, eCO2]`` (two integers, in that order), or ``None`` when the
        sensor could not be read; in that case the failure has already been
        reported through errorRoutine().
    """
    try:
        # NOTE(review): the source lost its indentation; the three start-up
        # calls are assumed to all belong to the boot-state branch -- confirm.
        if read_i2c_bus_dev(DEVICE_REG_STATUS, 1)[0] == DEVICE_STATE_BOOT:
            write_i2c_bus_dev(DEVICE_REG_APP_START, [])  # empty write
            write_i2c_bus_dev(DEVICE_REG_MEAS_MODE, DEVICE_SET_MODE_10S)
            read_i2c_bus_dev(DEVICE_REG_ERROR_ID, 1)  # clear any errors

        # Poll until data is ready. The status is re-read after the final
        # sleep as well, so a sample that becomes ready at the very end of
        # the timeout is still used instead of being discarded.
        timeout_in_seconds = 30
        while (read_i2c_bus_dev(DEVICE_REG_STATUS, 1)[0]
               != DEVICE_STATE_APP_WITH_DATA):
            if timeout_in_seconds <= 0:
                raise RuntimeError("CCS811 data not ready within timeout")
            sleep(5)
            timeout_in_seconds -= 5

        # Result bytes 0-1 hold eCO2 and bytes 2-3 hold eTVOC, high byte first.
        read_data = read_i2c_bus_dev(DEVICE_REG_ALG_RESULT_DATA, 8)
        eCO2_reading = (read_data[0] << 8) | read_data[1]
        eTVOC_reading = (read_data[2] << 8) | read_data[3]
        return [eTVOC_reading, eCO2_reading]
    except Exception:
        errorRoutine("CCS811 reading")
        return None
# BH1750 | |
def readLight(addr=BH1750_DEVICE):
    """Read the light level from the BH1750 at I2C address ``addr``.

    Issues a one-time high-resolution measurement and converts the returned
    bytes with convertToNumber().

    Returns:
        The converted reading, or ``None`` when the read fails; in that case
        the failure has already been reported through errorRoutine().
    """
    try:
        data = i2c_bus.read_i2c_block_data(addr, ONE_TIME_HIGH_RES_MODE_1)
        return convertToNumber(data)
    except Exception:
        # Catch Exception rather than everything, so Ctrl-C (KeyboardInterrupt)
        # and SystemExit still reach the caller.
        errorRoutine("readLight")
        return None
def convertToNumber(data, decimals=None):
    """Convert the first two bytes of a BH1750 reading into a number.

    Args:
        data: Sequence of byte values; ``data[0]`` is the high byte and
            ``data[1]`` the low byte. Any further items are ignored.
        decimals: Optional number of decimal places to round the result to.
            ``None`` (the default) returns the unrounded value.

    Returns:
        The 16-bit raw value divided by 1.2, as a float.
    """
    result = (data[1] + (256 * data[0])) / 1.2
    if decimals is not None:
        result = round(result, decimals)
    return result
def errorRoutine(src):
    """Report the exception currently being handled.

    Builds the message "Error: <src> <exception type>", prints it and
    publishes it on mqtt_error_topic. ``src`` names the place that failed.
    """
    exc_type = sys.exc_info()[0]
    message = " ".join(["Error:", src, str(exc_type)])
    print(message)
    client.publish(mqtt_error_topic, message)
def main():
    """Connect to the MQTT broker and publish a sensor sample every 30 s.

    Each cycle reads light, temperature, humidity and air quality, then
    publishes them as one JSON object on mqtt_sensors_topic. If any sensor
    fails, the failure is reported through errorRoutine() and that cycle's
    sample is skipped. The loop never returns.
    """
    print("Main")
    htu = HTU21()
    print("connecting to broker ", broker)
    client.connect(broker)
    client.loop_start()
    client.publish(mqtt_info_topic, "start")

    while True:
        current_time = datetime.now().strftime("%H:%M:%S")

        # readLight() and readCCS811() report their own errors and return
        # None on failure; the HTU21 reads are wrapped to behave the same way.
        lightLevel = readLight()
        try:
            temperature = htu.read_temperature()
            humidity = htu.read_humidity()
        except Exception:
            errorRoutine("HTU21 reading")
            temperature = humidity = None
        airQuality = readCCS811()

        readings = (lightLevel, temperature, humidity, airQuality)
        if all(value is not None for value in readings):
            try:
                payload = json.dumps({
                    "Time": current_time,
                    "TVOC": airQuality[0],
                    "CO2": airQuality[1],
                    "light": format(lightLevel, '.2f'),
                    "Temperature": format(temperature, '.2f'),
                    "Humidity": format(humidity, '.2f'),
                })
                print(payload)
                client.publish(mqtt_sensors_topic, payload)
            except Exception:
                errorRoutine("Mqtt publish")

        time.sleep(30)
def _shutdown():
    """Publish a shutdown notice on the info topic and stop the MQTT client."""
    client.publish(mqtt_info_topic, "Shutting down")
    client.disconnect()  # disconnect
    client.loop_stop()  # stop loop


if __name__ == "__main__":
    # Run until interrupted with Ctrl-C, then shut the MQTT client down
    # and exit with status 0.
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        _shutdown()
        sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment