Skip to content

Instantly share code, notes, and snippets.

@plaffitt
Last active December 10, 2024 16:08
Show Gist options
  • Save plaffitt/9e3e86ed59e57d9a27b30e8176b05382 to your computer and use it in GitHub Desktop.
Save plaffitt/9e3e86ed59e57d9a27b30e8176b05382 to your computer and use it in GitHub Desktop.
ESP32 ntfy.sh on GPIO
# This script aims to monitor the state of a circuit (open or closed) using an ESP32 and send real-time notifications
# to an ntfy.sh topic whenever the circuit's state changes.
#
# The WiFi credentials, notification topic, and configurable parameters such as the minimum notification interval are
# defined at the beginning of the script for easy customization.
#
# It uses interrupts for efficient state detection and sends HTTP POST requests to ntfy.sh without relying on external
# libraries, making it lightweight and straightforward.
#
# DISCLAIMER: Largely written with the help of an LLM for the sake of my precious time, but works nicely for my needs.
#
# Author: Paul LAFFITTE <[email protected]>
# Date: December 2024
import machine
from machine import Pin
from time import time, sleep
import network
import socket
# Variables for you to configure
# WiFi network name and passphrase, passed to wlan.connect() in connect_wifi().
ssid = "<SSID>"
password = "<PASSWORD>"
# ntfy topic to publish to; it becomes the path of the HTTP POST request.
ntfy_topic = "<TOPIC>"
# Server and port the notification is sent to (plain HTTP over a raw socket).
ntfy_host = "ntfy.sh"
ntfy_port = 80
# Minimum gap between two notifications, in the units returned by time()
# (seconds). State changes that occur sooner are not reported.
notification_min_interval = 5
# Don't touch this one
# time() value recorded by callback() after the last notification was sent.
last_notification_time = 0
def connect_wifi():
    """Bring up the WiFi station interface and wait until it is connected.

    Uses the module-level ``ssid`` and ``password`` settings. There is no
    timeout: the call blocks for as long as the network is unreachable.

    Returns:
        The connected ``network.WLAN`` station interface.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    # Only start a connection attempt when one is needed; the interface may
    # already be connected (e.g. after a soft reset).
    if not wlan.isconnected():
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            # Let the CPU idle between polls instead of spinning in a
            # tight loop.
            machine.idle()
    print("connected to WiFi:", wlan.ifconfig())
    return wlan
def send_notification(message):
    """Publish ``message`` to the configured ntfy topic via a raw HTTP POST.

    The message is printed first. If the module-level ``wlan`` interface is
    no longer connected, the board is reset with ``machine.reset()``
    (presumably so the script restarts and reconnects -- confirm against the
    boot configuration). Otherwise one request is sent to
    ``ntfy_host:ntfy_port`` and up to 1024 bytes of the response are printed.

    Args:
        message: Text to publish; sent UTF-8 encoded as the request body.

    Raises:
        OSError: If name resolution, connecting, sending or receiving fails.
            The socket is closed before the error propagates.
    """
    print(message)
    if not wlan.isconnected():
        machine.reset()

    url = f"/{ntfy_topic}"
    content = message.encode('utf-8')
    request = (
        b"POST " + url.encode() + b" HTTP/1.1\r\n" +
        b"Host: " + ntfy_host.encode() + b"\r\n" +
        b"Content-Type: text/plain\r\n" +
        b"Content-Length: " + str(len(content)).encode() + b"\r\n" +
        # Only one response is read, so ask the server not to keep the
        # connection alive.
        b"Connection: close\r\n" +
        b"\r\n" +
        content
    )

    # Create a socket and send HTTP POST
    addr = socket.getaddrinfo(ntfy_host, ntfy_port)[0][-1]
    s = socket.socket()
    try:
        s.connect(addr)
        s.sendall(request)
        response = s.recv(1024)
        print(response.decode())
    finally:
        # Always release the socket, even when the request fails, so that
        # repeated failures cannot exhaust the available sockets.
        s.close()
def callback(pin):
    """Pin interrupt handler: report the circuit state, rate-limited.

    Does nothing when fewer than ``notification_min_interval`` time() units
    have passed since the last notification. Otherwise it reads the pin,
    sends "change: closed" for a reading of 0 or "change: open" for any
    other reading, and records the current time in the module-level
    ``last_notification_time``.

    Args:
        pin: The pin object that triggered the interrupt.
    """
    global last_notification_time

    now = time()
    elapsed = now - last_notification_time
    if elapsed < notification_min_interval:
        # Too soon after the previous notification: ignore this edge.
        return

    print(now, last_notification_time, elapsed)
    if pin.value() == 0:
        state = "closed"
    else:
        state = "open"
    send_notification("change: " + state)
    # Recorded only after the send returns, so a failed send does not start
    # a new rate-limit window.
    last_notification_time = now
# Join WiFi first: send_notification() reads the module-level `wlan`.
print("connecting...")
wlan = connect_wifi()

# Watch GPIO 21 as an input with the internal pull-up enabled, and run
# callback() on every edge, rising or falling.
pin = Pin(21, Pin.IN, Pin.PULL_UP)
both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
pin.irq(handler=callback, trigger=both_edges)

# Keep the script running; the actual work happens in the pin interrupt.
while True:
    print("sleep 10")
    sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment