Last active
August 18, 2020 04:34
-
-
Save nivhty/ce069ec37e1908f9899424602fd47d9c to your computer and use it in GitHub Desktop.
Python to script to use the SDS011 air quality sensor and outputting to a 16x2 I2C LCD
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python -u | |
# coding=utf-8 | |
# "DATASHEET": http://cl.ly/ekot | |
#AQI | |
from __future__ import print_function, division | |
import serial, struct, sys, time, json, subprocess, smbus | |
DEBUG = 0 | |
CMD_MODE = 2 | |
CMD_QUERY_DATA = 4 | |
CMD_DEVICE_ID = 5 | |
CMD_SLEEP = 6 | |
CMD_FIRMWARE = 7 | |
CMD_WORKING_PERIOD = 8 | |
MODE_ACTIVE = 0 | |
MODE_QUERY = 1 | |
PERIOD_CONTINUOUS = 0 | |
ser = serial.Serial() | |
ser.port = "/dev/ttyUSB0" | |
ser.baudrate = 9600 | |
ser.open() | |
ser.flushInput() | |
byte, data = 0, "" | |
# Function | |
def dump(d, prefix=''): | |
print(prefix + ' '.join(x.encode('hex') for x in d)) | |
def construct_command(cmd, data=[]): | |
assert len(data) <= 12 | |
data += [0,]*(12-len(data)) | |
checksum = (sum(data)+cmd-2)%256 | |
ret = "\xaa\xb4" + chr(cmd) | |
ret += ''.join(chr(x) for x in data) | |
ret += "\xff\xff" + chr(checksum) + "\xab" | |
if DEBUG: | |
dump(ret, '> ') | |
return ret | |
def process_data(d): | |
r = struct.unpack('<HHxxBB', d[2:]) | |
pm25 = r[0]/10.0 | |
pm10 = r[1]/10.0 | |
checksum = sum(ord(v) for v in d[2:8])%256 | |
return [pm25, pm10] | |
def process_version(d): | |
r = struct.unpack('<BBBHBB', d[3:]) | |
checksum = sum(ord(v) for v in d[2:8])%256 | |
print("Y: {}, M: {}, D: {}, ID: {}, CRC={}".format(r[0], r[1], r[2], hex(r[3]), "OK" if (checksum==r[4] and r[5]==0xab) else "NOK")) | |
def read_response(): | |
byte = 0 | |
while byte != "\xaa": | |
byte = ser.read(size=1) | |
d = ser.read(size=9) | |
if DEBUG: | |
dump(d, '< ') | |
return byte + d | |
def cmd_set_mode(mode=MODE_QUERY): | |
ser.write(construct_command(CMD_MODE, [0x1, mode])) | |
read_response() | |
def cmd_query_data(): | |
ser.write(construct_command(CMD_QUERY_DATA)) | |
d = read_response() | |
values = [] | |
if d[1] == "\xc0": | |
values = process_data(d) | |
return values | |
def cmd_set_sleep(sleep): | |
mode = 0 if sleep else 1 | |
ser.write(construct_command(CMD_SLEEP, [0x1, mode])) | |
read_response() | |
def cmd_set_working_period(period): | |
ser.write(construct_command(CMD_WORKING_PERIOD, [0x1, period])) | |
read_response() | |
def cmd_firmware_ver(): | |
ser.write(construct_command(CMD_FIRMWARE)) | |
d = read_response() | |
process_version(d) | |
def cmd_set_id(id): | |
id_h = (id>>8) % 256 | |
id_l = id % 256 | |
ser.write(construct_command(CMD_DEVICE_ID, [0]*10+[id_l, id_h])) | |
read_response() | |
# LED FUNCTION | |
# LED | |
# Define some device parameters | |
I2C_ADDR = 0x27 # I2C device address | |
LCD_WIDTH = 16 # Maximum characters per line | |
# Define some device constants | |
LCD_CHR = 1 # Mode - Sending data | |
LCD_CMD = 0 # Mode - Sending command | |
LCD_LINE_1 = 0x80 # LCD RAM address for the 1st line | |
LCD_LINE_2 = 0xC0 # LCD RAM address for the 2nd line | |
LCD_LINE_3 = 0x94 # LCD RAM address for the 3rd line | |
LCD_LINE_4 = 0xD4 # LCD RAM address for the 4th line | |
LCD_BACKLIGHT = 0x08 # On | |
#LCD_BACKLIGHT = 0x00 # Off | |
ENABLE = 0b00000100 # Enable bit | |
# Timing constants | |
E_PULSE = 0.0005 | |
E_DELAY = 0.0005 | |
#Open I2C interface | |
#bus = smbus.SMBus(0) # Rev 1 Pi uses 0 | |
bus = smbus.SMBus(1) # Rev 2 Pi uses 1 | |
def lcd_init(): | |
# Initialise display | |
lcd_byte(0x33,LCD_CMD) # 110011 Initialise | |
lcd_byte(0x32,LCD_CMD) # 110010 Initialise | |
lcd_byte(0x06,LCD_CMD) # 000110 Cursor move direction | |
lcd_byte(0x0C,LCD_CMD) # 001100 Display On,Cursor Off, Blink Off | |
lcd_byte(0x28,LCD_CMD) # 101000 Data length, number of lines, font size | |
lcd_byte(0x01,LCD_CMD) # 000001 Clear display | |
time.sleep(E_DELAY) | |
def lcd_byte(bits, mode): | |
# Send byte to data pins | |
# bits = the data | |
# mode = 1 for data | |
# 0 for command | |
bits_high = mode | (bits & 0xF0) | LCD_BACKLIGHT | |
bits_low = mode | ((bits<<4) & 0xF0) | LCD_BACKLIGHT | |
# High bits | |
bus.write_byte(I2C_ADDR, bits_high) | |
lcd_toggle_enable(bits_high) | |
# Low bits | |
bus.write_byte(I2C_ADDR, bits_low) | |
lcd_toggle_enable(bits_low) | |
def lcd_toggle_enable(bits): | |
# Toggle enable | |
time.sleep(E_DELAY) | |
bus.write_byte(I2C_ADDR, (bits | ENABLE)) | |
time.sleep(E_PULSE) | |
bus.write_byte(I2C_ADDR,(bits & ~ENABLE)) | |
time.sleep(E_DELAY) | |
def lcd_string(message, line): | |
# Send string to display | |
message = str(message) | |
message = message.ljust(LCD_WIDTH," ") | |
lcd_byte(line, LCD_CMD) | |
for i in range(LCD_WIDTH): | |
lcd_byte(ord(message[i]),LCD_CHR) | |
# Calculate function | |
def calcAQIpm25(pm25): | |
pm1 = 0 | |
pm2 = 12 | |
pm3 = 35.4 | |
pm4 = 55.4 | |
pm5 = 150.4 | |
pm6 = 250.4 | |
pm7 = 350.4 | |
pm8 = 500.4 | |
aqi1 = 0 | |
aqi2 = 50 | |
aqi3 = 100 | |
aqi4 = 150 | |
aqi5 = 200 | |
aqi6 = 300 | |
aqi7 = 400 | |
aqi8 = 500 | |
if (pm25 >= pm1 and pm25 < pm2): | |
aqipm25 = ((aqi2 - aqi1) / (pm2 - pm1)) * (pm25 - pm1) + aqi1 | |
elif (pm25 >= pm2 and pm25 < pm3): | |
aqipm25 = ((aqi3 - aqi2) / (pm3 - pm2)) * (pm25 - pm2) + aqi2 | |
elif (pm25 >= pm3 and pm25 < pm4): | |
aqipm25 = ((aqi4 - aqi3) / (pm4 - pm3)) * (pm25 - pm3) + aqi3 | |
elif (pm25 >= pm4 and pm25 < pm5): | |
aqipm25 = ((aqi5 - aqi4) / (pm5 - pm4)) * (pm25 - pm4) + aqi4 | |
elif (pm25 >= pm5 and pm25 < pm6): | |
aqipm25 = ((aqi6 - aqi5) / (pm6 - pm5)) * (pm25 - pm5) + aqi5 | |
elif (pm25 >= pm6 and pm25 < pm7): | |
aqipm25 = ((aqi7 - aqi6) / (pm7 - pm6)) * (pm25 - pm6) + aqi6 | |
elif (pm25 >= pm7 and pm25 <= pm8): | |
aqipm25 = ((aqi8 - aqi7) / (pm8 - pm7)) * (pm25 - pm7) + aqi7 | |
return round(aqipm25) | |
def calcAQIpm10(pm10): | |
pm1 = 0 | |
pm2 = 54 | |
pm3 = 154 | |
pm4 = 254 | |
pm5 = 354 | |
pm6 = 424 | |
pm7 = 504 | |
pm8 = 604 | |
aqi1 = 0 | |
aqi2 = 50 | |
aqi3 = 100 | |
aqi4 = 150 | |
aqi5 = 200 | |
aqi6 = 300 | |
aqi7 = 400 | |
aqi8 = 500 | |
if (pm10 >= pm1 and pm10 < pm2): | |
aqipm10 = ((aqi2 - aqi1) / (pm2 - pm1)) * (pm10 - pm1) + aqi1 | |
elif (pm10 >= pm2 and pm10 < pm3): | |
aqipm10 = ((aqi3 - aqi2) / (pm3 - pm2)) * (pm10 - pm2) + aqi2 | |
elif (pm10 >= pm3 and pm10 < pm4): | |
aqipm10 = ((aqi4 - aqi3) / (pm4 - pm3)) * (pm10 - pm3) + aqi3 | |
elif (pm10 >= pm4 and pm10 < pm5): | |
aqipm10 = ((aqi5 - aqi4) / (pm5 - pm4)) * (pm10 - pm4) + aqi4 | |
elif (pm10 >= pm5 and pm10 < pm6): | |
aqipm10 = ((aqi6 - aqi5) / (pm6 - pm5)) * (pm10 - pm5) + aqi5 | |
elif (pm10 >= pm6 and pm10 < pm7): | |
aqipm10 = ((aqi7 - aqi6) / (pm7 - pm6)) * (pm10 - pm6) + aqi6 | |
elif (pm10 >= pm7 and pm10 <= pm8): | |
aqipm10 = ((aqi8 - aqi7) / (pm8 - pm7)) * (pm10 - pm7) + aqi7 | |
return round(aqipm10) | |
def air_quality(index): | |
if index >=0 and index <=50: | |
level = "Green" | |
elif index >=51 and index <=100: | |
level = "Yellow" | |
elif index >=101 and index <=150: | |
level = "Orange" | |
elif index >=151 and index <=200: | |
level = "Red" | |
elif index >=201 and index <=300: | |
level = "Purple" | |
elif index >=301 and index <=500: | |
level = "Maroon" | |
return level | |
# ========================================================== | |
# Main | |
# AQI MAIN | |
def main(): | |
# Main program block | |
lcd_init() | |
cmd_set_sleep(0) | |
cmd_firmware_ver() | |
cmd_set_working_period(PERIOD_CONTINUOUS) | |
cmd_set_mode(MODE_QUERY); | |
while True: | |
cmd_set_sleep(0) | |
for t in range(15): | |
values = cmd_query_data(); | |
if values is not None and len(values) == 2: | |
# Write to lcd | |
pm25_per = str(values[0]) | |
pm10_per = str(values[1]) | |
air_pm25 = str(calcAQIpm25(values[0])) | |
air_pm10 = str(calcAQIpm10(values[1])) | |
air_q_25 = air_quality(calcAQIpm25(values[0])) | |
air_q_10 = air_quality(calcAQIpm10(values[1])) | |
lcd_string(pm25_per + "|" + air_pm25 +"|" + air_q_25 , LCD_LINE_1) | |
lcd_string(pm10_per + "|" + air_pm10 +"|" + air_q_10, LCD_LINE_2) | |
time.sleep(2) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: | |
pass | |
finally: | |
lcd_byte(0x01, LCD_CMD) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment