Created
October 28, 2022 21:53
-
-
Save rpavlik/041a2bef03765f35b50099d5b3c9b212 to your computer and use it in GitHub Desktop.
creature eyes mqtt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-FileCopyrightText: 2020 Phillip Burgess for Adafruit Industries | |
# SPDX-FileCopyrightText: 2021-2022 Ryan Pavlik <[email protected]> | |
# | |
# SPDX-License-Identifier: MIT | |
""" | |
RASTER EYES for Adafruit Matrix Portal: animated spooky eyes. | |
Updated by Ryan to hide the head fully and have MQTT control | |
""" | |
# pylint: disable=import-error | |
import math | |
import random | |
import time | |
import displayio | |
import adafruit_imageload | |
from adafruit_matrixportal.matrix import Matrix | |
import microcontroller | |
# import neopixel | |
from network import mqtt_client, status_light, root_topic | |
from interval import Interval | |
# TO LOAD DIFFERENT EYE DESIGNS: change the middle word here (between | |
# 'eyes.' and '.data') to one of the folder names inside the 'eyes' folder: | |
from eyes.werewolf.data import EYE_DATA | |
# from eyes.cyclops.data import EYE_DATA | |
# from eyes.kobold.data import EYE_DATA | |
# from eyes.adabot.data import EYE_DATA | |
# from eyes.skull.data import EYE_DATA | |
# UTILITY FUNCTIONS AND CLASSES -------------------------------------------- | |
# pylint: disable=too-few-public-methods | |
class Sprite(displayio.TileGrid):
    """TileGrid subclass that displays one whole color-paletted BMP image.

    The image is loaded from a file when the object is created, and one
    palette entry may be made transparent. The result is still a TileGrid,
    so it can be appended to a displayio.Group.
    """

    def __init__(self, filename, transparent=None):
        """Load a color-paletted BMP file and wrap it in a TileGrid.

        filename: Path of the BMP file to load.
        transparent: Palette entry to make transparent. A tuple or list is
            treated as an (R, G, B) color and the nearest palette entry is
            chosen; an integer is used directly as the palette index; any
            other value (including the default None) leaves the palette
            unchanged.
        """
        image, colors = adafruit_imageload.load(
            filename, bitmap=displayio.Bitmap, palette=displayio.Palette
        )
        assert colors

        if isinstance(transparent, (tuple, list)):
            # Linear search for the palette entry nearest the requested
            # color. Squared distances are compared: they order the same
            # way as true distances, so no square root is needed.
            smallest = 0x1000000  # Starting threshold for the search
            for index, packed in enumerate(colors):
                d_red = transparent[0] - ((packed >> 16) & 0xFF)
                d_green = transparent[1] - ((packed >> 8) & 0xFF)
                d_blue = transparent[2] - (packed & 0xFF)
                distance = d_red * d_red + d_green * d_green + d_blue * d_blue
                if distance < smallest:
                    smallest, nearest = distance, index
            colors.make_transparent(nearest)
        elif isinstance(transparent, int):
            colors.make_transparent(transparent)

        super().__init__(image, pixel_shader=colors)
# ONE-TIME INITIALIZATION --------------------------------------------------

MATRIX = Matrix(bit_depth=6)
DISPLAY = MATRIX.display

# The order in which sprites are appended sets the stacking order: later
# entries have visual priority. The opaque eye image goes in first, then the
# lower lid, then the upper lid (so the upper lid is 'on top' wherever the
# two overlap, e.g. if it has eyelashes or such), and finally the stencil.
SPRITES = displayio.Group()
SPRITES.append(Sprite(EYE_DATA["eye_image"]))  # Base image is opaque
for _image_key in ("lower_lid_image", "upper_lid_image", "stencil_image"):
    SPRITES.append(Sprite(EYE_DATA[_image_key], EYE_DATA["transparent"]))

# The background image is optional. When present it is added to PARENT
# before the eye sprites, so SPRITES is always the last child of PARENT.
PARENT = displayio.Group()
BG = Sprite(EYE_DATA["bg_image"]) if "bg_image" in EYE_DATA else None
if BG is not None:
    PARENT.append(BG)
PARENT.append(SPRITES)
DISPLAY.show(PARENT)
# Derived motion constants. Each one is an (x, y) tuple computed per axis
# from the limits given in EYE_DATA.
_EYE_MIN = EYE_DATA["eye_move_min"]
_EYE_MAX = EYE_DATA["eye_move_max"]

# Pixel coords of the eye image when centered ('neutral' position)
EYE_CENTER = tuple((_EYE_MIN[axis] + _EYE_MAX[axis]) / 2 for axis in (0, 1))

# Max eye image motion, as a delta from center
EYE_RANGE = tuple(abs(_EYE_MAX[axis] - _EYE_MIN[axis]) / 2 for axis in (0, 1))

# Motion bounds of the upper and lower eyelids: the smaller and larger of the
# 'open' and 'closed' positions on each axis.
_UPPER_OPEN = EYE_DATA["upper_lid_open"]
_UPPER_CLOSED = EYE_DATA["upper_lid_closed"]
UPPER_LID_MIN = tuple(min(_UPPER_OPEN[axis], _UPPER_CLOSED[axis]) for axis in (0, 1))
UPPER_LID_MAX = tuple(max(_UPPER_OPEN[axis], _UPPER_CLOSED[axis]) for axis in (0, 1))

_LOWER_OPEN = EYE_DATA["lower_lid_open"]
_LOWER_CLOSED = EYE_DATA["lower_lid_closed"]
LOWER_LID_MIN = tuple(min(_LOWER_OPEN[axis], _LOWER_CLOSED[axis]) for axis in (0, 1))
LOWER_LID_MAX = tuple(max(_LOWER_OPEN[axis], _LOWER_CLOSED[axis]) for axis in (0, 1))

EYE_PREV = EYE_NEXT = (0, 0)
class TimedStates:
    """
    A state machine class, where there are some number of states,
    a duration is set when entering a state, and the states just
    follow one after another in a circle.

    We can compute "how far into a state" we are with ratio().
    Subclasses must override handle_state_change().
    """

    def __init__(self, now, first_duration, first_state, num_states):
        """
        Construct the base TimedStates object.

        now: Current timestamp
        first_duration: How long we will stay in the initial state
        first_state: The initial state number
        num_states: The total number of states, must be >first_state
        """
        # The time when we entered the current state.
        self.time_of_last = now
        # The planned duration of this current state.
        self.duration = first_duration
        # The current state number.
        self.state = first_state
        # The total number of states.
        self.num_states = num_states

    def update(self, now):
        """
        Process time, checking for a state change, etc.

        When more than the current duration has passed since the state was
        entered, advance to the next state (wrapping around after the last
        one) and call handle_state_change(). At most one transition happens
        per call.

        Can extend, but be sure to call the super implementation.
        """
        if now - self.time_of_last > self.duration:
            self.time_of_last = now
            self.state = (self.state + 1) % self.num_states
            self.handle_state_change(now, self.state)

    def ratio(self, now):
        """
        Get the fraction of the current state that has elapsed.

        A state whose duration is zero (or negative) has no length to be
        part-way through, so it is reported as fully elapsed (1.0) instead
        of dividing by zero.
        """
        if self.duration <= 0:
            return 1.0
        return (now - self.time_of_last) / self.duration

    def reset(self, now, state, duration=None):
        """
        Reset time, state, and optionally duration.

        now: Timestamp to record as the moment the state was entered
        state: The state number to switch to
        duration: New duration; the current one is kept when None
        """
        self.time_of_last = now
        self.state = state
        if duration is not None:
            self.duration = duration

    def handle_state_change(self, now, state):
        """Respond to a state change - must override."""
        raise NotImplementedError("Must override handle_state_change")
class MoveStates(TimedStates):
    """State machine for the periodic movement of the creature eye(s).

    State 0 is a pause and state 1 is a move from eye_prev to eye_next.
    Positions are offsets with (0, 0) in the center, NOT pixel coords.
    """

    def __init__(self, now):
        """Start in the pause state at the center, for a random 0.1-3 seconds."""
        self.eye_next = (0, 0)
        self.eye_prev = (0, 0)
        super().__init__(now, random.uniform(0.1, 3), first_state=0, num_states=2)

    def update(self, now):
        """Update state and return eye position tuple"""
        super().update(now)

        # Fraction of the state elapsed, then ease in/out with 3*e^2-2*e^3
        elapsed = self.ratio(now)
        eased = 3 * elapsed * elapsed - 2 * elapsed * elapsed * elapsed

        start, target = self.eye_prev, self.eye_next
        return (
            start[0] + eased * (target[0] - start[0]),
            start[1] + eased * (target[1] - start[1]),
        )

    def handle_state_change(self, now, state):
        """Pick the duration (and, for a move, the target) of the new state."""
        if not state:
            # Starting a new pause: the eye stays where the move ended
            self.duration = random.uniform(0.04, 3)  # Hold time
            self.eye_prev = self.eye_next
            return

        # Starting a new move toward a point at a random angle
        self.duration = random.uniform(0.08, 0.17)  # Move time
        angle = random.uniform(0, math.pi * 2)
        self.eye_next = (
            math.cos(angle) * EYE_RANGE[0],
            math.sin(angle) * EYE_RANGE[1],
        )
class BlinkStates(TimedStates):
    """State machine for the periodic blinking of the creature eye(s).

    State 0 is 'not blinking', state 1 is closing and state 2 is opening.
    """

    def __init__(self, now):
        """Start in the opening state, lasting a random 0.25-0.5 seconds."""
        super().__init__(now, random.uniform(0.25, 0.5), first_state=2, num_states=3)

    def update(self, now):
        """Update state and return the blink ratio.

        The result is 0 when not blinking, the elapsed fraction of the state
        while closing, and one minus that fraction while opening.
        """
        super().update(now)
        if not self.state:  # Not blinking
            return 0

        # Fraction of closing or opening elapsed (0.0 to 1.0)
        elapsed = self.ratio(now)
        if self.state == 2:
            # Opening: flip the fraction so the eye opens instead of closes
            return 1.0 - elapsed
        return elapsed

    def handle_state_change(self, now, state):
        """Choose how long the newly entered state lasts."""
        if state == 1:
            # Starting a new blink (closing)
            self.duration = random.uniform(0.03, 0.07)
            return
        if state == 2:
            # Starting de-blink (opening): twice as long as the closing
            self.duration *= 2
            return
        # Blink ended: wait a random time before the next one
        self.duration = random.uniform(self.duration * 3, 4)
class HideStates(TimedStates):
    """Main state machine, for handling the hiding/revealing of the creature entirely.

    States: 0 = hidden, 1 = popping up, 2 = staying around (visible),
    3 = hiding again.
    """

    def __init__(self, now):
        """Start in the visible state, lasting a random 0.25-0.5 seconds."""
        super().__init__(now, random.uniform(0.25, 0.5), first_state=2, num_states=4)

    def update(self, now):
        """Update state and return how visible the creature is.

        Returns 0 while hidden and 1 while visible. While popping up it is
        the elapsed fraction of the state; while hiding it is one minus
        that fraction.
        """
        super().update(now)
        current = self.state
        if current == 1:  # popping up
            ratio = self.ratio(now)
        elif current == 3:  # hiding
            ratio = 1.0 - self.ratio(now)
        elif current == 2:  # visible
            ratio = 1
        else:  # hidden
            ratio = 0
        print(f"Hide state {self.state} ratio {ratio}")
        return ratio

    @property
    def hidden(self):
        """True while in the fully hidden state (state 0)."""
        return self.state == 0

    def reset(self, now, state, duration=None):
        """Publish the reset over MQTT, then reset time, state and duration."""
        mqtt_client.publish(
            f"{root_topic}/hide_state",
            f"Reset hide state {state}, duration {duration}",
        )
        return super().reset(now, state, duration)

    def handle_state_change(self, now, state):
        """Set up the newly entered state, then print and publish it."""
        if state == 1:
            # Popping up: status light blue
            status_light.fill((0, 0, 255))
            self.duration = random.uniform(0.1, 0.8)
        elif state == 2:
            # Staying around for ten times as long as the previous duration
            self.duration *= 10
            # let's randomize the stars a bit.
            assert BG
            BG.flip_x = random.choice((True, False))
            BG.flip_y = random.choice((True, False))
        elif state == 3:
            # Hiding again
            self.duration = random.uniform(0.1, 1.2)
        else:
            # Hidden: status light green
            status_light.fill((0, 255, 0))
            self.duration = random.uniform(3, 90)

        announcement = f"Hide state {state}, duration {self.duration}"
        print(announcement)
        mqtt_client.publish(f"{root_topic}/hide_state", announcement)
class FakeHideState:
    """Always-visible stand-in for HideStates.

    Used when there is no background we can hide the creature behind. It
    offers the same update/reset/handle_state_change methods and the
    `hidden` attribute, but never changes state.
    """

    def __init__(self) -> None:
        # Never hidden.
        self.hidden = False

    def update(self, now):
        """Return 1 every time, whatever the time."""
        return 1

    def reset(self, now, state, duration=None):
        """Accept and ignore a reset request."""

    def handle_state_change(self, now, state):
        """Accept and ignore a state change."""
# Build the three state machines from one shared starting timestamp.
now = time.monotonic()
move_state, blink_state = MoveStates(now), BlinkStates(now)
# With no background loaded, fall back to the always-visible stand-in.
hide_state = HideStates(now) if BG else FakeHideState()
# HELPERS, MQTT SETUP AND MAIN LOOP ----------------------------------------
def clamp(v, minval, maxval):
    """Return v limited to the range minval..maxval.

    The lower bound is applied first and the upper bound second, so the
    upper bound wins if the two bounds are given in the wrong order.
    """
    raised = minval if minval > v else v
    return maxval if maxval < raised else raised
# Register a retained "DISCONNECTED" last-will message on the status topic
# before connecting, then publish a retained "CONNECTED" on the same topic.
mqtt_client.will_set(f"{root_topic}/status", "DISCONNECTED", qos=0, retain=True)
mqtt_client.connect()
mqtt_client.publish(f"{root_topic}/status", "CONNECTED", retain=True)
def peekaboo(client, topic, msg):
    """MQTT callback: make a hidden creature pop up over half a second.

    client, topic, msg: Supplied by the MQTT client for every message on
        the subscribed topic; none of them is used here.

    Does nothing beyond printing when the creature is not currently hidden.
    """
    print("peekaboo")
    if not hide_state.hidden:
        return

    now = time.monotonic()
    # handle_state_change() chooses its own random duration for state 1, so
    # it has to run first (status light, print, MQTT announcement). reset()
    # then pins the reveal to the intended half second; calling them the
    # other way round lets the random value overwrite the 0.5.
    hide_state.handle_state_change(now, 1)
    hide_state.reset(now, 1, 0.5)
    # NOTE(review): the original indentation was lost; the cyan status light
    # is assumed to belong to the triggered-reveal branch only - confirm.
    status_light.fill((0, 255, 255))
# Subscribe to the peekaboo topic and route its messages to peekaboo().
TOPIC = f"{root_topic}/peekaboo"
mqtt_client.subscribe(TOPIC, qos=0)
mqtt_client.add_topic_callback(TOPIC, peekaboo)
# mqtt_interval = Interval(0.5) | |
while True:
    try:
        # if mqtt_interval.poll():
        now = time.monotonic()

        # Update hide state and move/hide the creature as appropriate.
        # The eye group is the last child of PARENT; it is shifted down by
        # 32 pixels times the hidden fraction.
        # NOTE(review): 32 is presumably the panel height - confirm.
        visible_ratio = hide_state.update(now)
        PARENT[-1].y = int(32 * (1.0 - visible_ratio))
        PARENT[-1].hidden = hide_state.hidden

        # Poll MQTT if we are hidden
        if hide_state.hidden:
            print("before poll")
            mqtt_client.loop(1)
            print("after poll")

        # Shortcut the loop if we aren't at least a little visible
        update_eyes = BG is None or not hide_state.hidden
        if not update_eyes:
            continue

        # Eye movement ---------------------------------------------------------

        eye_pos = move_state.update(now)

        # Blinking -------------------------------------------------------------

        blink_ratio = blink_state.update(now)

        # Eyelid tracking ------------------------------------------------------

        # Initial estimate of 'tracked' eyelid positions
        UPPER_LID_POS = (
            EYE_DATA["upper_lid_center"][0] + eye_pos[0],
            EYE_DATA["upper_lid_center"][1] + eye_pos[1],
        )
        LOWER_LID_POS = (
            EYE_DATA["lower_lid_center"][0] + eye_pos[0],
            EYE_DATA["lower_lid_center"][1] + eye_pos[1],
        )
        # Then constrain these to the upper/lower lid motion bounds
        UPPER_LID_POS = (
            clamp(UPPER_LID_POS[0], UPPER_LID_MIN[0], UPPER_LID_MAX[0]),
            clamp(UPPER_LID_POS[1], UPPER_LID_MIN[1], UPPER_LID_MAX[1]),
        )
        LOWER_LID_POS = (
            clamp(LOWER_LID_POS[0], LOWER_LID_MIN[0], LOWER_LID_MAX[0]),
            clamp(LOWER_LID_POS[1], LOWER_LID_MIN[1], LOWER_LID_MAX[1]),
        )
        # Then interpolate between bounded tracked position to closed position
        UPPER_LID_POS = (
            UPPER_LID_POS[0]
            + blink_ratio * (EYE_DATA["upper_lid_closed"][0] - UPPER_LID_POS[0]),
            UPPER_LID_POS[1]
            + blink_ratio * (EYE_DATA["upper_lid_closed"][1] - UPPER_LID_POS[1]),
        )
        LOWER_LID_POS = (
            LOWER_LID_POS[0]
            + blink_ratio * (EYE_DATA["lower_lid_closed"][0] - LOWER_LID_POS[0]),
            LOWER_LID_POS[1]
            + blink_ratio * (EYE_DATA["lower_lid_closed"][1] - LOWER_LID_POS[1]),
        )

        # Move eye sprites -----------------------------------------------------

        # Round to the nearest pixel with floor(v + 0.5). int(v + 0.5) would
        # truncate toward zero, which puts negative coordinates one pixel too
        # far toward the origin and keeps the sprites from ever reaching
        # their minimum limits.
        SPRITES[0].x, SPRITES[0].y = (
            math.floor(EYE_CENTER[0] + eye_pos[0] + 0.5),
            math.floor(EYE_CENTER[1] + eye_pos[1] + 0.5),
        )
        SPRITES[2].x, SPRITES[2].y = (
            math.floor(UPPER_LID_POS[0] + 0.5),
            math.floor(UPPER_LID_POS[1] + 0.5),
        )
        SPRITES[1].x, SPRITES[1].y = (
            math.floor(LOWER_LID_POS[0] + 0.5),
            math.floor(LOWER_LID_POS[1] + 0.5),
        )
    except Exception as err:  # pylint: disable=broad-except
        # Any runtime failure restarts the board, but say why first.
        # KeyboardInterrupt and SystemExit are deliberately not caught, so
        # the script can still be stopped from the serial console.
        print("Unhandled error in main loop, resetting:", err)
        microcontroller.reset()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Configuration data for the werewolf eyes """ | |
# Directory part of this file's path, including the trailing slash (empty
# when the path contains no slash at all).
EYE_PATH = __file__[: __file__.rfind("/") + 1]

# Image files, all located next to this file.
EYE_DATA = {
    key: EYE_PATH + bmp_name
    for key, bmp_name in (
        ("eye_image", "werewolf-eyes.bmp"),
        ("upper_lid_image", "werewolf-upper-lids.bmp"),
        ("lower_lid_image", "werewolf-lower-lids.bmp"),
        ("stencil_image", "werewolf-stencil.bmp"),
        ("bg_image", "stars.bmp"),  # ADDED full panel size bitmap for background
    )
}

# Colors and sprite positions.
EYE_DATA.update(
    {
        "transparent": (0, 255, 0),  # Transparent color in above images
        "eye_move_min": (-3, -5),  # eye_image (left, top) move limit
        "eye_move_max": (7, 6),  # eye_image (right, bottom) move limit
        "upper_lid_open": (7, -4),  # upper_lid_image pos when open
        "upper_lid_center": (7, -1),  # upper_lid_image pos when eye centered
        "upper_lid_closed": (7, 8),  # upper_lid_image pos when closed
        "lower_lid_open": (7, 22),  # lower_lid_image pos when open
        "lower_lid_center": (7, 21),  # lower_lid_image pos when eye centered
        "lower_lid_closed": (7, 17),  # lower_lid_image pos when closed
    }
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
class Interval:
    """Simple repeating timer that is polled rather than called back."""

    def __init__(self, interval: float):
        """Schedule the first expiry `interval` after the current time.

        interval: Time between expiries, in the units of time.monotonic().
        """
        self.interval = interval
        self.next = time.monotonic() + interval

    def poll(self):
        """Return True if the timer has expired, and re-arm it if so.

        On expiry the next deadline is measured from the moment of this
        call, not from the previous deadline.
        """
        current = time.monotonic()
        expired = current > self.next
        if expired:
            self.next = current + self.interval
        return expired
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import adafruit_esp32spi.adafruit_esp32spi_socket as socket | |
import adafruit_minimqtt.adafruit_minimqtt as MQTT | |
from adafruit_esp32spi import adafruit_esp32spi, adafruit_esp32spi_wifimanager | |
import board | |
import busio | |
import neopixel | |
from digitalio import DigitalInOut | |
# Get wifi details and more from a secrets.py file | |
try: | |
from secrets import secrets | |
except ImportError: | |
print("WiFi secrets are kept in secrets.py, please add them there!") | |
raise | |
# Prefix under which all of this device's MQTT topics live.
root_topic = secrets["root_topic"]

# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Report what we found: co-processor status, firmware, MAC address and the
# networks it can see.
if esp.status == adafruit_esp32spi.WL_IDLE_STATUS:
    print("ESP32 found and in idle mode")
print("Firmware vers.", esp.firmware_version)
print("MAC addr:", [hex(octet) for octet in esp.MAC_address])

for ap in esp.scan_networks():
    ssid = str(ap["ssid"], "utf-8")
    print("\t%s\t\tRSSI: %d" % (ssid, ap["rssi"]))

print("Connecting to AP...")
# The single on-board NeoPixel is handed to the WiFi manager as its status
# light and is also exported for use by other modules.
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")

# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker=secrets["broker"],
    # is_ssl=secrets["mqttssl"]
    port=secrets["brokerPort"],
    # log=True
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment