Skip to content

Instantly share code, notes, and snippets.

@manchicken
Last active January 1, 2025 19:05
Show Gist options
  • Save manchicken/36b6d530eec03871f5a5e24834a2266a to your computer and use it in GitHub Desktop.
Save manchicken/36b6d530eec03871f5a5e24834a2266a to your computer and use it in GitHub Desktop.
import os
import time
import binascii
import ssl
import wifi
import socketpool
import adafruit_connection_manager
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# Selects which group of settings is read below: the Adafruit IO ones (aio_*)
# when True, the generic MQTT broker ones (mqtt_*) when False.
use_adafruit_io = False

# Topic/feed names and WiFi credentials.
input_feed = os.getenv('input_feed')
output_feed = os.getenv('output_feed')
ssid = os.getenv('CIRCUITPY_WIFI_SSID')
ssid_pw = os.getenv('CIRCUITPY_WIFI_PASSWORD')

# Broker connection settings.
broker = os.getenv('aio_broker') if use_adafruit_io else os.getenv('mqtt_broker')
# The port is normalized to an int for BOTH setting groups; os.getenv may hand
# back a string, and previously only the Adafruit IO branch converted it.
# A missing port stays None instead of raising TypeError from int(None).
# NOTE(review): presumably the MQTT client picks its own default port when
# given None -- confirm against the library documentation.
_port_setting = os.getenv('aio_port') if use_adafruit_io else os.getenv('mqtt_port')
port = int(_port_setting) if _port_setting is not None else None
username = os.getenv('aio_username') if use_adafruit_io else os.getenv('mqtt_user')
password = os.getenv('aio_key') if use_adafruit_io else os.getenv('mqtt_password')
## DEBUG CODE
# Echo the loaded settings, one "name: value" line each. The WiFi password
# and the broker password are not part of this list.
for _label, _value in (
    ("input_feed", input_feed),
    ("output_feed", output_feed),
    ("ssid", ssid),
    ("broker", broker),
    ("port", port),
    ("username", username),
):
    print(f"{_label}: {_value}")
## WiFi setup
# Join the configured network, logging before and after the connect call.
print(f"Connecting to SSID {ssid}")
wifi.radio.connect(ssid, ssid_pw)
print(f"Connected to SSID {ssid}")

# Socket pool created from the WiFi radio, plus a default SSL context; both
# are passed to the MQTT client constructor further down.
sockpool = socketpool.SocketPool(wifi.radio)
ssl_context = ssl.create_default_context()
# Build the MQTT client from the broker settings and the network objects
# (socket pool and SSL context) created earlier in this script.
mqtt_client = MQTT.MQTT(
    broker=broker,
    port=port,
    username=username,
    password=password,
    socket_pool=sockpool,
    ssl_context=ssl_context,
    use_binary_mode=True,
)

print("Connecting to the MQTT broker...")
mqtt_client.connect(True)

# Report the client's mqtt_msg attribute; the label and the value are passed
# as separate print() arguments.
print("Maximum MQTT payload size is: ", mqtt_client.mqtt_msg)
def generate_payload(payload_size_kb=2.834):
    """Return a test payload made of the ASCII character 'A' repeated.

    Args:
        payload_size_kb: Requested payload size in kilobytes (1 KB = 1024
            bytes). Fractional values are allowed; the resulting character
            count is truncated toward zero. Defaults to 2.834, which yields
            2902 characters.

    Returns:
        A str of ``int(payload_size_kb * 1024)`` 'A' characters (empty when
        that count is zero or negative).
    """
    char_count = int(payload_size_kb * 1024)  # 1KB = 1024 bytes
    return "A" * char_count
def send_payload():
    """Generate a fresh test payload and publish it to ``output_feed``."""
    message = generate_payload()
    mqtt_client.publish(output_feed, message)
## Main Loop
# Publish one payload per iteration, forever. A MemoryError raised while
# sending is printed and the loop carries on; any other exception propagates.
while True:
    try:
        send_payload()
    except MemoryError as err:
        print("Exception: ", err)

    # Last step of each iteration: wait one second before the next publish.
    time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment