Created
January 26, 2024 11:29
-
-
Save eddo888/87f1b18fb02a280ddd8eb6481cb18891 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# micropython | |
# Complete project details at https://RandomNerdTutorials.com | |
import sys, math, ubinascii, ssd1306, max7219, machine, ntptime, time | |
from machine import Pin, SoftI2C, SPI | |
from umqttsimple import MQTTClient | |
from rotary_irq_esp import RotaryIRQ | |
from stepper import Stepper | |
stepper_motor = Stepper(14,15,22,23) | |
spi = SPI(1, baudrate=10000000, polarity=1, phase=0, sck=Pin(4), mosi=Pin(2)) | |
ss = Pin(5, Pin.OUT) | |
led_display = max7219.Matrix8x8(spi, ss, 1) | |
led_display.fill(0) | |
led_display.show() | |
def displays(texts): | |
if isinstance(texts,str): | |
texts = [ texts ] | |
if not isinstance(texts, list): | |
return | |
spacing = int(32/len(texts)) | |
oled_display.fill(0) | |
for i, t in enumerate(texts): | |
oled_display.text(t, 0, spacing*i, 1) | |
oled_display.show() | |
def led8x8(lines): | |
for line in lines: | |
if not len(line.strip()): | |
continue | |
try: | |
(command,params) = tuple(line.split(':')) | |
if len(params): | |
args = tuple(list(map(lambda x: eval(x), params.split(',')))) | |
else: | |
args = () | |
method = getattr(led_display, command) | |
method(*args) | |
except Exception as e: | |
sys.print_exception(e) | |
def sub_cb(topic, msg): | |
if msg == b'restart': | |
restart_and_reconnect() | |
return | |
if msg.startswith('rotate:'): | |
s = msg.decode('UTF8') | |
number = int(s.split(':')[1]) | |
print('rotate->%d'%number) | |
stepper_motor.run(number) | |
return | |
if msg.startswith('relay:'): | |
s = msg.decode('UTF8') | |
number = int(s.split(':')[1]) | |
print('relay->%d'%number) | |
relay_output.value(number) | |
return | |
if msg.startswith('8x8:'): | |
s = msg.decode('UTF8') | |
lines = s.split('\n')[1:] | |
print('8x8->\n%s'%('\n'.join(lines))) | |
led8x8(lines) | |
return | |
displays([topic, msg]) | |
print(f'{topic}:{msg}') | |
mqtt_server = '%HOSTNAME%' | |
mqtt_port = 1883 | |
mqtt_username = '%USERNAME%' | |
mqtt_password = '%PASSWORD%' | |
i2c=SoftI2C(sda=Pin(19), scl=Pin(21)) | |
oled_display=ssd1306.SSD1306_I2C(128,32,i2c) | |
displays(['ntptime', str(ntptime.host)]) | |
ntptime.settime() | |
client_id = ubinascii.hexlify(machine.unique_id()) | |
topic_sub = b'requests' | |
topic_pub = b'timestamp' | |
last_message = 0 | |
message_interval = 0.9 | |
counter = 0 | |
def connect_and_subscribe(): | |
global client_id, mqtt_server, topic_sub | |
client = MQTTClient( | |
client_id, | |
mqtt_server, | |
port=mqtt_port, | |
user=mqtt_username, | |
password=mqtt_password, | |
keepalive=30, | |
) | |
client.set_callback(sub_cb) | |
client.connect() | |
client.subscribe(topic_sub) | |
displays(['mqtt server', mqtt_server]) | |
return client | |
def restart_and_reconnect(): | |
displays('connecting') | |
time.sleep(5) | |
machine.reset() | |
try: | |
client = connect_and_subscribe() | |
except OSError as e: | |
restart_and_reconnect() | |
# magnetic sensor | |
near_field_sensor = Pin(34, Pin.IN) | |
near_field_sensor_old = near_field_sensor.value() | |
m3 = '' | |
# encoder button | |
encoder_button = Pin(26, Pin.IN) | |
encoder_button_old = encoder_button.value() | |
m0 = '' | |
# led output | |
led_output = Pin(27, Pin.OUT) | |
led_output_old = 0 | |
led_output.value(led_output_old) | |
# relay output | |
relay_output = Pin(25, Pin.OUT) | |
# toggle switch | |
switch = Pin(35, Pin.IN) | |
switch_old = switch.value() | |
# encoder inner | |
encoder_inner = None | |
m1 = '' | |
encoder_inner = RotaryIRQ( | |
pin_num_clk=12, | |
pin_num_dt=13, | |
min_val=108, | |
max_val=117, | |
reverse=False, | |
range_mode=RotaryIRQ.RANGE_BOUNDED | |
) | |
encoder_inner_old = encoder_inner.value() | |
#encoder outer | |
encoder_outer = None | |
m2 = '' | |
encoder_outer = RotaryIRQ( | |
pin_num_clk=32, | |
pin_num_dt=33, | |
min_val=0, | |
max_val=9, | |
reverse=False, | |
range_mode=RotaryIRQ.RANGE_BOUNDED | |
) | |
encoder_outer_old = encoder_outer.value() | |
def nav(): | |
msg = 'NAV1s:%03d%02d'%(encoder_inner_old, encoder_outer_old*10) | |
print(msg) | |
client.publish(b'esp32', msg) | |
while True: | |
time.sleep_ms(50) | |
if switch: | |
value = switch.value() | |
if value != switch_old: | |
switch_old = value | |
m3 = 'switch: %d'%switch_old | |
print(m3) | |
displays([m0,m1,m2,m3]) | |
client.publish('switch',str(switch_old)) | |
if near_field_sensor: | |
value = near_field_sensor.value() | |
if value != near_field_sensor_old: | |
near_field_sensor_old = value | |
m3 = f'sensor: %d'%value | |
print(m3) | |
displays([m0,m1,m2,m3]) | |
client.publish('sensor',str(near_field_sensor_old)) | |
if encoder_button and led_output: | |
value = encoder_button.value() | |
#led_output.value(value) | |
if value != encoder_button_old: | |
encoder_button_old = value | |
print('button: %d'%value) | |
if value == 0: | |
led_output_old = 1^led_output_old | |
m0 = 'led: %d'%led_output_old | |
displays([m0,m1,m2,m3]) | |
led_output.value(led_output_old) | |
client.publish('cmnd/tasmota_C9D18B/Power',str(led_output_old)) | |
if encoder_inner: | |
value = encoder_inner.value() | |
m1 = 'inner: %d'%value | |
if value != encoder_inner_old: | |
encoder_inner_old = value | |
print(m1) | |
displays([m0,m1,m2,m3]) | |
nav() | |
if encoder_outer: | |
value = encoder_outer.value() | |
m2 = 'outer: %d'%value | |
if value != encoder_outer_old: | |
encoder_outer_old = value | |
print(m2) | |
displays([m0,m1,m2,m3]) | |
nav() | |
if (time.time() - last_message) > message_interval: | |
gmt_plus = 10+1 | |
now = time.localtime(time.time()+3600*gmt_plus) | |
date_str = '%4d-%02d-%02d'%now[0:3] | |
time_str = '%02d:%02d:%02d'%now[3:6] | |
msg = '%s %s'%(date_str,time_str) | |
print(msg) | |
last_message = time.time() | |
counter += 1 | |
displays(['timestamp', date_str, time_str]) | |
try: | |
client.check_msg() | |
client.publish(topic_pub, msg) | |
except OSError as e: | |
restart_and_reconnect() | |
if switch_old == 1: | |
continue | |
vector = int(time_str[-1]) | |
radians = 2*math.pi*vector/10 | |
xo = int(4 + 4 * math.cos(radians)) | |
yo = int(4 + 4 * math.sin(radians)) | |
xi = int(4 + 1 * math.cos(radians)) | |
yi = int(4 + 1 * math.sin(radians)) | |
#print(f'{vector}: {xi},{yi} -> {xo}.{yo}') | |
led_display.fill(0) | |
led_display.line(xi,yi,xo,yo,1) | |
led_display.show() | |
stepper_motor.run(int(4200/10)) | |
relay_output.value(near_field_sensor.value()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment