Created
July 26, 2016 20:01
-
-
Save RobinDaugherty/a411e8ce131b734a0db6f65877a4689f to your computer and use it in GitHub Desktop.
Pressure monitor on Raspberry Pi 3 with MCP3008 ADC and Nokia 5110 display
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import time | |
| import datetime | |
| import array | |
| import Adafruit_GPIO.SPI as SPI | |
| import RPi.GPIO as GPIO | |
| import Adafruit_Nokia_LCD as LCD | |
| import Adafruit_MCP3008 | |
| from m2x.client import M2XClient | |
| import Image | |
| import ImageDraw | |
| import ImageFont | |
| m2x_api_key = '' | |
| m2x_client_device_id = '' | |
| class PressureReader: | |
| def __init__(self): | |
| # Software SPI configuration for MCP3008: | |
| # clk_pin = 18 | |
| # miso_pin = 23 | |
| # mosi_pin = 24 | |
| # cs_pin = 25 | |
| # self.mcp = Adafruit_MCP3008.MCP3008(clk=clk_pin, cs=cs_pin, miso=miso_pin, mosi=mosi_pin) | |
| self.adc_input = 0 | |
| # Hardware SPI configuration: | |
| spi_port = 0 | |
| spi_device = 0 | |
| self.mcp = Adafruit_MCP3008.MCP3008(spi=SPI.SpiDev(spi_port, spi_device)) | |
| def read(self): | |
| self.raw_reading = self.mcp.read_adc(self.adc_input) | |
| def current_raw_reading(self): | |
| """Returns the reading as an integer""" | |
| return self.raw_reading | |
| def current_psi(self): | |
| # 12 PSI = 242 | |
| # 0 PSI = 141 | |
| # 11 PSI = 237 | |
| reading_floor = 141 | |
| reading_divisor = 10.0 | |
| return (self.current_raw_reading() - reading_floor) / reading_divisor | |
| def print_all(self): | |
| """Prints the state of all inputs""" | |
| values = [0]*8 | |
| for i in range(8): | |
| # The read_adc function will get the value of the specified channel (0-7). | |
| values[i] = self.mcp.read_adc(i) | |
| # Print the ADC values. | |
| print('| {0:>4} | {1:>4} | {2:>4} | {3:>4} | {4:>4} | {5:>4} | {6:>4} | {7:>4} |'.format(*values)) | |
| class Display: | |
| def __init__(self): | |
| # Nokia 5110 display Software SPI: | |
| # sclk_pin = 5 | |
| # din_pin = 6 | |
| dc_pin = 12 | |
| # cs_pin = 19 | |
| rst_pin = 16 | |
| # self.display = LCD.PCD8544(dc_pin, rst_pin, sclk_pin, din_pin, cs_pin) | |
| spi_port = 0 | |
| spi_device = 1 | |
| spi = spi=SPI.SpiDev(spi_port, spi_device, max_speed_hz=4000000) | |
| self.display = LCD.PCD8544(dc=dc_pin, rst=rst_pin, spi=spi) | |
| backlight_pin = 20 | |
| GPIO.setup(backlight_pin, GPIO.OUT) | |
| self.backlight = GPIO.PWM(backlight_pin, 60) | |
| self.backlight.start(40) | |
| self.font = ImageFont.load_default() | |
| # Alternatively load a TTF font. | |
| # Some nice fonts to try: http://www.dafont.com/bitmap.php | |
| # font = ImageFont.truetype('Minecraftia.ttf', 8) | |
| self.display.begin(contrast=60) | |
| self.display.clear() | |
| self.display.display() | |
| def set_display_brightness(self, brightness): | |
| """Sets the display brightness to an integer in the range 0-100""" | |
| self.backlight.ChangeDutyCycle(brightness) | |
| pass | |
| def set_current_pressure(self, pressure_in_psi, raw_reading=None): | |
| """Adds the current pressure to the display""" | |
| self.current_pressure_in_psi = pressure_in_psi | |
| self.raw_reading = raw_reading | |
| now = datetime.datetime.now().isoformat() | |
| print('{2}: {0:.2f} PSI, {1} raw'.format(pressure_in_psi, raw_reading, now)) | |
| self.update_display() | |
| def update_display(self): | |
| """Updates the display with the current state""" | |
| # Create blank image for drawing. | |
| # Make sure to create image with mode '1' for 1-bit color. | |
| image = Image.new('1', (LCD.LCDWIDTH, LCD.LCDHEIGHT)) | |
| # Get drawing object to draw on image. | |
| draw = ImageDraw.Draw(image) | |
| # Draw a white filled box to clear the image. | |
| draw.rectangle((0,0,LCD.LCDWIDTH,LCD.LCDHEIGHT), outline=255, fill=255) | |
| # Write some text. | |
| if self.raw_reading: | |
| draw.text((0,50), '{0} raw'.format(self.raw_reading), font=self.font) | |
| draw.text((0,30), '{0:.2f} PSI'.format(self.current_pressure_in_psi), font=self.font) | |
| # Display image. | |
| self.display.clear() | |
| self.display.image(image) | |
| self.display.display() | |
| class Reporter: | |
| def __init__(self, report_interval_seconds): | |
| self.client = M2XClient(key=m2x_api_key) | |
| self.device = self.client.device(m2x_client_device_id) | |
| self.stream = self.device.stream('filter_pressure') | |
| self.report_interval = 5.0 | |
| self.reset_next_report_time() | |
| self.recent_readings = array.array('f') | |
| def add_reading_to_average(self, pressure_in_psi): | |
| """Add the reading to an array so that it can be used to later calculate the average""" | |
| self.recent_readings.append(pressure_in_psi) | |
| def average_reading(self): | |
| """Get the current reading and reset the list of stored values""" | |
| avg = float(sum(self.recent_readings)) / len(self.recent_readings) | |
| del self.recent_readings[:] | |
| return avg | |
| def is_it_time_to_report(self): | |
| return datetime.datetime.now() >= self.next_report_time | |
| def reset_next_report_time(self): | |
| self.next_report_time = datetime.datetime.now() + datetime.timedelta(seconds=self.report_interval) | |
| def set_filter_pressure(self, pressure_in_psi): | |
| """Stores the current value and sends the current pressure to the reporting system""" | |
| self.add_reading_to_average(pressure_in_psi) | |
| if self.is_it_time_to_report(): | |
| try: | |
| self.stream.add_value(self.average_reading()) | |
| self.reset_next_report_time() | |
| except requests.exceptions.HTTPError: | |
| print("AT&T M2X failure.") | |
| pressure_reader = PressureReader() | |
| display = Display() | |
| reporter = Reporter(report_interval_seconds=5) | |
| print('Reading pressure, press Ctrl-C to quit...') | |
| while True: | |
| pressure_reader.read() | |
| # pressure_reader.print_all() | |
| display.set_current_pressure(pressure_reader.current_psi(), pressure_reader.current_raw_reading()) | |
| reporter.set_filter_pressure(pressure_reader.current_psi()) | |
| time.sleep(0.1) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment