Created
June 3, 2023 19:10
-
-
Save DavesCodeMusings/f520e468b8f43a419056b8db42818be0 to your computer and use it in GitHub Desktop.
MQTT topic on an OLED display with ESP32-C3 and SSD1306
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from machine import Pin, SoftI2C, Timer | |
| from ssd1306 import SSD1306_I2C | |
| from network import WLAN, STA_IF | |
| from time import ticks_diff, ticks_ms | |
| from config import WIFI_NAME, WIFI_PASS, WIFI_TIMEOUT | |
| BOOT_BUTTON_GPIO = 9 # ESP32 is 0, ESP32-C3 is 9 | |
| SCREEN_TIMEOUT = 30 # Seconds before blanking OLED | |
| i2c = SoftI2C(scl=Pin(4), sda=Pin(5)) | |
| if 0x3C not in i2c.scan(): | |
| raise RuntimeError("SSD1306 I2C display not found.") | |
| oled = SSD1306_I2C(128, 32, i2c) | |
| # Setup screen saver using BOOT button repurposed to trigger a wake up. | |
| def blank_screen(_): | |
| oled.poweroff() | |
| def wake_screen(_): | |
| screen_saver.deinit() # Cancel (possibly) running timer | |
| oled.poweron() | |
| screen_saver.init(period=SCREEN_TIMEOUT*1000, | |
| mode=Timer.ONE_SHOT, callback=blank_screen) | |
| screen_saver = Timer(0) | |
| screen_saver.init(period=SCREEN_TIMEOUT*1000, | |
| mode=Timer.ONE_SHOT, callback=blank_screen) | |
| boot_button = Pin(BOOT_BUTTON_GPIO, Pin.IN, Pin.PULL_UP) | |
| boot_button.irq(trigger=Pin.IRQ_FALLING, handler=wake_screen) | |
| # Show WiFi connection status on OLED as well as debug. | |
| wlan = WLAN(STA_IF) | |
| wlan.active(True) | |
| print(f"Connecting to SSID {WIFI_NAME}...", end='') | |
| oled.fill(0) | |
| oled.text("Connecting...", 0, 0) | |
| oled.show() | |
| wlan.connect(WIFI_NAME, WIFI_PASS) | |
| start_time = ticks_ms() | |
| while not wlan.isconnected(): | |
| if (ticks_diff(ticks_ms(), start_time) > WIFI_TIMEOUT * 1000): | |
| print(".", end='') | |
| if (wlan.isconnected()): | |
| print(f"\n{wlan.ifconfig()[0]}") | |
| oled.text("DHCP Address:", 0, 12) | |
| oled.text(f"{wlan.ifconfig()[0]}", 0, 24) | |
| else: | |
| print("Timed out.") | |
| oled.text("No connection.", 0, 0) | |
| oled.show() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| WIFI_NAME = "YourAP" | |
| WIFI_PASS = "YourPassword" | |
| WIFI_TIMEOUT = 30 | |
| MQTT_BROKER = "IP.AD.DR.ESS" | |
| MQTT_TOPIC = "Test" | |
| MQTT_CLIENT_ID = "ESP32-C3" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from umqtt.simple import MQTTClient | |
| from time import sleep | |
| from config import MQTT_BROKER, MQTT_TOPIC, MQTT_CLIENT_ID | |
| def connect_and_subscribe(client, topic): | |
| client.connect() | |
| client.subscribe(topic) | |
| client.publish(MQTT_TOPIC.encode('utf-8'), | |
| (MQTT_CLIENT_ID + "\r\njoined the chat.").encode('utf-8')) | |
| def show_msg(topic, msg): | |
| print(f"{topic.decode('utf-8')}: {msg.decode('utf-8')}") | |
| lines = msg.splitlines() | |
| oled.fill(0) | |
| oled.text(topic.decode('utf-8'), 0, 0) | |
| oled.hline(0, 10, 128, 1) | |
| if len(lines) > 0: | |
| oled.text(lines[0].decode('utf-8'), 0, 14) | |
| if len(lines) > 1: | |
| oled.text(lines[1].decode('utf-8'), 0, 24) | |
| oled.show() | |
| wake_screen(0) | |
| # Give time for reading boot messages, then clear. | |
| sleep(5) | |
| oled.fill(0) | |
| oled.show() | |
| client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER) | |
| client.set_callback(show_msg) | |
| connect_and_subscribe(client, MQTT_TOPIC.encode('utf-8')) | |
| while True: | |
| try: | |
| client.check_msg() | |
| except Exception as e: | |
| print(f"MQTT connection problem: {e}") | |
| connect_and_subscribe(client, MQTT_TOPIC.encode('utf-8')) | |
| sleep(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment