Last active
July 3, 2022 10:40
-
-
Save sandyjmacdonald/d0cce358c18b257cda4de6b547681e38 to your computer and use it in GitHub Desktop.
An example of a simple API to set the colour of the Pimoroni Inventor 2040's LEDs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gc | |
from machine import Pin, PWM | |
from pimoroni_i2c import PimoroniI2C | |
from plasma import WS2812 | |
from motor import Motor | |
from servo import Servo | |
from encoder import Encoder, MMME_CPR | |
# IO Pin Constants
GP0 = 0
GP1 = 1
GP2 = 2
A0 = 26
A1 = 27
A2 = 28
GPIOS = (GP0, GP1, GP2, A0, A1, A2)
ADCS = (A0, A1, A2)

# Index Constants
# NOTE(review): presumably indices into the `motors` / `servos` lists and the
# LED strip created by Inventor2040W -- confirm against callers.
MOTOR_A = 0
MOTOR_B = 1

SERVO_1 = 0
SERVO_2 = 1
SERVO_3 = 2
SERVO_4 = 3
SERVO_5 = 4
SERVO_6 = 5

LED_GP0 = 0
LED_GP1 = 1
LED_GP2 = 2
LED_A0 = 3
LED_A1 = 4
LED_A2 = 5
LED_SERVO_1 = 6
LED_SERVO_2 = 7
LED_SERVO_3 = 8
LED_SERVO_4 = 9
LED_SERVO_5 = 10
LED_SERVO_6 = 11

# Count Constants
# The pin counts are derived from the tuples above so they cannot drift apart.
NUM_GPIOS = len(GPIOS)
NUM_ADCS = len(ADCS)
NUM_MOTORS = 2
NUM_SERVOS = 6
NUM_LEDS = 12
class Inventor2040W():
    """Convenience wrapper around the Inventor 2040 W's on-board peripherals.

    Constructing an instance sets up the I2C bus, the audio amp enable pin and
    audio PWM, the user switch and the WS2812 LEDs, plus (optionally) the
    motors/encoders and the servos.
    """

    # Pin assignments
    AMP_EN_PIN = 3
    I2C_SDA_PIN = 4
    I2C_SCL_PIN = 5
    MOTOR_A_PINS = (6, 7)
    MOTOR_B_PINS = (8, 9)
    ENCODER_A_PINS = (19, 18)
    ENCODER_B_PINS = (17, 16)
    SERVO_1_PIN = 10
    SERVO_2_PIN = 11
    SERVO_3_PIN = 12
    SERVO_4_PIN = 13
    SERVO_5_PIN = 14
    SERVO_6_PIN = 15
    LED_DATA_PIN = 20
    PWM_AUDIO_PIN = 21
    USER_SW_PIN = 22

    # Exponent applied to the volume to correct for the RC filter curve
    AMP_CORRECTION = 4
    DEFAULT_VOLUME = 0.2

    def __init__(self, motor_gear_ratio=50, init_motors=True, init_servos=True):
        """Set up the board hardware.

        motor_gear_ratio -- multiplied by MMME_CPR to get the encoders'
                            counts per revolution.
        init_motors      -- when true, create the two motors and their encoders;
                            otherwise `motors` and `encoders` are both None.
        init_servos      -- when true, create the six servos; otherwise
                            `servos` is None.
        """
        # Free up hardware resources
        gc.collect()

        # Set up the motors and encoders, if the user wants them
        self.motors = None
        # Always define `encoders` so the attribute exists even without motors
        self.encoders = None
        if init_motors:
            cpr = MMME_CPR * motor_gear_ratio
            self.motors = [Motor(self.MOTOR_A_PINS), Motor(self.MOTOR_B_PINS)]
            # Set the encoders to use PIO 0 and State Machines 0 and 1
            self.encoders = [
                Encoder(0, 0, self.ENCODER_A_PINS, counts_per_rev=cpr, count_microsteps=True),
                Encoder(0, 1, self.ENCODER_B_PINS, counts_per_rev=cpr, count_microsteps=True),
            ]

        # Set up the servos, if the user wants them
        self.servos = None
        if init_servos:
            self.servos = [Servo(i) for i in range(self.SERVO_1_PIN, self.SERVO_6_PIN + 1)]

        # Set up the i2c for Qw/st and Breakout Garden
        self.i2c = PimoroniI2C(self.I2C_SDA_PIN, self.I2C_SCL_PIN, 100000)

        # Set up the amp enable; start with the amp switched off
        self.__amp_en = Pin(self.AMP_EN_PIN, Pin.OUT)
        self.__amp_en.off()

        self.audio_pwm = PWM(Pin(self.PWM_AUDIO_PIN))
        self.__volume = self.DEFAULT_VOLUME

        # Set up the user switch
        self.__switch = Pin(self.USER_SW_PIN, Pin.IN, Pin.PULL_DOWN)

        # Set up the WS2812 LEDs, using PIO 0 and State Machine 2
        self.leds = WS2812(NUM_LEDS, 0, 2, self.LED_DATA_PIN)
        self.leds.start()

    def switch_pressed(self):
        """Return the current value read from the user switch pin."""
        return self.__switch.value()

    def _drive_audio(self):
        """Apply the volume-corrected duty cycle to the audio PWM and enable the amp."""
        corrected_volume = self.__volume ** self.AMP_CORRECTION  # Correct for RC Filter curve
        self.audio_pwm.duty_u16(int(32768 * corrected_volume))
        self.unmute_audio()

    def play_tone(self, frequency):
        """Output a tone at `frequency` on the audio PWM at the current volume.

        Raises ValueError if the PWM rejects the frequency; in that case the
        output is switched to play_silence() before the error is raised.
        """
        try:
            self.audio_pwm.freq(frequency)
        except ValueError:
            self.play_silence()
            raise ValueError("frequency out of range. Expected greater than 0")
        self._drive_audio()

    def play_silence(self):
        """Run the audio PWM at 44100 with the current volume duty, amp enabled."""
        self.audio_pwm.freq(44100)
        self._drive_audio()

    def stop_playing(self):
        """Set the audio PWM duty to zero and switch the amp off."""
        self.audio_pwm.duty_u16(0)
        self.mute_audio()

    def volume(self, volume=None):
        """Get or set the playback volume.

        With no argument, return the current volume. Otherwise store `volume`,
        which must lie between 0.01 and 1.0 inclusive, and return None.

        Raises ValueError if `volume` is outside that range.
        """
        if volume is None:
            return self.__volume

        if volume < 0.01 or volume > 1.0:
            raise ValueError("volume out of range. Expected 0.01 to 1.0")

        self.__volume = volume

    def mute_audio(self):
        """Switch the amp enable pin off."""
        self.__amp_en.off()

    def unmute_audio(self):
        """Switch the amp enable pin on."""
        self.__amp_en.on()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
# Numeric severity levels; a higher number means a more severe message.
CRITICAL = 50
ERROR = 40
WARNING = 30
INFO = 20
DEBUG = 10
NOTSET = 0

# Short labels used when formatting a record's level.
_level_dict = {
    CRITICAL: "CRIT",
    ERROR: "ERROR",
    WARNING: "WARN",
    INFO: "INFO",
    DEBUG: "DEBUG",
}

# Stream written to when a logger has no handlers; replaceable via basicConfig().
_stream = sys.stderr
class LogRecord:
    """Attribute bag handed to handlers; its fields live in ``__dict__``."""

    def __init__(self):
        self.__dict__ = {}

    def __getattr__(self, key):
        """Look `key` up in ``__dict__``.

        Raises AttributeError (not KeyError) for an unknown field so that
        hasattr() and getattr(obj, name, default) behave as expected.
        """
        try:
            return self.__dict__[key]
        except KeyError:
            raise AttributeError(key)
class Handler:
    """Base class for log handlers; subclasses override emit()."""

    # Class-level default so subclasses that skip __init__ still have the attribute
    formatter = None

    def __init__(self):
        pass

    def setFormatter(self, fmtr):
        """Remember `fmtr` on the handler as ``self.formatter``."""
        self.formatter = fmtr

    def emit(self, record):
        """Handle one record; Logger.log() calls this for every attached handler.

        Raises NotImplementedError unless overridden by a subclass.
        """
        raise NotImplementedError("emit must be implemented by Handler subclasses")
class Logger:
    """Minimal named logger.

    Messages at or above the effective level are passed to every attached
    handler, or printed to the module's `_stream` when no handler is attached.
    """

    level = NOTSET

    def __init__(self, name):
        self.name = name
        # Per-instance state: a class-level list/record would be shared by
        # every logger, so adding a handler to one would add it to all.
        self.handlers = []
        self.record = LogRecord()

    def _level_str(self, level):
        """Return the short label for `level`, or "LVL<n>" for unknown levels."""
        label = _level_dict.get(level)
        if label is not None:
            return label
        return "LVL%s" % level

    def setLevel(self, level):
        """Set this logger's own threshold."""
        self.level = level

    def isEnabledFor(self, level):
        """Return True if `level` reaches this logger's threshold.

        A logger whose own level is NOTSET (0) falls back to the module `_level`.
        """
        return level >= (self.level or _level)

    def log(self, level, msg, *args):
        """Emit `msg` (%-formatted with `args` when given) at `level`."""
        if not self.isEnabledFor(level):
            return

        levelname = self._level_str(level)
        if args:
            msg = msg % args

        if self.handlers:
            # Reuse the logger's single record object for every message
            d = self.record.__dict__
            d["levelname"] = levelname
            d["levelno"] = level
            d["message"] = msg
            d["name"] = self.name
            for h in self.handlers:
                h.emit(self.record)
        else:
            print(levelname, ":", self.name, ":", msg, sep="", file=_stream)

    def debug(self, msg, *args):
        """Log `msg` at DEBUG level."""
        self.log(DEBUG, msg, *args)

    def info(self, msg, *args):
        """Log `msg` at INFO level."""
        self.log(INFO, msg, *args)

    def warning(self, msg, *args):
        """Log `msg` at WARNING level."""
        self.log(WARNING, msg, *args)

    def error(self, msg, *args):
        """Log `msg` at ERROR level."""
        self.log(ERROR, msg, *args)

    def critical(self, msg, *args):
        """Log `msg` at CRITICAL level."""
        self.log(CRITICAL, msg, *args)

    def exc(self, e, msg, *args):
        """Log `msg` at ERROR level, then print exception `e` to `_stream`."""
        self.log(ERROR, msg, *args)
        # sys.print_exception is provided by MicroPython's sys module
        sys.print_exception(e, _stream)

    def exception(self, msg, *args):
        """Like exc(), using the exception currently being handled."""
        self.exc(sys.exc_info()[1], msg, *args)

    def addHandler(self, hndlr):
        """Attach `hndlr`; its emit() is called for each enabled message."""
        self.handlers.append(hndlr)
# Threshold used by Logger.isEnabledFor() when a logger's own level is NOTSET.
_level = INFO
# Loggers created so far, keyed by name (filled by getLogger()).
_loggers = {}
def getLogger(name="root"):
    """Return the logger called `name`, creating and caching it on first use.

    Passing None is treated the same as the default name "root".
    """
    if name is None:
        name = "root"
    if name in _loggers:
        return _loggers[name]
    logger = Logger(name)
    _loggers[name] = logger
    return logger
def info(msg, *args):
    """Log `msg` at INFO level on the default ("root") logger."""
    getLogger().info(msg, *args)
def debug(msg, *args):
    """Log `msg` at DEBUG level on the default ("root") logger."""
    getLogger().debug(msg, *args)
def basicConfig(level=INFO, filename=None, stream=None, format=None):
    """Set the module-wide default level and, optionally, the output stream.

    `filename` and `format` are accepted for signature compatibility only;
    passing either one just prints a notice that it is not supported.
    """
    global _level, _stream
    _level = level
    # Compare against None so a stream object that happens to be falsy is still used
    if stream is not None:
        _stream = stream
    if filename is not None:
        print("logging.basicConfig: filename arg is not supported")
    if format is not None:
        print("logging.basicConfig: format arg is not supported")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from inventor import Inventor2040W, NUM_LEDS | |
import WIFI_CONFIG | |
from network_manager import NetworkManager | |
from urllib import urequest | |
import uasyncio | |
import tinyweb | |
# Requires the tinyweb library: https://github.com/belyalov/tinyweb
# Remember to set your SSID and PSK in the WIFI_CONFIG file!
# Board wrapper; constructing it sets up the on-board hardware, including the LEDs.
board = Inventor2040W()

WIFI_COUNTRY = "GB"
network_manager = NetworkManager(WIFI_COUNTRY)

# Current LED settings, read and updated by the LEDs resource below.
hue = 0.0
sat = 0.0
val = 0.0
state = 0
class LEDs():
    """REST resource exposing the LED colour (hue/sat/val) and on/off state."""

    def get(self, data):
        """Return the current LED settings with HTTP status 200."""
        global hue, sat, val, state
        return {"message": {"hue": hue, "sat": sat, "val": val, "state": state}}, 200

    def post(self, data):
        """Update the LEDs from the `h`, `s`, `v` and `state` fields of `data`.

        Returns 201 on success, the not_found() response when any of the four
        fields is missing, and 400 when a field cannot be converted to a number.
        """
        global hue, sat, val, state

        # All four fields must be present; extra fields are ignored.
        required = ("h", "s", "v", "state")
        if not all(key in data for key in required):
            return not_found()

        # Convert everything first so a bad value leaves the globals untouched.
        try:
            new_hue = float(data["h"])
            new_sat = float(data["s"])
            new_val = float(data["v"])
            new_state = int(data["state"])
        except (ValueError, TypeError):
            return {'message': 'invalid values provided'}, 400

        hue, sat, val, state = new_hue, new_sat, new_val, new_state
        set_leds(hue, sat, val, state)
        return {'message': 'LEDs updated'}, 201
def not_found():
    """Return the (body, status) pair used when a request lacks the LED values."""
    return {'message': 'no values provided'}, 404
def set_leds(hue, sat, val, state):
    """Set every LED to the given HSV colour when `state` is truthy, else clear them."""
    if state:
        for i in range(NUM_LEDS):
            board.leds.set_hsv(i, hue, sat, val)
    else:
        board.leds.clear()
def update():
    """Run the event loop until network_manager.client() finishes joining the configured Wi-Fi network."""
    uasyncio.get_event_loop().run_until_complete(network_manager.client(WIFI_CONFIG.SSID, WIFI_CONFIG.PSK))
def run():
    """Create the tinyweb server, register the LEDs resource at /leds and serve on port 8081."""
    app = tinyweb.webserver()
    app.add_resource(LEDs, '/leds')
    app.run(host='0.0.0.0', port=8081)
# Start from a known LED state, then apply the initial settings.
board.leds.clear()
set_leds(hue, sat, val, state)

# Join the Wi-Fi network, then start serving requests.
update()
run()

# Keep the script alive if run() ever returns; sleep rather than spin the CPU.
while True:
    time.sleep(1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rp2 | |
import network | |
import machine | |
import uasyncio | |
class NetworkManager:
    """Bring the Wi-Fi radio up as either a client (station) or an access point."""

    # Display names passed to the status handler, indexed by the interface mode
    _ifname = ("Client", "Access Point")

    def __init__(self, country="GB", client_timeout=30, access_point_timeout=5, status_handler=None):
        """Create the manager.

        country              -- passed to rp2.country().
        client_timeout       -- timeout given to uasyncio.wait_for() in client().
        access_point_timeout -- timeout given to uasyncio.wait_for() in access_point().
        status_handler       -- optional callable(ifname, status, ip) invoked on
                                every status change and while waiting to connect.
        """
        rp2.country(country)
        self._ap_if = network.WLAN(network.AP_IF)
        self._sta_if = network.WLAN(network.STA_IF)

        self._mode = network.STA_IF
        self._client_timeout = client_timeout
        self._access_point_timeout = access_point_timeout
        self._status_handler = status_handler
        self.UID = ("{:02X}" * 8).format(*machine.unique_id())

    def isconnected(self):
        """Return True if either interface reports a connection."""
        return self._sta_if.isconnected() or self._ap_if.isconnected()

    def ifaddress(self):
        """Return the IP address of the connected interface, or '0.0.0.0' if none."""
        if self._sta_if.isconnected():
            return self._sta_if.ifconfig()[0]
        if self._ap_if.isconnected():
            return self._ap_if.ifconfig()[0]
        return '0.0.0.0'

    def disconnect(self):
        """Disconnect whichever interfaces are currently connected."""
        if self._sta_if.isconnected():
            self._sta_if.disconnect()
        if self._ap_if.isconnected():
            self._ap_if.disconnect()

    async def wait(self, mode):
        """Poll once a second until connected, reporting status None each time."""
        while not self.isconnected():
            self._handle_status(mode, None)
            await uasyncio.sleep_ms(1000)

    def _handle_status(self, mode, status):
        """Forward (interface name, status, current IP) to the status handler, if any."""
        if self._status_handler is not None:
            self._status_handler(self._ifname[mode], status, self.ifaddress())

    async def client(self, ssid, psk):
        """Connect to the network `ssid` using `psk` as a Wi-Fi client.

        Returns immediately if the station interface is already connected.
        Raises RuntimeError if no connection is made within the client timeout.
        """
        if self._sta_if.isconnected():
            return

        # Shut the access point down before bringing the station interface up
        self._ap_if.disconnect()
        self._ap_if.active(False)
        self._ap_if.deinit()

        self._sta_if = network.WLAN(network.STA_IF)
        self._sta_if.active(True)
        self._sta_if.connect(ssid, psk)

        try:
            await uasyncio.wait_for(self.wait(network.STA_IF), self._client_timeout)
            self._handle_status(network.STA_IF, True)
        except uasyncio.TimeoutError:
            self._sta_if.active(False)
            self._handle_status(network.STA_IF, False)
            raise RuntimeError("WIFI Client Failed")

    async def access_point(self):
        """Bring the radio up as an access point.

        Returns immediately if the access-point interface is already connected.
        Raises RuntimeError if it is not up within the access-point timeout.
        """
        if self._ap_if.isconnected():
            return

        # Shut the station interface down before bringing the access point up
        self._sta_if.disconnect()
        self._sta_if.active(False)
        self._sta_if.deinit()

        self._ap_if = network.WLAN(network.AP_IF)
        self._ap_if.active(True)

        try:
            await uasyncio.wait_for(self.wait(network.AP_IF), self._access_point_timeout)
            self._handle_status(network.AP_IF, True)
        except uasyncio.TimeoutError:
            # Deactivate the interface that failed: the access point, not the
            # station interface that was already shut down above.
            self._ap_if.active(False)
            self._handle_status(network.AP_IF, False)
            raise RuntimeError("WIFI AP Failed")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Wi-Fi credentials read by the main script; fill these in before running.
SSID = ""
PSK = ""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment