Skip to content

Instantly share code, notes, and snippets.

@DavesCodeMusings
Created June 3, 2023 19:10
Show Gist options
  • Select an option

  • Save DavesCodeMusings/f520e468b8f43a419056b8db42818be0 to your computer and use it in GitHub Desktop.

Select an option

Save DavesCodeMusings/f520e468b8f43a419056b8db42818be0 to your computer and use it in GitHub Desktop.
MQTT topic on an OLED display with ESP32-C3 and SSD1306
from machine import Pin, SoftI2C, Timer
from ssd1306 import SSD1306_I2C
from network import WLAN, STA_IF
from time import ticks_diff, ticks_ms
from config import WIFI_NAME, WIFI_PASS, WIFI_TIMEOUT
BOOT_BUTTON_GPIO = 9 # ESP32 is 0, ESP32-C3 is 9
SCREEN_TIMEOUT = 30 # Seconds before blanking OLED
i2c = SoftI2C(scl=Pin(4), sda=Pin(5))
if 0x3C not in i2c.scan():
raise RuntimeError("SSD1306 I2C display not found.")
oled = SSD1306_I2C(128, 32, i2c)
# Setup screen saver using BOOT button repurposed to trigger a wake up.
def blank_screen(_):
oled.poweroff()
def wake_screen(_):
screen_saver.deinit() # Cancel (possibly) running timer
oled.poweron()
screen_saver.init(period=SCREEN_TIMEOUT*1000,
mode=Timer.ONE_SHOT, callback=blank_screen)
screen_saver = Timer(0)
screen_saver.init(period=SCREEN_TIMEOUT*1000,
mode=Timer.ONE_SHOT, callback=blank_screen)
boot_button = Pin(BOOT_BUTTON_GPIO, Pin.IN, Pin.PULL_UP)
boot_button.irq(trigger=Pin.IRQ_FALLING, handler=wake_screen)
# Show WiFi connection status on OLED as well as debug.
wlan = WLAN(STA_IF)
wlan.active(True)
print(f"Connecting to SSID {WIFI_NAME}...", end='')
oled.fill(0)
oled.text("Connecting...", 0, 0)
oled.show()
wlan.connect(WIFI_NAME, WIFI_PASS)
start_time = ticks_ms()
while not wlan.isconnected():
if (ticks_diff(ticks_ms(), start_time) > WIFI_TIMEOUT * 1000):
print(".", end='')
if (wlan.isconnected()):
print(f"\n{wlan.ifconfig()[0]}")
oled.text("DHCP Address:", 0, 12)
oled.text(f"{wlan.ifconfig()[0]}", 0, 24)
else:
print("Timed out.")
oled.text("No connection.", 0, 0)
oled.show()
WIFI_NAME = "YourAP"
WIFI_PASS = "YourPassword"
WIFI_TIMEOUT = 30
MQTT_BROKER = "IP.AD.DR.ESS"
MQTT_TOPIC = "Test"
MQTT_CLIENT_ID = "ESP32-C3"
from umqtt.simple import MQTTClient
from time import sleep
from config import MQTT_BROKER, MQTT_TOPIC, MQTT_CLIENT_ID
def connect_and_subscribe(client, topic):
client.connect()
client.subscribe(topic)
client.publish(MQTT_TOPIC.encode('utf-8'),
(MQTT_CLIENT_ID + "\r\njoined the chat.").encode('utf-8'))
def show_msg(topic, msg):
print(f"{topic.decode('utf-8')}: {msg.decode('utf-8')}")
lines = msg.splitlines()
oled.fill(0)
oled.text(topic.decode('utf-8'), 0, 0)
oled.hline(0, 10, 128, 1)
if len(lines) > 0:
oled.text(lines[0].decode('utf-8'), 0, 14)
if len(lines) > 1:
oled.text(lines[1].decode('utf-8'), 0, 24)
oled.show()
wake_screen(0)
# Give time for reading boot messages, then clear.
sleep(5)
oled.fill(0)
oled.show()
client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
client.set_callback(show_msg)
connect_and_subscribe(client, MQTT_TOPIC.encode('utf-8'))
while True:
try:
client.check_msg()
except Exception as e:
print(f"MQTT connection problem: {e}")
connect_and_subscribe(client, MQTT_TOPIC.encode('utf-8'))
sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment