The code below is a modified version of monitor-volume, written by Andrew Dupont.
- rpi.gpio (python3-gpio)
- python3-queue
- python3-persist-queue ? (TODO)
The code below is a modified version of monitor-volume, written by Andrew Dupont.
[Unit]
Description = Volumio GPIO volume
Wants=volumio.service
After=volumio.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /data/INTERNAL/volumio-gpiovolume
StandardError=syslog
SyslogIdentifier=volumio-gpiovolume
User=volumio
Group=volumio
[Install]
WantedBy=multi-user.target
#!/usr/bin/env python3
"""
Original code here: https://gist.github.com/savetheclocktower/9b5f67c20f6c04e65ed88f2e594d43c1
The daemon responsible for changing the volume in response to a turn or press
of the volume knob.
The volume knob is a rotary encoder. It turns infinitely in either direction.
Turning it to the right will increase the volume; turning it to the left will
decrease the volume. The knob can also be pressed like a button in order to
turn muting on or off.
The knob uses two GPIO pins and we need some extra logic to decode it. The
button we can just treat like an ordinary button. Rather than poll
constantly, we use threads and interrupts to listen on all three pins in one
script.
"""
import os
import shlex
import signal
import subprocess
import sys
import threading
from queue import Queue

from RPi import GPIO
# Set to True to make debug() print its messages.
DEBUG = False

# SETTINGS
# ========

# The two pins that the encoder uses (BCM numbering).
GPIO_A = 27
GPIO_B = 17

# Arguments handed to the `volumio` command-line client to raise or lower
# the volume.
ACTION_PLUS = "volume plus"
ACTION_MINUS = "volume minus"

# The pin that the knob's button is hooked up to. If you have no button, set
# this to None.
GPIO_BUTTON = 22

# Arguments handed to the `volumio` command-line client when the button is
# pressed.
ACTION_TOGGLE = "toggle"

# (END SETTINGS)
#

# When the knob is turned, the callback happens in a separate thread. If
# those turn callbacks fire erratically or out of order, we'll get confused
# about which direction the knob is being turned, so we'll use a queue to
# enforce FIFO. The callback will push onto a queue, and all the actual
# volume-changing will happen in the main thread.
QUEUE = Queue()

# When we put something in the queue, we'll use an event to signal to the
# main thread that there's something in there. Then the main thread will
# process the queue and reset the event. If the knob is turned very quickly,
# this event loop will fall behind, but that's OK because it consumes the
# queue completely each time through the loop, so it's guaranteed to catch up.
EVENT = threading.Event()
def debug(str):
    """
    Print `str` to stdout, but only when the module-level DEBUG flag is set.

    The parameter name shadows the `str` builtin; it is kept as-is so that
    existing callers (including keyword callers) continue to work.
    """
    if DEBUG:
        print(str)
class RotaryEncoder:
    """
    A class to decode mechanical rotary encoder pulses, with optional support
    for a push button on a third pin.

    Ported to RPi.GPIO from the pigpio sample here:
    http://abyz.co.uk/rpi/pigpio/examples.html
    """

    def __init__(self, gpioA, gpioB, callback=None, buttonPin=None, buttonCallback=None):
        """
        Configure the pins as pulled-up inputs and register edge detection.

        Arguments:
        gpioA, gpioB   -- the two pin numbers (BCM numbering) to which the
                          rotary encoder is connected.
        callback       -- optional function run when the switch is turned. It
                          receives one argument: a `delta` that will be either
                          1 or -1. One of them means that the dial is being
                          turned to the right; the other means that the dial
                          is being turned to the left.
        buttonPin      -- optional pin number (BCM numbering) of the push
                          button, or None when there is no button.
        buttonCallback -- optional function run on a falling edge of the
                          button pin. It receives the level read from the pin
                          at the time the handler runs.
        """
        self.lastGpio = None
        self.gpioA = gpioA
        self.gpioB = gpioB
        self.callback = callback

        self.gpioButton = buttonPin
        self.buttonCallback = buttonCallback

        self.levA = 0
        self.levB = 0

        GPIO.setmode(GPIO.BCM)
        GPIO.setup(self.gpioA, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        GPIO.setup(self.gpioB, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        GPIO.add_event_detect(self.gpioA, GPIO.BOTH, self._callback)
        GPIO.add_event_detect(self.gpioB, GPIO.BOTH, self._callback)

        # Compare against None instead of relying on truthiness, so that pin
        # number 0 is not mistaken for "no button".
        if self.gpioButton is not None:
            GPIO.setup(self.gpioButton, GPIO.IN, pull_up_down=GPIO.PUD_UP)
            GPIO.add_event_detect(self.gpioButton, GPIO.FALLING, self._buttonCallback, bouncetime=500)

    def destroy(self):
        """Unregister edge detection on every pin we set up and release the GPIO."""
        GPIO.remove_event_detect(self.gpioA)
        GPIO.remove_event_detect(self.gpioB)
        if self.gpioButton is not None:
            GPIO.remove_event_detect(self.gpioButton)
        GPIO.cleanup()

    def _buttonCallback(self, channel):
        """Edge handler for the button pin: forward the current pin level."""
        # buttonCallback is optional; without this guard a press would raise
        # TypeError when no handler was supplied.
        if self.buttonCallback is None:
            return
        self.buttonCallback(GPIO.input(channel))

    def _callback(self, channel):
        """Edge handler for both encoder pins: decode the turn direction."""
        level = GPIO.input(channel)
        if channel == self.gpioA:
            self.levA = level
        else:
            self.levB = level

        # Debounce.
        if channel == self.lastGpio:
            return

        # When both inputs are at 1, we'll fire a callback. If A was the most
        # recent pin set high, it'll be forward, and if B was the most recent pin
        # set high, it'll be reverse.
        self.lastGpio = channel

        # callback is optional; keep tracking pin state but report nothing
        # when no handler was supplied.
        if self.callback is None:
            return

        if channel == self.gpioA and level == 1:
            if self.levB == 1:
                self.callback(1)
        elif channel == self.gpioB and level == 1:
            if self.levA == 1:
                self.callback(-1)
class VolumeError(Exception):
    """Raised when a `volumio` command cannot be started or exits non-zero."""


class Volume:
    """Thin wrapper around the `volumio` command-line client."""

    def __init__(self):
        """Query the current volume once and remember it in `self.volume`."""
        self.volume = self.volumio("volume")

    def plus(self):
        """Run ACTION_PLUS, then re-read and return the reported volume."""
        self.volumio(ACTION_PLUS)
        self.volume = self.volumio("volume")
        return self.volume

    def minus(self):
        """Run ACTION_MINUS, then re-read and return the reported volume."""
        self.volumio(ACTION_MINUS)
        self.volume = self.volumio("volume")
        return self.volume

    def toggle(self):
        """Run ACTION_TOGGLE."""
        self.volumio(ACTION_TOGGLE)

    @staticmethod
    def _command_args(cmd):
        """Turn a command string such as "volume plus" into an argv list."""
        return ["volumio"] + shlex.split(cmd)

    def volumio(self, cmd):
        """
        Run `volumio <cmd>` and return what it printed on stdout.

        Arguments:
        cmd -- the sub-command and its arguments as one string,
               e.g. "volume" or "volume plus".

        Returns the command's standard output decoded as UTF-8 with
        surrounding whitespace stripped.

        Raises VolumeError if the command cannot be started or exits with a
        non-zero status.
        """
        try:
            # An argv list avoids the shell entirely, and run() reads stdout
            # while waiting, so a chatty child cannot block on a full pipe
            # the way Popen(...).wait() with stdout=PIPE can.
            completed = subprocess.run(self._command_args(cmd), stdout=subprocess.PIPE)
        except OSError as err:
            # With a shell, a missing binary showed up as a non-zero exit
            # code; keep reporting it as the same exception type.
            raise VolumeError("Unknown error") from err

        if completed.returncode != 0:
            raise VolumeError("Unknown error")

        return completed.stdout.decode("utf-8", errors="replace").strip()
if __name__ == "__main__":
    # Entry point: wire the encoder callbacks to the volume wrapper and then
    # sleep in the main thread until there is something to do.
    gpioA = GPIO_A
    gpioB = GPIO_B
    gpioButton = GPIO_BUTTON

    v = Volume()

    def on_press(value):
        """Button handler: toggle, then wake the main loop."""
        try:
            v.toggle()
        except VolumeError as err:
            # One failed command should not take the handler down with it.
            print("Toggle failed: {}".format(err), file=sys.stderr)
        EVENT.set()

    # This callback runs in the background thread. All it does is put turn
    # events into a queue and flag the main thread to process them. The
    # queueing ensures that we won't miss anything if the knob is turned
    # extremely quickly.
    def on_turn(delta):
        QUEUE.put(delta)
        EVENT.set()

    def consume_queue():
        """Apply every queued turn, oldest first."""
        while not QUEUE.empty():
            handle_delta(QUEUE.get())

    def handle_delta(delta):
        """Step the volume up for a delta of 1 and down for anything else."""
        try:
            if delta == 1:
                vol = v.plus()
            else:
                vol = v.minus()
        except VolumeError as err:
            # Keep the daemon alive when a single volumio call fails; report
            # it on stderr and move on to the next queued turn.
            print("Volume change failed: {}".format(err), file=sys.stderr)
            return
        print("Set volume to: {}".format(vol))

    def on_exit(a, b):
        """Signal handler: release the GPIO pins and exit."""
        print("Exiting...")
        encoder.destroy()
        sys.exit(0)

    debug("Volume knob using pins {} and {}".format(gpioA, gpioB))
    if gpioButton is not None:
        debug("Volume button using pin {}".format(gpioButton))
    debug("Initial volume: {}".format(v.volume))

    encoder = RotaryEncoder(gpioA, gpioB, callback=on_turn, buttonPin=gpioButton, buttonCallback=on_press)

    signal.signal(signal.SIGINT, on_exit)
    # SIGTERM is handled as well so that a plain `kill` (or a service manager
    # stopping the process) releases the GPIO pins the same way Ctrl-C does.
    signal.signal(signal.SIGTERM, on_exit)

    while True:
        # This is the best way I could come up with to ensure that this script
        # runs indefinitely without wasting CPU by polling. The main thread will
        # block quietly while waiting for the event to get flagged. When the knob
        # is turned we're able to respond immediately, but when it's not being
        # turned we're not looping at all.
        #
        # The 1200-second (20 minute) timeout is a hack; for some reason, if I
        # don't specify a timeout, I'm unable to get the SIGINT handler above to
        # work properly. But if there is a timeout set, even if it's a very long
        # timeout, then Ctrl-C works as intended. No idea why.
        EVENT.wait(1200)
        # Clear the flag BEFORE draining the queue. If a turn arrives while we
        # are draining, its set() then survives and the next wait() returns
        # immediately; clearing afterwards could erase that wake-up and leave
        # the turn sitting in the queue until the next event or timeout.
        EVENT.clear()
        consume_queue()