Last active
December 18, 2024 00:02
-
-
Save aaugustin/3b6f8f4ee11d08dbe4bc59432d9c8cf4 to your computer and use it in GitHub Desktop.
Hook a Griffin PowerMate to a Raspberry Pi running HifiBerry!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3.8 | |
""" | |
To enable this service, copy this file to /opt, then: | |
# chmod +x /opt/powermate.py | |
# pip3.8 install websockets | |
# cat > /etc/systemd/system/powermate.service | |
[Unit] | |
Description=PowerMate | |
Wants=beocreate2.service | |
After=beocreate2.service | |
[Service] | |
ExecStartPre=/opt/hifiberry/bin/bootmsg "Starting PowerMate" | |
ExecStart=/opt/powermate.py | |
StandardOutput=journal | |
Restart=always | |
RestartSec=5 | |
[Install] | |
WantedBy=multi-user.target | |
# systemctl daemon-reload | |
# systemctl enable powermate | |
# systemctl start powermate | |
""" | |
import asyncio | |
import dataclasses | |
import glob | |
import json | |
import logging | |
import signal | |
import struct | |
import sys | |
import time | |
import websockets | |
# ~~~~~~~~ ~~~~~~~~ PowerMate connector ~~~~~~~~ ~~~~~~~~ # | |
@dataclasses.dataclass | |
class PowerMateEvent: | |
""" | |
PowerMate event read from or written to /dev/input/eventX. | |
""" | |
time_: float | |
type_: int | |
code: int | |
value: int | |
FORMAT = "nnHHl" | |
SIZE = struct.calcsize(FORMAT) | |
@classmethod | |
def parse(cls, data): | |
# See https://www.kernel.org/doc/Documentation/input/input.txt. | |
# | |
# > You'll always get a whole number of input events on a read. | |
# > Their layout is: | |
# > | |
# > struct input_event { | |
# > struct timeval time; | |
# > unsigned short type; | |
# > unsigned short code; | |
# > unsigned int value; | |
# > }; | |
tv_sec, tv_usec, type_, code, value = struct.unpack(cls.FORMAT, data) | |
return cls(tv_sec + tv_usec / 1_000_000, type_, code, value) | |
def serialize(self): | |
tv_sec = int(self.time_) | |
tv_usec = int((self.time_ - tv_sec) * 1_000_000) | |
return struct.pack( | |
self.FORMAT, tv_sec, tv_usec, self.type_, self.code, self.value | |
) | |
class PowerMate: | |
""" | |
Interface with a PowerMate. | |
""" | |
LOGGER = logging.getLogger("powermate") | |
DEV_INPUT = "/dev/input/by-id/*PowerMate*" | |
# From uapi/linux/input-event-codes.h | |
EV_SYN = 0x00 | |
EV_KEY = 0x01 | |
EV_REL = 0x02 | |
EV_MSC = 0x04 | |
SYN_REPORT = 0 | |
BTN_MISC = 0x100 | |
REL_DIAL = 0x07 | |
MSC_PULSELED = 0x01 | |
def __init__(self, handle_rotate=None, handle_button=None): | |
""" | |
Configure a PowerMate interface. | |
``handle_rotate`` is a callback for rotate events. It receives the | |
event value, usually +1 / -1 for right / left respectively, but up to | |
+7 / -7 when rotating fast, and the event timestamp. | |
``handle_button`` is a callback for button presses. It receives the | |
event value, ``True`` / ``False`` for down / up respectively, and the | |
event timestamp. | |
If you need to track state, for example to detect double clicks, you | |
can make ``handle_rotate`` and ``handle_button`` methods of a class. | |
""" | |
self.device = None | |
self.handle_rotate = handle_rotate | |
self.handle_button = handle_button | |
async def read_event(self): | |
""" | |
Read a single event from device. | |
""" | |
data = await asyncio.get_event_loop().run_in_executor( | |
None, self.device.read, PowerMateEvent.SIZE, | |
) | |
assert len(data) == PowerMateEvent.SIZE, "incomplete read" | |
event = PowerMateEvent.parse(data) | |
self.LOGGER.debug("read event %r", event) | |
return event | |
async def write_event(self, type_, code, value): | |
""" | |
Write a single event to device. | |
""" | |
event = PowerMateEvent(time.time(), type_, code, value) | |
self.LOGGER.debug("write event %r", event) | |
data = event.serialize() | |
bytes_written = await asyncio.get_event_loop().run_in_executor( | |
None, self.device.write, data, | |
) | |
assert bytes_written == len(data), "incomplete write" | |
async def handle_event(self, event): | |
""" | |
Process a single event read from device. | |
""" | |
if event.type_ == self.EV_SYN: | |
assert event.code == self.SYN_REPORT | |
elif event.type_ == self.EV_KEY: | |
assert event.code == self.BTN_MISC | |
down = bool(event.value) | |
self.LOGGER.debug("button %s", "down" if down else "up") | |
if self.handle_button is not None: | |
await self.handle_button(down, event.time_) | |
elif event.type_ == self.EV_REL: | |
assert event.code == self.REL_DIAL | |
rotation = event.value | |
self.LOGGER.debug( | |
"rotate %d %s", abs(rotation), "right" if rotation > 0 else "left" | |
) | |
if self.handle_rotate is not None: | |
await self.handle_rotate(rotation, event.time_) | |
elif event.type_ == self.EV_MSC: | |
assert event.code == self.MSC_PULSELED | |
else: | |
self.LOGGER.error("unsupported event type %02x", event.type_) | |
async def run(self, input_=None): | |
""" | |
Process stream of events received from PowerMate. | |
This function calls``handle_rotate`` and ``handle_button`` as events | |
are received. | |
If ``input_`` is provided, it must be a file name under ``/dev/input`` | |
corresponding to a PowerMate. If it isn't provided and a single | |
PowerMate is attached to the system, connect to that PowerMate. | |
""" | |
if input_ is None: | |
inputs = glob.glob(self.DEV_INPUT) | |
if len(inputs) != 1: | |
raise RuntimeError("zero or several PowerMate found") | |
input_ = inputs[0] | |
self.LOGGER.debug("reading events from %s", input_) | |
# While it could be safer to check in /proc/bus/input/handlers that | |
# we're connecting to a PowerMate, or to use the EVIOCGNAME ioctl, | |
# this isn't really useful in practice. | |
with open(input_, mode="r+b", buffering=0) as self.device: | |
try: | |
while True: | |
event = await self.read_event() | |
await self.handle_event(event) | |
finally: | |
# Send a dummy event; it will be echoed; this will unblock the | |
# thread that does the blocking read. This hack is necessary | |
# because there's no way to kill a thread blocked on I/O. | |
await self.write_event(self.EV_SYN, self.SYN_REPORT, 0) | |
async def set_led( | |
self, | |
brightness=1, | |
speed=1, | |
pulse_table=0, | |
pulse_asleep=False, | |
pulse_awake=False, | |
): | |
""" | |
Configure LED. | |
``brightness`` is a float between 0 and 1. | |
``speed`` is a float between 0 and 2, where 1 is the standard speed, | |
and only values really close to 1 are usable. | |
``pulse_table`` is 0, 1, or 2. It sets the type of pulse. | |
``pulse_asleep`` and ``pulse_awake`` set whether the PowerMate pulses | |
when the computer is asleep or awake respctively. | |
""" | |
self.LOGGER.debug( | |
"set LED brightness=%.3f speed=%.3f " | |
"pulse_table=%d pulse_asleep=%s pulse_awake=%s", | |
brightness, | |
speed, | |
pulse_table, | |
pulse_asleep, | |
pulse_awake, | |
) | |
brightness = int(brightness * 255) | |
assert 0 <= brightness <= 255 | |
speed = int(speed * 255) | |
assert 0 <= speed <= 510 | |
pulse_table = int(pulse_table) | |
assert 0 <= pulse_table <= 2 | |
pulse_asleep = int(pulse_asleep) | |
assert 0 <= pulse_asleep <= 1 | |
pulse_awake = int(pulse_awake) | |
assert 0 <= pulse_awake <= 1 | |
value = ( | |
brightness | |
| speed << 8 | |
| pulse_table << 17 | |
| pulse_asleep << 19 | |
| pulse_awake << 20 | |
) | |
await self.write_event(self.EV_MSC, self.MSC_PULSELED, value) | |
# ~~~~~~~~ ~~~~~~~~ Beocreate 2 connector ~~~~~~~~ ~~~~~~~~ # | |
@dataclasses.dataclass | |
class BeocreateEvent: | |
""" | |
Beocreate 2 server event read from or written to WebSocket connection. | |
""" | |
target: str | |
header: str | |
content: object = None | |
@classmethod | |
def parse(cls, msg): | |
data = json.loads(msg) | |
return cls(data["target"], data["header"], data.get("content")) | |
def serialize(self): | |
data = dataclasses.asdict(self) | |
if data["content"] is None: | |
del data["content"] | |
return json.dumps(data) | |
class Beocreate: | |
""" | |
Interface with a Beocreate 2 server. | |
""" | |
LOGGER = logging.getLogger("beocreate") | |
BEO_WS = "ws://localhost/" | |
def __init__(self, handle_volume=None, handle_playpause=None): | |
""" | |
Configure a Beocreate 2 server interface. | |
``handle_volume`` is a callback for volume events. It receives the | |
volume as a value between 0 and 100. | |
``handle_playpause`` is a callback for button presses. It receives the | |
play or pause status as``True`` or ``False`` respectively. | |
""" | |
self.websocket = None | |
self.volume = None | |
self.handle_volume = handle_volume | |
self.playing = None | |
self.handle_playpause = handle_playpause | |
async def handle_event(self, event): | |
""" | |
Process a single event read from Beocreate2 server. | |
""" | |
if event.target == "sound": | |
if event.header == "systemVolume": | |
self.volume = event.content["volume"] | |
self.LOGGER.debug("volume %d", self.volume) | |
if self.handle_volume is not None: | |
await self.handle_volume(self.volume) | |
elif event.target == "sources": | |
if event.header == "sources": | |
try: | |
source_name = event.content["focusedSource"] | |
source = event.content["sources"][source_name] | |
self.playing = source["playerState"] == "playing" | |
except KeyError: | |
self.playing = False | |
self.LOGGER.debug( | |
"player %s", "playing" if self.playing else "stopped" | |
) | |
if self.handle_playpause is not None: | |
await self.handle_playpause(self.playing) | |
async def run(self, beo_ws=None): | |
""" | |
Process stream of events received from Beocreate 2 server. | |
This function calls``handle_volume`` and ``handle_playpause`` as events | |
are received. | |
If ``beo_ws`` is provided, it is the address of the WebSocket endpoint | |
of the Beocreate 2 server. | |
""" | |
if beo_ws is None: | |
beo_ws = self.BEO_WS | |
self.LOGGER.debug("reading events from %s", beo_ws) | |
async with websockets.connect( | |
beo_ws, subprotocols=["beocreate"], | |
) as self.websocket: | |
assert self.websocket.subprotocol == "beocreate" | |
await self.send_event("sound", "getVolume") | |
await self.send_event("sources", "getSources") | |
async for msg in self.websocket: | |
event = BeocreateEvent.parse(msg) | |
self.LOGGER.debug("read event %r", event) | |
await self.handle_event(event) | |
self.websocket = None | |
async def send_event(self, target, header, content=None): | |
""" | |
Send an event to Beocreate 2 server. | |
""" | |
assert self.websocket is not None, "send_event() outside of run()" | |
event = BeocreateEvent(target, header, content) | |
self.LOGGER.debug("write event %r", event) | |
await self.websocket.send(event.serialize()) | |
async def set_volume(self, volume): | |
""" | |
Set volume. | |
""" | |
self.LOGGER.debug("set volume to %d", volume) | |
await self.send_event("sound", "setVolume", volume) | |
async def toggle_playing(self): | |
""" | |
Toggle between play and pause. | |
""" | |
self.LOGGER.debug("toggle playing") | |
content = {"action": "playPause"} | |
await self.send_event("now-playing", "transport", content) | |
# ~~~~~~~~ ~~~~~~~~ Main program ~~~~~~~~ ~~~~~~~~ # | |
async def main(input_=None, beo_ws=None): | |
volume = 0 | |
playing = False | |
async def handle_rotate(rotation, timestamp): | |
nonlocal beocreate, volume | |
volume += rotation | |
await beocreate.set_volume(volume) | |
await set_led() | |
async def handle_button(down, timestamp): | |
nonlocal beocreate, playing | |
if down: | |
playing = not playing | |
await beocreate.toggle_playing() | |
await set_led() | |
async def set_led(): | |
nonlocal powermate, volume, playing | |
await powermate.set_led(volume / 100 if playing else 0.1) | |
async def handle_volume(new_volume): | |
nonlocal volume | |
if new_volume != volume: | |
volume = new_volume | |
await set_led() | |
async def handle_playpause(new_playing): | |
nonlocal playing | |
if new_playing != playing: | |
playing = new_playing | |
await set_led() | |
loop = asyncio.get_event_loop() | |
powermate = PowerMate(handle_rotate, handle_button) | |
powermate_task = loop.create_task(powermate.run()) | |
beocreate = Beocreate(handle_volume, handle_playpause) | |
beocreate_task = loop.create_task(beocreate.run()) | |
stop = loop.create_future() | |
loop.add_signal_handler(signal.SIGINT, stop.set_result, None) | |
loop.add_signal_handler(signal.SIGTERM, stop.set_result, None) | |
await asyncio.wait( | |
[powermate_task, beocreate_task, stop], return_when=asyncio.FIRST_COMPLETED, | |
) | |
beocreate_task.cancel() | |
try: | |
await beocreate_task | |
except asyncio.CancelledError: | |
pass | |
else: | |
logging.exception("beocreate connector failed") | |
powermate_task.cancel() | |
try: | |
await powermate_task | |
except asyncio.CancelledError: | |
pass | |
else: | |
logging.exception("powermate connector failed") | |
if __name__ == "__main__": | |
logging.basicConfig(level=logging.DEBUG) | |
logging.getLogger("websockets").setLevel(logging.INFO) | |
if all( | |
[ | |
len(sys.argv) < 2 or sys.argv[1].startswith("/"), | |
len(sys.argv) < 3 or sys.argv[2].startswith("ws"), | |
len(sys.argv) < 4, | |
] | |
): | |
asyncio.run(main(*sys.argv[1:])) | |
else: | |
sys.stderr.write( | |
f"usage: {sys.argv[0]} [/dev/input/eventX] [ws://beocreate2/]\n" | |
) | |
sys.exit(2) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment