|
#!/usr/bin/env python |
|
from sys import exit |
|
import json |
|
import time |
|
import re |
|
try: |
|
import paho.mqtt.client as mqtt |
|
except ImportError: |
|
exit('This example requires the paho-mqtt module\nInstall with: sudo pip install paho-mqtt') |
|
|
|
import blinkt |
|
|
|
MQTT_SERVER = "homeassistant.local" |
|
MQTT_PORT = 1883 |
|
MQTT_BASE = "homeassistant/light/" |
|
MQTT_TOPIC = MQTT_BASE + "+/set" |
|
|
|
# Set these to use authorisation |
|
MQTT_USER = "blink" |
|
MQTT_PASS = "" |
|
|
|
pixel_count = 8 |
|
pixels = {k: {"r": 255, "g": 255, "b": 255, "brightness": 255, 'state': 'OFF'} |
|
for k in range(pixel_count)} |
|
|
|
|
|
def getId(pixel): |
|
# Return the MAC address of the specified interface |
|
try: |
|
macstr = open('/sys/class/net/%s/address' % 'wlan0').read() |
|
except: |
|
macstr = "00:00:00:00:00:00" |
|
return macstr[0:17].replace(':', '') + "-pixel-" + str(pixel) |
|
|
|
|
|
def send_discovery(client): |
|
for pixel in range(pixel_count): |
|
name = "Blinkt Pixel " + str(pixel) |
|
identifier = getId(pixel) |
|
path = MQTT_BASE + identifier |
|
config_msg = { |
|
"~": path, |
|
"name": name, |
|
"device": { |
|
"identifiers": [identifier], |
|
"manufacturer": "pi-pimroni", |
|
"model": "blinkt", |
|
"name": name, |
|
"sw_version": "0.1" |
|
}, |
|
"schema": "json", |
|
"cmd_t": "~/set", |
|
"stat_t": "~/state", |
|
"brightness": True, |
|
"unique_id": identifier, |
|
"rgb": True, |
|
} |
|
client.publish(path + "/config", json.dumps(config_msg)) |
|
|
|
|
|
def set_pixels(client, pixels_data): |
|
for pixel, data in pixels_data.items(): |
|
brightness = 0.0 |
|
if (data['state'] == 'ON'): |
|
brightness = data['brightness'] / 255.0 |
|
|
|
blinkt.set_pixel(pixel, data['r'], data['g'], data['b'], brightness) |
|
client.publish(MQTT_BASE + getId(pixel) + "/state", json.dumps(data)) |
|
blinkt.show() |
|
|
|
|
|
def on_connect(client, userdata, flags, rc): |
|
print('Connected with result code ' + str(rc)) |
|
send_discovery(client) |
|
print("sent discovery") |
|
set_pixels(client, {k: {"r": 72, "g": 255, "b": 209, |
|
"brightness": 255, 'state': 'ON'} for k in range(pixel_count)}) |
|
time.sleep(0.5) |
|
print("Flashed pixels") |
|
set_pixels(client, pixels) |
|
print("Reset pixels") |
|
client.subscribe(MQTT_TOPIC) |
|
print("Subscribed") |
|
|
|
|
|
def on_message(client, userdata, msg): |
|
|
|
json_string = msg.payload |
|
if type(json_string) is bytes: |
|
json_string = json_string.decode('utf-8') |
|
data = json.loads(json_string) |
|
print(data) |
|
try: |
|
pixel = int(re.findall( |
|
r"-pixel-(\d)", msg.topic)[0]) |
|
if pixel > pixel_count - 1: |
|
print('Pixel out of range: ' + str(pixel)) |
|
return |
|
except: |
|
print('Could not determine pixel from topic ' + msg.topic) |
|
return |
|
|
|
pixels[pixel]['state'] = data['state'] |
|
|
|
try: |
|
pixels[pixel]['r'] = data['color']['r'] |
|
pixels[pixel]['g'] = data['color']['g'] |
|
pixels[pixel]['b'] = data['color']['b'] |
|
except: |
|
print("No RGB data leaving at last values") |
|
|
|
try: |
|
pixels[pixel]['brightness'] = data['brightness'] |
|
except: |
|
print("No brightness data leaving at last values") |
|
|
|
set_pixels(client, {pixel: pixels[pixel]}) |
|
|
|
|
|
blinkt.set_clear_on_exit() |
|
|
|
client = mqtt.Client() |
|
client.on_connect = on_connect |
|
client.on_message = on_message |
|
|
|
if MQTT_USER is not None and MQTT_PASS is not None: |
|
print('Using username: {un} and password: {pw}'.format( |
|
un=MQTT_USER, pw='*' * len(MQTT_PASS))) |
|
client.username_pw_set(username=MQTT_USER, password=MQTT_PASS) |
|
|
|
client.connect(MQTT_SERVER, MQTT_PORT, 60) |
|
|
|
client.loop_forever() |