Last active
January 1, 2025 19:05
-
-
Save manchicken/36b6d530eec03871f5a5e24834a2266a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import binascii | |
import ssl | |
import wifi | |
import socketpool | |
import adafruit_connection_manager | |
import adafruit_minimqtt.adafruit_minimqtt as MQTT | |
# Selects which group of settings is read below: the "aio_*" keys when True,
# the "mqtt_*" keys when False.
use_adafruit_io = False


def parse_port(value):
    """Return *value* as an int port number, or None when it is unset.

    The setting may presumably arrive either as an int or as a numeric
    string depending on how it was written (TODO confirm for the target
    runtime); int() accepts both. None is passed through unchanged instead
    of raising, so a missing port setting does not abort start-up here.
    """
    return None if value is None else int(value)


input_feed = os.getenv('input_feed')
output_feed = os.getenv('output_feed')
ssid = os.getenv('CIRCUITPY_WIFI_SSID')
ssid_pw = os.getenv('CIRCUITPY_WIFI_PASSWORD')
broker = os.getenv('aio_broker') if use_adafruit_io else os.getenv('mqtt_broker')
# Both branches go through parse_port so the port has the same type
# regardless of which broker is selected.
port = parse_port(os.getenv('aio_port') if use_adafruit_io else os.getenv('mqtt_port'))
username = os.getenv('aio_username') if use_adafruit_io else os.getenv('mqtt_user')
password = os.getenv('aio_key') if use_adafruit_io else os.getenv('mqtt_password')
## DEBUG CODE
# Echo the loaded settings, one per line. The WiFi and MQTT passwords are
# not printed.
for debug_line in (
    f"input_feed: {input_feed}",
    f"output_feed: {output_feed}",
    f"ssid: {ssid}",
    f"broker: {broker}",
    f"port: {port}",
    f"username: {username}",
):
    print(debug_line)
## WiFi setup
# Join the configured access point, printing a status line before and after.
print(f"Connecting to SSID {ssid}")
radio = wifi.radio
radio.connect(ssid, ssid_pw)
print(f"Connected to SSID {ssid}")

# Socket pool bound to the WiFi radio, plus a default TLS context; both are
# handed to the MQTT client when it is constructed.
sockpool = socketpool.SocketPool(radio)
ssl_context = ssl.create_default_context()
# Gather the MQTT client settings in one place, then build the client from
# them. The keys are the keyword arguments passed to MQTT.MQTT.
mqtt_settings = {
    "broker": broker,
    "port": port,
    "username": username,
    "password": password,
    "socket_pool": sockpool,
    "ssl_context": ssl_context,
    "use_binary_mode": True,
}
mqtt_client = MQTT.MQTT(**mqtt_settings)
print("Connecting to the MQTT broker...")
# The flag is passed by keyword so its meaning is visible at the call site.
# NOTE(review): keyword name taken from the adafruit_minimqtt connect()
# signature - confirm against the installed library version.
mqtt_client.connect(clean_session=True)
# Plain string literal: there is nothing to interpolate, so no f-string.
# The mqtt_msg value is printed as returned by the client.
print("Maximum MQTT payload size is: ", mqtt_client.mqtt_msg)
def generate_payload(payload_size_kb=2.834):
    """Return an ASCII test payload made of repeated 'A' characters.

    Args:
        payload_size_kb: Requested size in KB, where 1 KB = 1024 characters.
            Fractional values are allowed; the resulting length is truncated
            to a whole number. Defaults to 2.834, which yields 2902
            characters.

    Returns:
        A str of int(payload_size_kb * 1024) 'A' characters. The result is
        empty when the size is zero or negative.
    """
    return "A" * int(payload_size_kb * 1024)
# Publish a freshly generated test payload to the output feed
def send_payload():
    """Build a new payload with generate_payload() and publish it on output_feed."""
    payload = generate_payload()
    mqtt_client.publish(output_feed, payload)
## Main Loop
# Publish one payload per iteration, forever, sleeping one second in between.
while True:
    try:
        send_payload()
    except MemoryError as err:
        # Report the allocation failure and carry on with the next iteration
        print("Exception: ", err)

    # Pause before the next publish
    time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment