Skip to content

Instantly share code, notes, and snippets.

@dmnc-net
Forked from savetheclocktower/README.md
Last active April 1, 2020 14:35
Show Gist options
  • Save dmnc-net/c121a95ad90041fdbc9fb9bec5bb9709 to your computer and use it in GitHub Desktop.
Save dmnc-net/c121a95ad90041fdbc9fb9bec5bb9709 to your computer and use it in GitHub Desktop.
Using a rotary encoder as a volume control for the Raspberry Pi

Volumio rotary encoder control

Code below is modified version of monitor-volume written by Andrew Dupont.

Requirements

  • rpi.gpio (python3-gpio)
  • python3-queue
  • python3-persist-queue ? (TODO)
[Unit]
Description = Volumio GPIO volume
Wants=volumio.service
After=volumio.service
[Service]
Type=simple
ExecStart=/usr/bin/python3 /data/INTERNAL/volumio-gpiovolume
StandardError=syslog
SyslogIdentifier=volumio-gpiovolume
User=volumio
Group=volumio
[Install]
WantedBy=multi-user.target
#!/usr/bin/env python3
"""
Original code here: https://gist.github.com/savetheclocktower/9b5f67c20f6c04e65ed88f2e594d43c1
The daemon responsible for changing the volume in response to a turn or press
of the volume knob.
The volume knob is a rotary encoder. It turns infinitely in either direction.
Turning it to the right will increase the volume; turning it to the left will
decrease the volume. The knob can also be pressed like a button in order to
turn muting on or off.
The knob uses two GPIO pins and we need some extra logic to decode it. The
button we can just treat like an ordinary button. Rather than poll
constantly, we use threads and interrupts to listen on all three pins in one
script.
"""
import os
import signal
import subprocess
import sys
import threading
from RPi import GPIO
from queue import Queue
DEBUG = False
# SETTINGS
# ========
# The two pins that the encoder uses (BCM numbering).
GPIO_A = 27
GPIO_B = 17
ACTION_PLUS = "volume plus"
ACTION_MINUS = "volume minus"
# The pin that the knob's button is hooked up to. If you have no button, set
# this to None.
GPIO_BUTTON = 22
ACTION_TOGGLE = "toggle"
# (END SETTINGS)
#
# When the knob is turned, the callback happens in a separate thread. If
# those turn callbacks fire erratically or out of order, we'll get confused
# about which direction the knob is being turned, so we'll use a queue to
# enforce FIFO. The callback will push onto a queue, and all the actual
# volume-changing will happen in the main thread.
QUEUE = Queue()
# When we put something in the queue, we'll use an event to signal to the
# main thread that there's something in there. Then the main thread will
# process the queue and reset the event. If the knob is turned very quickly,
# this event loop will fall behind, but that's OK because it consumes the
# queue completely each time through the loop, so it's guaranteed to catch up.
EVENT = threading.Event()
def debug(str):
if not DEBUG:
return
print(str)
class RotaryEncoder:
"""
A class to decode mechanical rotary encoder pulses.
Ported to RPi.GPIO from the pigpio sample here:
http://abyz.co.uk/rpi/pigpio/examples.html
"""
def __init__(self, gpioA, gpioB, callback=None, buttonPin=None, buttonCallback=None):
"""
Instantiate the class. Takes three arguments: the two pin numbers to
which the rotary encoder is connected, plus a callback to run when the
switch is turned.
The callback receives one argument: a `delta` that will be either 1 or -1.
One of them means that the dial is being turned to the right; the other
means that the dial is being turned to the left. I'll be damned if I know
yet which one is which.
"""
self.lastGpio = None
self.gpioA = gpioA
self.gpioB = gpioB
self.callback = callback
self.gpioButton = buttonPin
self.buttonCallback = buttonCallback
self.levA = 0
self.levB = 0
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.gpioA, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(self.gpioB, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(self.gpioA, GPIO.BOTH, self._callback)
GPIO.add_event_detect(self.gpioB, GPIO.BOTH, self._callback)
if self.gpioButton:
GPIO.setup(self.gpioButton, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(self.gpioButton, GPIO.FALLING, self._buttonCallback, bouncetime=500)
def destroy(self):
GPIO.remove_event_detect(self.gpioA)
GPIO.remove_event_detect(self.gpioB)
GPIO.cleanup()
def _buttonCallback(self, channel):
self.buttonCallback(GPIO.input(channel))
def _callback(self, channel):
level = GPIO.input(channel)
if channel == self.gpioA:
self.levA = level
else:
self.levB = level
# Debounce.
if channel == self.lastGpio:
return
# When both inputs are at 1, we'll fire a callback. If A was the most
# recent pin set high, it'll be forward, and if B was the most recent pin
# set high, it'll be reverse.
self.lastGpio = channel
if channel == self.gpioA and level == 1:
if self.levB == 1:
self.callback(1)
elif channel == self.gpioB and level == 1:
if self.levA == 1:
self.callback(-1)
class VolumeError(Exception):
pass
class Volume:
def __init__(self):
self.volume = self.volumio("volume")
def plus(self):
self.volumio(ACTION_PLUS)
return self.volumio("volume")
def minus(self):
self.volumio(ACTION_MINUS)
return self.volumio("volume")
def toggle(self):
self.volumio(ACTION_TOGGLE)
def volumio(self, cmd):
p = subprocess.Popen("volumio {}".format(cmd), shell=True, stdout=subprocess.PIPE)
code = p.wait()
if code != 0:
raise VolumeError("Unknown error")
sys.exit(0)
return p.stdout
if __name__ == "__main__":
gpioA = GPIO_A
gpioB = GPIO_B
gpioButton = GPIO_BUTTON
v = Volume()
def on_press(value):
v.toggle()
EVENT.set()
# This callback runs in the background thread. All it does is put turn
# events into a queue and flag the main thread to process them. The
# queueing ensures that we won't miss anything if the knob is turned
# extremely quickly.
def on_turn(delta):
QUEUE.put(delta)
EVENT.set()
def consume_queue():
while not QUEUE.empty():
delta = QUEUE.get()
handle_delta(delta)
def handle_delta(delta):
if delta == 1:
vol = v.plus()
else:
vol = v.minus()
print("Set volume to: {}".format(vol))
def on_exit(a, b):
print("Exiting...")
encoder.destroy()
sys.exit(0)
debug("Volume knob using pins {} and {}".format(gpioA, gpioB))
if gpioButton != None:
debug("Volume button using pin {}".format(gpioButton))
debug("Initial volume: {}".format(v.volume))
encoder = RotaryEncoder(GPIO_A, GPIO_B, callback=on_turn, buttonPin=GPIO_BUTTON, buttonCallback=on_press)
signal.signal(signal.SIGINT, on_exit)
while True:
# This is the best way I could come up with to ensure that this script
# runs indefinitely without wasting CPU by polling. The main thread will
# block quietly while waiting for the event to get flagged. When the knob
# is turned we're able to respond immediately, but when it's not being
# turned we're not looping at all.
#
# The 1200-second (20 minute) timeout is a hack; for some reason, if I
# don't specify a timeout, I'm unable to get the SIGINT handler above to
# work properly. But if there is a timeout set, even if it's a very long
# timeout, then Ctrl-C works as intended. No idea why.
EVENT.wait(1200)
consume_queue()
EVENT.clear()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment