Skip to content

Instantly share code, notes, and snippets.

@RobinDaugherty
Created July 26, 2016 20:01
Show Gist options
  • Select an option

  • Save RobinDaugherty/a411e8ce131b734a0db6f65877a4689f to your computer and use it in GitHub Desktop.

Select an option

Save RobinDaugherty/a411e8ce131b734a0db6f65877a4689f to your computer and use it in GitHub Desktop.
Pressure monitor on Raspberry Pi 3 with MCP3008 ADC and Nokia 5110 display
import time
import datetime
import array
import Adafruit_GPIO.SPI as SPI
import RPi.GPIO as GPIO
import Adafruit_Nokia_LCD as LCD
import Adafruit_MCP3008
from m2x.client import M2XClient
import Image
import ImageDraw
import ImageFont
m2x_api_key = ''
m2x_client_device_id = ''
class PressureReader:
    """Reads a pressure sensor through one channel of an MCP3008 ADC.

    Call ``read()`` to sample the ADC, then ``current_raw_reading()`` or
    ``current_psi()`` to inspect the most recent sample.  If no sample has
    been taken yet, the accessors take one on demand.
    """

    # Default linear calibration: psi = (raw - reading_floor) / reading_divisor.
    # Bench measurements recorded by the original author:
    #   raw 141 = 0 PSI, raw 237 = 11 PSI, raw 242 = 12 PSI
    # NOTE(review): those measurements imply roughly 8.4-8.7 counts per PSI,
    # while the divisor below is 10.0 (raw 242 evaluates to 10.1 PSI).  The
    # defaults are kept as they were; confirm the intended calibration.
    reading_floor = 141
    reading_divisor = 10.0

    # Most recent raw ADC sample; None until the first read().
    raw_reading = None

    def __init__(self, adc_input=0, reading_floor=None, reading_divisor=None,
                 spi_port=0, spi_device=0):
        """Opens the MCP3008 over hardware SPI.

        Args:
            adc_input: MCP3008 channel the sensor is wired to.
            reading_floor: raw ADC value treated as 0 PSI (default 141).
            reading_divisor: raw ADC counts per PSI (default 10.0).
            spi_port: hardware SPI port passed to ``SPI.SpiDev``.
            spi_device: hardware SPI device passed to ``SPI.SpiDev``.

        Raises:
            ValueError: if ``reading_divisor`` is zero.
        """
        self.adc_input = adc_input
        if reading_floor is not None:
            self.reading_floor = reading_floor
        if reading_divisor is not None:
            if reading_divisor == 0:
                raise ValueError('reading_divisor must be non-zero')
            self.reading_divisor = reading_divisor
        self.raw_reading = None
        # Hardware SPI.  The Adafruit_MCP3008 constructor also accepts
        # clk/cs/miso/mosi pin numbers for software SPI instead.
        self.mcp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(spi_port, spi_device))

    def read(self):
        """Samples the configured ADC channel and stores the raw value."""
        self.raw_reading = self.mcp.read_adc(self.adc_input)

    def current_raw_reading(self):
        """Returns the reading as an integer"""
        if self.raw_reading is None:
            # No sample taken yet: take one now rather than failing.
            self.read()
        return self.raw_reading

    def current_psi(self):
        """Returns the most recent reading converted to PSI as a float."""
        return (self.current_raw_reading() - self.reading_floor) / self.reading_divisor

    def print_all(self):
        """Prints the state of all inputs"""
        # read_adc returns the value of the specified channel (0-7).
        values = [self.mcp.read_adc(channel) for channel in range(8)]
        print('| {0:>4} | {1:>4} | {2:>4} | {3:>4} | {4:>4} | {5:>4} | {6:>4} | {7:>4} |'.format(*values))
class Display:
    """Drives a Nokia 5110 (PCD8544) LCD and its PWM-dimmed backlight.

    ``set_current_pressure()`` records the latest values, logs them to
    stdout and redraws the screen; ``update_display()`` redraws from the
    stored state.
    """

    # Class-level defaults so update_display() is safe to call before the
    # first set_current_pressure().
    current_pressure_in_psi = None
    raw_reading = None

    def __init__(self):
        """Opens the LCD over hardware SPI and starts the backlight PWM."""
        # Nokia 5110 control pins.  Data and clock go over hardware SPI, so
        # only DC and RST need GPIO pins here.
        dc_pin = 12
        rst_pin = 16
        spi_port = 0
        spi_device = 1
        spi = SPI.SpiDev(spi_port, spi_device, max_speed_hz=4000000)
        self.display = LCD.PCD8544(dc=dc_pin, rst=rst_pin, spi=spi)

        # NOTE(review): no GPIO.setmode() call is visible in this file;
        # presumably constructing the LCD driver above selects the pin
        # numbering mode.  Confirm before reordering these statements.
        backlight_pin = 20
        GPIO.setup(backlight_pin, GPIO.OUT)
        self.backlight = GPIO.PWM(backlight_pin, 60)
        self.backlight.start(40)

        # Alternatively load a TTF font, e.g.
        # ImageFont.truetype('Minecraftia.ttf', 8)
        # (bitmap fonts: http://www.dafont.com/bitmap.php).
        self.font = ImageFont.load_default()

        self.display.begin(contrast=60)
        self.display.clear()
        self.display.display()

    def set_display_brightness(self, brightness):
        """Sets the display brightness to an integer in the range 0-100"""
        self.backlight.ChangeDutyCycle(brightness)

    def set_current_pressure(self, pressure_in_psi, raw_reading=None):
        """Adds the current pressure to the display"""
        self.current_pressure_in_psi = pressure_in_psi
        self.raw_reading = raw_reading
        now = datetime.datetime.now().isoformat()
        print('{2}: {0:.2f} PSI, {1} raw'.format(pressure_in_psi, raw_reading, now))
        self.update_display()

    def _text_lines(self):
        """Returns the ((x, y), text) pairs to draw for the stored state."""
        lines = []
        # Compare against None rather than truthiness: a raw reading of 0 is
        # a value that should still be shown.
        if self.raw_reading is not None:
            # NOTE(review): y=50 looks like it falls below a 48-row PCD8544
            # panel, which would make this line invisible.  Confirm against
            # LCD.LCDHEIGHT and adjust the layout if so.
            lines.append(((0, 50), '{0} raw'.format(self.raw_reading)))
        if self.current_pressure_in_psi is not None:
            lines.append(((0, 30), '{0:.2f} PSI'.format(self.current_pressure_in_psi)))
        return lines

    def update_display(self):
        """Updates the display with the current state"""
        # Mode '1' gives the 1-bit image the LCD driver is handed below.
        image = Image.new('1', (LCD.LCDWIDTH, LCD.LCDHEIGHT))
        draw = ImageDraw.Draw(image)
        # Draw a white filled box to clear the image.
        draw.rectangle((0, 0, LCD.LCDWIDTH, LCD.LCDHEIGHT), outline=255, fill=255)
        for position, text in self._text_lines():
            draw.text(position, text, font=self.font)
        # Push the rendered frame to the panel.
        self.display.clear()
        self.display.image(image)
        self.display.display()
class Reporter:
    """Averages pressure readings and periodically uploads them to AT&T M2X.

    Readings passed to ``set_filter_pressure()`` are buffered; once the
    report interval has elapsed, their mean is sent to the
    ``filter_pressure`` stream and the buffer is emptied.
    """

    def __init__(self, report_interval_seconds):
        """Connects to M2X and schedules the first report.

        Args:
            report_interval_seconds: minimum number of seconds between
                uploads.
        """
        self.client = M2XClient(key=m2x_api_key)
        self.device = self.client.device(m2x_client_device_id)
        self.stream = self.device.stream('filter_pressure')
        # Honour the caller's interval (this was previously fixed at 5.0
        # regardless of the argument).
        self.report_interval = float(report_interval_seconds)
        self.reset_next_report_time()
        self.recent_readings = array.array('f')

    def add_reading_to_average(self, pressure_in_psi):
        """Add the reading to an array so that it can be used to later calculate the average"""
        self.recent_readings.append(pressure_in_psi)

    def average_reading(self):
        """Get the current reading and reset the list of stored values

        Returns the mean of the buffered readings, or None when nothing has
        been buffered since the last call.
        """
        if not self.recent_readings:
            return None
        avg = float(sum(self.recent_readings)) / len(self.recent_readings)
        del self.recent_readings[:]
        return avg

    def is_it_time_to_report(self):
        """Returns True once the scheduled report time has been reached."""
        return datetime.datetime.now() >= self.next_report_time

    def reset_next_report_time(self):
        """Schedules the next report ``report_interval`` seconds from now."""
        self.next_report_time = datetime.datetime.now() + datetime.timedelta(seconds=self.report_interval)

    def set_filter_pressure(self, pressure_in_psi):
        """Stores the current value and sends the current pressure to the reporting system"""
        self.add_reading_to_average(pressure_in_psi)
        if not self.is_it_time_to_report():
            return
        average = self.average_reading()
        try:
            self.stream.add_value(average)
        except Exception as error:
            # The previous handler named requests.exceptions.HTTPError, but
            # `requests` is not imported in this file, so any upload failure
            # turned into a NameError.  Reporting is best-effort: log the
            # failure and keep the monitor running.
            print("AT&T M2X failure.")
            print(repr(error))
        finally:
            # Reschedule even after a failure so a broken connection is
            # retried once per interval instead of on every reading.
            self.reset_next_report_time()
# Script entry: sample the sensor about ten times a second, show each reading
# on the LCD and feed it to the reporter, until interrupted with Ctrl-C.
pressure_reader = PressureReader()
display = Display()
reporter = Reporter(report_interval_seconds=5)
print('Reading pressure, press Ctrl-C to quit...')
try:
    while True:
        pressure_reader.read()
        # pressure_reader.print_all()  # uncomment to dump all ADC channels
        # Convert once per sample and reuse the value for both consumers.
        psi = pressure_reader.current_psi()
        display.set_current_pressure(psi, pressure_reader.current_raw_reading())
        reporter.set_filter_pressure(psi)
        time.sleep(0.1)
except KeyboardInterrupt:
    # Ctrl-C is the advertised way to quit; exit without a traceback.
    pass
finally:
    # Stop the backlight PWM and release the GPIO pins on the way out.
    display.backlight.stop()
    GPIO.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment