Last active
December 10, 2024 16:08
-
-
Save plaffitt/9e3e86ed59e57d9a27b30e8176b05382 to your computer and use it in GitHub Desktop.
ESP32 ntfy.sh on GPIO
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script monitors the state of a circuit (open or closed) using an ESP32 and sends real-time notifications
# to an ntfy.sh topic whenever the circuit's state changes.
#
# The WiFi credentials, notification topic, and configurable parameters such as the minimum notification interval are
# defined at the beginning of the script for easy customization.
#
# It uses interrupts for efficient state detection and sends HTTP POST requests to ntfy.sh without relying on external
# libraries, making it lightweight and straightforward.
#
# DISCLAIMER: Largely written with the help of an LLM for the sake of my precious time, but it works nicely for my needs.
#
# Author: Paul LAFFITTE <[email protected]>
# Date: December 2024
import machine | |
from machine import Pin | |
from time import time, sleep | |
import network | |
import socket | |
# --- User configuration: set these before running the script ---
ssid = "<SSID>"                # WiFi network name passed to wlan.connect()
password = "<PASSWORD>"        # WiFi passphrase passed to wlan.connect()
ntfy_topic = "<TOPIC>"         # ntfy topic; becomes the path of the POST request
ntfy_host = "ntfy.sh"          # host the notification is sent to
ntfy_port = 80                 # TCP port used for the plain-HTTP request
notification_min_interval = 5  # minimum gap between two notifications, in time() units (seconds)

# --- Internal state: do not edit ---
last_notification_time = 0     # time() value recorded by callback() after the last notification
def connect_wifi():
    """Bring up the WiFi station interface and block until it is connected.

    Uses the module-level ``ssid`` and ``password`` settings. There is no
    timeout: the function only returns once ``isconnected()`` reports True.

    Returns:
        The active ``network.WLAN`` station interface object.
    """
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    # Only start a new association when the interface is not already
    # connected; the official MicroPython example guards connect() the same way.
    if not wlan.isconnected():
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            # Poll at a short interval instead of spinning in a tight loop.
            sleep(0.1)

    print("connected to WiFi:", wlan.ifconfig())
    return wlan
def send_notification(message):
    """Publish ``message`` to the configured ntfy topic with a raw HTTP POST.

    The message is first echoed to the console. If the module-level ``wlan``
    interface reports that it is no longer connected, the board is reset with
    ``machine.reset()`` (presumably so the script reconnects on the next boot).
    Otherwise one HTTP/1.1 request is written to a plain TCP socket and the
    first chunk (up to 1024 bytes) of the reply is printed.

    Args:
        message: Text to publish; it is UTF-8 encoded and sent as the body.

    Raises:
        OSError: Propagated from the socket layer if resolving, connecting,
            sending or receiving fails. The socket is closed in every case.
    """
    print(message)

    if not wlan.isconnected():
        machine.reset()

    path = f"/{ntfy_topic}"
    content = message.encode('utf-8')

    # Assemble the whole request up front so it goes out in one sendall().
    request = b"".join((
        b"POST ", path.encode(), b" HTTP/1.1\r\n",
        b"Host: ", ntfy_host.encode(), b"\r\n",
        b"Content-Type: text/plain\r\n",
        b"Content-Length: ", str(len(content)).encode(), b"\r\n",
        # We drop the socket after one reply, so tell the server not to
        # keep the connection alive.
        b"Connection: close\r\n",
        b"\r\n",
        content,
    ))

    addr = socket.getaddrinfo(ntfy_host, ntfy_port)[0][-1]
    s = socket.socket()
    try:
        s.connect(addr)
        s.sendall(request)
        response = s.recv(1024)
        print(response.decode())
    finally:
        # Always release the socket, even when the exchange fails part-way.
        s.close()
def callback(pin):
    """Pin IRQ handler: report the circuit state, rate-limited.

    Edges that arrive less than ``notification_min_interval`` after the last
    sent notification are ignored. Otherwise the pin level is read (0 is
    reported as "closed", anything else as "open"), a notification is sent,
    and ``last_notification_time`` is updated.

    Args:
        pin: The pin object passed in by the IRQ machinery; only ``value()``
            is called on it.
    """
    global last_notification_time

    now = time()
    elapsed = now - last_notification_time

    # Too soon after the previous notification: drop this edge.
    if elapsed < notification_min_interval:
        return

    print(now, last_notification_time, elapsed)

    if pin.value() == 0:
        state = "closed"
    else:
        state = "open"

    send_notification("change: " + state)
    # Recorded only after the send returns, so a failed send does not start
    # the rate-limit window.
    last_notification_time = now
# --- Script entry: connect, arm the interrupt, then idle forever ---
print("connecting...")
wlan = connect_wifi()  # module-level: send_notification() checks this interface

# GPIO 21 as an input with the internal pull-up enabled; callback() is
# invoked on both rising and falling edges.
pin = Pin(21, Pin.IN, Pin.PULL_UP)
pin.irq(handler=callback, trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING)

# Nothing else to do in the foreground: all work happens in the IRQ handler.
while True:
    print("sleep 10")
    sleep(10)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment