Created
May 24, 2020 01:28
-
-
Save ajmontag/3a5fdd4bf24d209c568a7e25524dd3fc to your computer and use it in GitHub Desktop.
Read from SCD30 Sensor on Raspberry PI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# coding=utf-8
#
# Based on https://github.com/UnravelTEC/Raspi-Driver-SCD30
# Copyright © 2018 UnravelTEC
# Michael Maier <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# If you want to relicense this code under another license, please contact [email protected].
# hints from https://www.raspberrypi.org/forums/viewtopic.php?p=600515#p600515
from __future__ import print_function | |
# This module uses the services of the C pigpio library. pigpio must be running on the Pi(s) whose GPIO are to be manipulated. | |
# cmd ref: http://abyz.me.uk/rpi/pigpio/python.html#i2c_write_byte_data | |
import pigpio # aptitude install python-pigpio | |
import time | |
import struct | |
import sys | |
import crcmod # aptitude install python-crcmod | |
import optparse | |
import logging | |
from logging import debug, warn, info | |
class SCD30(object):
    """Read measurements from an SCD30 sensor through a pigpio I2C handle.

    The instance owns one pigpio I2C handle (``self.h``) opened on
    ``bus_num``/``addr``.  Fatal conditions (no daemon connection, failed
    open, failed bus transfer while measuring) terminate the process with
    exit status 1.
    """

    # Seconds to sleep between polls of the data-ready status.
    _POLL_INTERVAL_S = 0.1

    def __init__(self, pi, bus_num, addr=0x61):
        """Open the I2C device.

        Args:
            pi: A pigpio connection object (must expose ``connected``,
                ``i2c_open``, ``i2c_close``, ``i2c_read_device`` and
                ``i2c_write_device``).
            bus_num: I2C bus number passed to ``i2c_open``.
            addr: I2C address of the sensor.

        Raises:
            SystemExit: if the daemon is not connected or the open fails.
        """
        self.pi = pi
        self.bus_num = bus_num
        self.addr = addr
        # Set before anything can bail out so __del__ is always safe.
        self.h = None

        if not self.pi.connected:
            logging.warning("no connection to pigpio daemon")
            sys.exit(1)

        # Best-effort close of handle 0 (presumably a stale handle left by an
        # earlier run -- confirm).  An 'unknown handle' error only means
        # nothing was open, so it is not reported.
        try:
            self.pi.i2c_close(0)
        except Exception as exc:
            if str(exc) != "'unknown handle'":
                logging.warning("Unknown error: %s: %s",
                                type(exc).__name__, exc)

        try:
            self.h = self.pi.i2c_open(self.bus_num, self.addr)
        except Exception:
            logging.warning("i2c open failed")
            sys.exit(1)

    def __del__(self):
        """Release the I2C handle if one was opened."""
        pi = getattr(self, "pi", None)
        handle = getattr(self, "h", None)
        if pi is None or handle is None:
            return
        try:
            pi.i2c_close(handle)
        except Exception:
            # A destructor must not raise; the handle is gone either way.
            pass
        self.h = None

    def read_n_bytes(self, n):
        """Read exactly ``n`` bytes from the device.

        Returns:
            The data returned by pigpio when ``n`` bytes were read, otherwise
            ``False``.

        Raises:
            SystemExit: if the underlying pigpio read raises.
        """
        try:
            count, data = self.pi.i2c_read_device(self.h, n)
        except Exception:
            logging.warning("error: i2c_read failed")
            sys.exit(1)
        if count == n:
            return data
        logging.warning("error: i2c_read didnt return %dB", n)
        return False

    def i2cWrite(self, data):
        """Write ``data`` (a sequence of byte values) to the device.

        Returns:
            ``True`` on success, ``-1`` if the pigpio write raised.
        """
        try:
            self.pi.i2c_write_device(self.h, data)
        except Exception:
            logging.warning("error: i2c_write failed")
            return -1
        return True

    # read meas interval (not documented, but works)
    def read_meas_interval(self):
        """Return the measurement interval reported by the sensor.

        Returns:
            The interval as an int built from the first two reply bytes
            (big-endian), or ``-1`` if the command could not be written or
            the reply was not 3 bytes long.
        """
        if self.i2cWrite([0x46, 0x00]) == -1:
            return -1
        data = self.read_n_bytes(3)
        if data is False:
            return -1
        if len(data) != 3:
            logging.warning("error: no array len 3 returned, instead %d type: %s",
                            len(data), type(data))
            return -1
        # data[2] is the checksum byte and is not part of the value.
        return int(data[0]) * 256 + int(data[1])

    @staticmethod
    def _crc8(payload):
        """Return the CRC-8 (poly 0x131, init 0xFF) of a byte string."""
        return crcmod.mkCrcFun(0x131, 0xFF, False, 0x00)(payload)

    @staticmethod
    def _decode_float(frame, offset):
        """Decode one big-endian float from ``frame`` starting at ``offset``.

        The value occupies bytes offset+0, +1, +3 and +4; the bytes at +2 and
        +5 are skipped (checksum bytes in the sensor's framing).
        """
        packed = struct.pack('>BBBB', frame[offset], frame[offset + 1],
                             frame[offset + 3], frame[offset + 4])
        return struct.unpack('>f', packed)[0]

    def _wait_data_ready(self):
        """Poll the ready-status command until the sensor reports data."""
        while True:
            if self.i2cWrite([0x02, 0x02]) == -1:
                sys.exit(1)
            status = self.read_n_bytes(3)
            if status is not False and status[1] == 1:
                return
            time.sleep(self._POLL_INTERVAL_S)

    def read(self, pressure_mbar=0):
        """Start continuous measurement, wait for data and return one sample.

        Args:
            pressure_mbar: Pressure argument sent with the start-measurement
                command; only its low 16 bits are transmitted.  ``0`` (the
                default, and the value that used to be hard-coded) keeps the
                sensor's default pressure.

        Returns:
            dict with float values under the keys ``'CO2'``,
            ``'temperature_degF'`` and ``'humidity'``.

        Raises:
            SystemExit: if a bus transfer fails while polling or reading.
        """
        pressure = int(pressure_mbar)
        msb = (pressure >> 8) & 0xFF
        lsb = pressure & 0xFF
        # crcmod wants a byte string; bytes(bytearray(...)) yields one on both
        # Python 2 and 3.  For pressure 0 the CRC should be 0x81.
        crc8 = self._crc8(bytes(bytearray([msb, lsb])))
        # Also activates continuous measurement.
        self.i2cWrite([0x00, 0x10, msb, lsb, crc8])

        self._wait_data_ready()

        # Read measurement.  Decoding after a failed command write would
        # return whatever the device happened to send, so treat it as fatal.
        if self.i2cWrite([0x03, 0x00]) == -1:
            sys.exit(1)
        frame = self.read_n_bytes(18)
        if frame is False:
            sys.exit(1)

        float_co2 = self._decode_float(frame, 0)
        float_T = self._decode_float(frame, 6)
        float_rH = self._decode_float(frame, 12)

        data = {
            'CO2': float_co2,
            # Raw temperature is converted to Fahrenheit.
            'temperature_degF': (float_T * 9.0 / 5.0) + 32,
            'humidity': float_rH,
        }
        logging.info(data)
        return data
def parseOptions(argv, progName, progDesc):
    """Parse the command-line options of the script.

    Args:
        argv: Argument list without the program name.
        progName: Program name shown in the usage/help text.
        progDesc: Description shown in the help text.

    Returns:
        The optparse values object with ``bus`` (int, default 1) and
        ``verbose`` (int count of ``-v`` flags, default 0).

    Raises:
        SystemExit: with status 1 (after printing help) when positional
            arguments are given.
    """
    optParser = optparse.OptionParser(prog=progName, description=progDesc)
    optParser.add_option("-b", "--bus",
                         help="Bus Number",
                         type="int",
                         default=1)
    # Without an explicit default a "count" option stays None when the flag
    # is absent, and None cannot be compared with an int on Python 3.
    optParser.add_option("-v", "--verbose", action="count", default=0,
                         help="verbose")
    options, extraArgs = optParser.parse_args(argv)
    if extraArgs:
        optParser.print_help()
        sys.exit(1)
    return options
def main():
    """Parse options, configure logging and print one sensor reading.

    Exits the process with status 0 after printing the reading.
    """
    options = parseOptions(sys.argv[1:], sys.argv[0], "SCD30 Control")

    # A "count" option may be None when -v was never given; normalise it so
    # the comparisons below are valid on Python 3 as well.
    verbosity = options.verbose or 0
    if verbosity >= 2:
        level = logging.DEBUG
    elif verbosity == 1:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(level=level,
                        format='%(asctime)s %(message)s',
                        datefmt='%d %b %Y %H:%M:%S')

    # The IPv6 loopback '::1' was assigned here first and then immediately
    # overwritten; only the IPv4 loopback was ever used.
    pigpio_host = '127.0.0.1'
    pi = pigpio.pi(pigpio_host)
    sensor = SCD30(pi, options.bus)
    print(sensor.read())
    sys.exit(0)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment