Last active
October 21, 2018 08:30
-
-
Save tjoen/9851436cedb68f6d6a08be14067c85f4 to your computer and use it in GitHub Desktop.
button.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import websocket | |
import time | |
import itertools | |
import threading | |
import time | |
import RPi.GPIO as GPIO | |
import json | |
import subprocess | |
def on_message(ws, message):
    """Websocket message callback: mirror recognizer-loop events on the LED.

    The incoming text is decoded as JSON, echoed to stdout, and its
    ``type`` field selects the LED state:

    * ``recognizer_loop:record_begin``       -> ``LED.ON``
    * ``recognizer_loop:audio_output_start`` -> ``LED.BEACON``
    * ``recognizer_loop:record_end``         -> ``LED.OFF``
    * ``recognizer_loop:audio_output_end``   -> ``LED.OFF``

    Any other type leaves the LED untouched. A message without a ``type``
    key raises ``KeyError``.
    """
    event = json.loads(message)
    print(event)
    kind = event['type']

    # (event type, name of the LED state constant to apply). The constant
    # is resolved on LED only when an event actually matches.
    reactions = (
        ('recognizer_loop:record_begin', 'ON'),
        ('recognizer_loop:audio_output_start', 'BEACON'),
        ('recognizer_loop:record_end', 'OFF'),
        ('recognizer_loop:audio_output_end', 'OFF'),
    )
    for trigger, state_name in reactions:
        if kind == trigger:
            my_led.set_state(getattr(LED, state_name))
def on_error(ws, error):
    """Websocket error callback: echo the reported error to stdout.

    ``ws`` is accepted to satisfy the callback signature and is not used.
    """
    print(error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket close callback: announce on stdout that the socket closed.

    ``close_status_code`` and ``close_msg`` are optional so the callback
    works both with websocket-client releases that invoke it with the
    socket only and with releases that also pass the close status and
    reason. None of the arguments are used.
    """
    print("### closed ###")
class LED:
    """Show steady or animated brightness patterns on one PWM-driven LED.

    A background thread steps through the duty cycles of the selected
    pattern; callers only pick the pattern with set_state().

    Simple usage:
        my_led = LED(channel = 25)
        my_led.start()
        my_led.set_state(LED.BEACON)
        my_led.stop()
    """

    # Pattern identifiers accepted by set_state().
    OFF = 0
    ON = 1
    BLINK = 2
    BLINK_3 = 3
    BEACON = 4
    BEACON_DARK = 5
    DECAY = 6
    PULSE_SLOW = 7
    PULSE_QUICK = 8

    def __init__(self, channel):
        """Set up `channel` (BCM numbering) as a PWM output at frequency 100.

        The animator thread is created here but only runs after start().
        """
        self.animator = threading.Thread(target=self._animate)
        self.channel = channel
        self.iterator = None  # endless duty-cycle sequence of the pattern
        self.running = False
        self.state = None  # pending request, consumed by the animator
        self.sleep = 0  # seconds between two animation steps
        # Synchronisation objects come first and `pwm` starts as None so
        # that __del__ stays safe if a GPIO call below raises.
        self.lock = threading.Lock()
        self._wake = threading.Event()
        self.pwm = None
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(channel, GPIO.OUT)
        self.pwm = GPIO.PWM(channel, 100)

    def __del__(self):
        # A partially constructed instance has no PWM handle and therefore
        # nothing to tear down.
        if getattr(self, 'pwm', None) is None:
            return
        self.stop()
        GPIO.cleanup(self.channel)

    def start(self):
        """Start the LED driver (no-op if it is already running)."""
        with self.lock:
            if self.running:
                return
            if self.animator.ident is not None:
                # A Thread object can be started only once; use a fresh
                # one when the driver is restarted after stop().
                self.animator = threading.Thread(target=self._animate)
            self.iterator = None
            self._wake.clear()
            self.running = True
            self.pwm.start(0)  # off by default
            self.animator.start()

    def stop(self):
        """Stop the LED driver and set the LED to off.

        Safe to call when the driver was never started or is already
        stopped; in that case nothing happens.
        """
        with self.lock:
            was_running = self.running
            self.running = False
        if not was_running:
            return
        # Join outside the lock: the animator takes the lock on every
        # iteration and could otherwise never observe `running == False`.
        self._wake.set()
        self.animator.join()
        self.pwm.stop()

    def set_state(self, state):
        """Request a new pattern (one of the class constants).

        The request is stored and applied by the animator thread, so the
        driver must be started for it to become visible. Only the most
        recent pending request is kept.
        """
        with self.lock:
            self.state = state
        # Cut the animator's current wait short so the change shows at once.
        self._wake.set()

    def _animate(self):
        """Animator thread body: apply requests and step the pattern.

        Returns once `running` is cleared. Raises ValueError (inside the
        animator thread) when an unsupported state was requested.
        """
        while True:
            with self.lock:
                state = self.state
                self.state = None
                running = self.running
            if not running:
                return
            if state is not None and not self._parse_state(state):
                raise ValueError('unsupported state: %r' % (state,))
            if self.iterator:
                self.pwm.ChangeDutyCycle(next(self.iterator))
                delay = self.sleep
            else:
                # Steady output: nothing to animate, idle until woken.
                delay = 1
            # Wait instead of sleeping so set_state()/stop() take effect
            # immediately. The pending state is read after clear(), so a
            # request arriving between wait() and clear() is not lost.
            self._wake.wait(delay)
            self._wake.clear()

    def _parse_state(self, state):
        """Activate `state`; return True if it is a known pattern.

        OFF and ON write a fixed duty cycle directly. Every other pattern
        installs an endless duty-cycle iterator plus the delay between
        steps. An unknown state leaves the iterator cleared and returns
        False.
        """
        self.iterator = None
        self.sleep = 0.0

        if state == self.OFF:
            self.pwm.ChangeDutyCycle(0)
            return True
        if state == self.ON:
            self.pwm.ChangeDutyCycle(100)
            return True

        if state == self.BLINK:
            steps, delay = [0, 100], 0.5
        elif state == self.BLINK_3:
            steps, delay = [0, 100] * 3 + [0, 0], 0.25
        elif state == self.BEACON:
            steps = itertools.chain(
                [30] * 100, [100] * 8, range(100, 30, -5))
            delay = 0.05
        elif state == self.BEACON_DARK:
            steps = itertools.chain(
                [0] * 100, range(0, 30, 3), range(30, 0, -3))
            delay = 0.05
        elif state == self.DECAY:
            steps, delay = range(100, 0, -2), 0.05
        elif state == self.PULSE_SLOW:
            steps = itertools.chain(range(0, 100, 2), range(100, 0, -2))
            delay = 0.1
        elif state == self.PULSE_QUICK:
            steps = itertools.chain(range(0, 100, 5), range(100, 0, -5))
            delay = 0.05
        else:
            return False

        self.iterator = itertools.cycle(steps)
        self.sleep = delay
        return True
# Module-level LED shared with on_message(); it is configured and started
# as soon as this module is loaded.
my_led = LED(channel = 25)
my_led.start()
my_led.set_state(LED.PULSE_QUICK)  # quick pulse until a bus event changes it

# NOTE(review): this launches /home/pi/button.py in a child process every
# time the module is loaded, and the child is never waited on or terminated.
# If this file is itself installed at that path it would respawn itself
# endlessly -- confirm the target is a different script.
process = subprocess.Popen(['python', "/home/pi/button.py"])

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://localhost:8181/core",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    try:
        ws.run_forever()
    finally:
        # The LED animator is a non-daemon thread: unless it is stopped
        # here the interpreter cannot exit once the socket loop ends.
        my_led.stop()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python
"""Send one message to a websocket message bus and print the first reply.

Usage:
    python mbus.py <host> <message-type> [<json-data>]

Example:
    python mbus.py localhost "mycroft.stop"

The bus is reached at ws://<host>:8181/core.
"""
import json
import sys


def build_message(msg_type, data="{}"):
    """Return the JSON text of a bus message.

    `data` is the raw command-line argument. When it parses as a JSON
    object it is embedded as that object; any other text is sent as a
    plain string. Serialising with json.dumps keeps the message valid
    even when the type or data contain quotes or backslashes.
    """
    try:
        payload = json.loads(data)
    except ValueError:
        payload = data
    if not isinstance(payload, dict):
        # Numbers, lists, bare words...: keep sending the text verbatim.
        payload = data
    return json.dumps({"type": msg_type, "data": payload})


def main(argv):
    """Run the command-line tool; return the process exit status.

    Returns 2 (after printing usage to stderr) when the host or message
    type is missing, 0 after a completed send/receive round trip.
    """
    if len(argv) < 3:
        sys.stderr.write(
            "usage: %s <host> <message-type> [<json-data>]\n" % argv[0])
        return 2

    # Imported lazily so argument handling and build_message() work even
    # where the websocket client package is not installed.
    from websocket import create_connection

    uri = 'ws://' + argv[1] + ':8181/core'
    data = argv[3] if len(argv) >= 4 else "{}"

    ws = create_connection(uri)
    try:
        print("Sending " + argv[2] + " to " + uri + "...")
        ws.send(build_message(argv[2], data))
        print("Receiving...")
        result = ws.recv()
        print("Received '%s'" % result)
    finally:
        # Close the socket even when send/recv fails.
        ws.close()
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment