Last active
July 7, 2023 22:48
-
-
Save EarlGray/5e7ccad00772f61c3843b39f0d1175d2 to your computer and use it in GitHub Desktop.
dmytrish's swaybar
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
A swaybar implementation. | |
Current plugins: | |
- keyboard layout state and tracking | |
- PulseAudio-compatible sound: device, volume and mute status | |
- power supply: on battery/ac, percentage, time to empty/full | |
- network: the name of the primary connection, `nmmenu` on click | |
- time in different formats | |
Use `status -a` to show all. | |
Dependencies: | |
- dbus_next | |
This script logs to `/tmp/swaybar.log` by default, use `SWAYBAR_LOG=/your/path` | |
to override. | |
""" | |
import os | |
import re | |
import sys | |
import json | |
import subprocess | |
import logging | |
import argparse | |
import traceback | |
import dataclasses | |
from enum import Enum | |
from datetime import datetime | |
from pathlib import Path | |
from collections import namedtuple | |
from dataclasses import dataclass | |
from typing import (Dict, Self, Union) | |
import asyncio | |
from asyncio import Queue | |
from asyncio.exceptions import CancelledError | |
from dbus_next import BusType | |
from dbus_next.signature import Variant | |
from dbus_next.aio import MessageBus | |
from dbus_next.aio.proxy_object import ProxyObject, ProxyInterface | |
# TODO: replace swaymsg invocations with sway ipc (`man sway-ipc`) | |
class Mouse(int, Enum):
    """Mouse button codes used by sway click events."""

    # Physical buttons.
    BTN_LEFT = 1
    BTN_MIDDLE = 2
    BTN_RIGHT = 3

    # Scroll wheel directions are numbered like buttons.
    WHEEL_UP = 4
    WHEEL_DOWN = 5
@dataclass
class ClickEvent:
    """One click notification in the swaybar-protocol format."""
    name: str
    button: Mouse
    event: str
    x: int
    y: int
    relative_x: int
    relative_y: int
    width: int
    height: int
    scale: float

    @classmethod
    def from_dict(cls, d) -> Self:
        """Build an event from a mapping, ignoring keys that are not fields."""
        # The set of field names is computed once and memoised on the class.
        if not hasattr(cls, '_fields'):
            cls._fields = {field.name for field in dataclasses.fields(cls)}
        known = cls._fields
        accepted = {}
        for key, value in d.items():
            if key in known:
                accepted[key] = value
        return cls(**accepted)
# Module-wide D-Bus connections. Both start as `None` and are filled in on
# first use by `dbus_system_bus()` / `dbus_session_bus()`.
SYSTEM_BUS: MessageBus = None
SESSION_BUS: MessageBus = None
async def dbus_system_bus() -> MessageBus:
    """Return the shared system-bus connection, connecting on first use."""
    global SYSTEM_BUS
    if SYSTEM_BUS:
        return SYSTEM_BUS
    connection = MessageBus(bus_type=BusType.SYSTEM)
    SYSTEM_BUS = await connection.connect()
    return SYSTEM_BUS
async def dbus_session_bus() -> MessageBus:
    """Return the shared session-bus connection, connecting on first use."""
    global SESSION_BUS
    if SESSION_BUS:
        return SESSION_BUS
    connection = MessageBus(bus_type=BusType.SESSION)
    SESSION_BUS = await connection.connect()
    return SESSION_BUS
class Tracker:
    """Base class for a status-bar block that runs as an asyncio task.

    Subclasses set `NAME` and provide an `async def run(self, eventq)`
    coroutine that puts `{NAME: value}` updates on the queue.
    """
    NAME: str = '???'
    DBUS_IF_PROPERTIES: str = 'org.freedesktop.DBus.Properties'

    def __init__(self):
        # Set by `start()`; `None` means the tracker was never started.
        self.task = None

    def start(self, eventq: Queue):
        """Schedule the tracker as a task on the running loop and return it."""
        self.task = asyncio.create_task(self._run(eventq))
        return self.task

    async def _run(self, eventq):
        """Run `self.run()`, logging failures and publishing a placeholder on exit."""
        try:
            await self.run(eventq)
        except CancelledError:
            raise
        except Exception as e:
            logging.error(f'{self.NAME}: {e}')
            raise
        finally:
            logging.info(f'{self.NAME}: exited')
            await eventq.put({self.NAME: f'{self.NAME}: ---'})

    def stop(self):
        """Cancel a running task, or report how a finished one ended.

        Returns the exception (or result) of a task that has already
        finished. Returns `None` when the tracker was never started, when
        its task was cancelled, or when it was still running and has just
        been asked to cancel.
        """
        # getattr: a subclass may have skipped `Tracker.__init__`.
        task = getattr(self, 'task', None)
        if task is None:
            return None
        if not task.done():
            # cancel() only requests cancellation; until the task actually
            # finishes, exception()/result() raise InvalidStateError.
            task.cancel()
            return None
        if task.cancelled():
            # exception()/result() raise CancelledError for cancelled tasks.
            return None
        return task.exception() or task.result()

    def on_click(self, event: ClickEvent):
        """Default click handler: only log the event."""
        logging.info(f'{self.NAME}.on_click: {event}')

    @staticmethod
    async def run_subprocess(*cmd):
        """Spawn `cmd` and wait for it to exit."""
        proc = await asyncio.create_subprocess_exec(*cmd)
        await proc.wait()

    @staticmethod
    async def dbus_proxy(svc: str, path: str, iface: str) -> ProxyInterface:
        """Introspect `path` of service `svc` on the system bus and return interface `iface`."""
        bus = await dbus_system_bus()
        objxml = await bus.introspect(svc, path)
        obj = bus.get_proxy_object(svc, path, objxml)
        return obj.get_interface(iface)
class TaskKeyboard(Tracker):
    """Show the active layout of each keyboard and follow layout changes.

    Clicking the block switches every tracked keyboard to its next layout.
    """
    NAME = 'kbd'
    KNOWN_LAYOUTS = {
        'English (US)': 'en',
        'Ukrainian': '🇺🇦',
    }

    @staticmethod
    def shorten_layout(name):
        """Return the short label for a layout name, or the name itself if unknown."""
        return TaskKeyboard.KNOWN_LAYOUTS.get(name, name)

    @staticmethod
    def get_inputs():
        """Return the parsed JSON output of `swaymsg -t get_inputs`."""
        inputs = subprocess.check_output('swaymsg -t get_inputs'.split())
        return json.loads(inputs.decode('utf-8'))

    def __init__(self):
        super().__init__()
        # input identifier => active layout name, for inputs of type
        # 'keyboard' whose name also contains 'keyboard'.
        self.state = {
            inp['identifier']: inp['xkb_active_layout_name']
            for inp in TaskKeyboard.get_inputs()
            if inp['type'] == 'keyboard' and 'keyboard' in inp['name']
        }

    def _format(self):
        """Render the block text from the current layout of every tracked keyboard."""
        return 'kbd: ' + ','.join(self.shorten_layout(lt) for lt in self.state.values())

    async def run(self, eventq: Queue):
        """Publish the current layouts, then follow `swaymsg` input events."""
        # Bound before the `try` so `finally` is safe even if the spawn fails.
        proc = None
        try:
            await eventq.put({'kbd': self._format()})
            proc = await asyncio.create_subprocess_exec(
                *'swaymsg -t subscribe -m ["input"]'.split(),
                stdout=subprocess.PIPE,
            )
            while proc.returncode is None:
                output = await proc.stdout.readline()
                if not output:
                    # EOF: swaymsg closed its stdout; nothing more to parse.
                    break
                line = output.decode('utf-8').strip()
                if not line:
                    continue
                event = json.loads(line)
                if event['change'] != 'xkb_layout':
                    continue
                device = event['input']
                kbd = device['identifier']
                if kbd in self.state:
                    self.state[kbd] = device['xkb_active_layout_name']
                    await eventq.put({'kbd': self._format()})
            logging.info(f'{self.NAME}: swaymsg exited ({proc.returncode})')
        finally:
            await eventq.put({'kbd': 'kbd: --'})
            if proc is not None and proc.returncode is None:
                proc.terminate()

    def on_click(self, event: ClickEvent):
        """Switch every tracked keyboard to its next layout."""
        for kbd in self.state:
            cmd = ['swaymsg', 'input', kbd, 'xkb_switch_layout', 'next']
            logging.info(f'{self.NAME}: {cmd}')
            subprocess.run(cmd, capture_output=True)
        logging.info(f'{self.NAME}.on_click: {event}')
class TaskTime(Tracker):
    """Show the current local time, refreshed once per second.

    Clicking the block cycles through `FORMATS`.
    """
    NAME = 'time'
    FORMATS = [
        "%a %b %d, %T",
        "%b %d, %T",
        "%b %d, %H:%M",
        "%T",
    ]

    def __init__(self):
        # Initialise the base class so `self.task` exists before `start()`.
        super().__init__()
        # Index into FORMATS of the format currently shown.
        self._format = 0
        # Set by `run()`; `None` until the tracker is running.
        self.eventq = None

    def _tick(self):
        """Return the update holding the current time in the selected format."""
        fmt = self.FORMATS[self._format]
        time = datetime.now().strftime(fmt)
        return {self.NAME: time}

    async def run(self, eventq: Queue):
        """Publish the time, then sleep until the next whole second, forever."""
        self.eventq = eventq
        while True:
            await self.eventq.put(self._tick())
            usec = datetime.now().microsecond
            await asyncio.sleep(1 - usec/1000000)

    def on_click(self, event: ClickEvent):
        """Advance to the next time format and try to redraw immediately."""
        self._format = (self._format + 1) % len(self.FORMATS)
        logging.info(f'{self.NAME}.on_click: {self._format=}')
        if self.eventq is None:
            # Not running yet; the first tick will use the new format.
            return
        try:
            self.eventq.put_nowait(self._tick())
        except asyncio.QueueFull:
            # Best effort: the next tick in `run()` shows the new format.
            pass
class TaskSound(Tracker):
    """Check and track sound devices/volume/mutes via PulseAudio interfaces.

    Mouse actions: right click launches `pavucontrol`, middle click toggles
    mute, and the scroll wheel changes the volume in 5% steps.
    """
    NAME = 'sound'
    # TODO: use `org.pulseaudio.Server` on DBus
    #   but: pipewire/pipewire-pulse has no support for dbus at the moment.
    #   rewrite this with `import pulsectl` instead?

    # I use these, add yours if needed.
    KNOWN_SINKS = {
        'hdmi-stereo': 'hdmi',
        'analog-stereo': 'spkr',
        'a2dp-sink': 'bluz',
    }
    # pactl's alias for the default sink, so that clicks act on the same
    # sink whose volume `sound_info()` displays.
    DEFAULT_SINK = '@DEFAULT_SINK@'

    @staticmethod
    def pactl_sinks():
        """Return the parsed JSON output of `pactl -f json list sinks`."""
        output = subprocess.check_output('pactl -f json list sinks'.split())
        return json.loads(output.decode('utf-8'))

    @classmethod
    def sound_info(cls):
        """Return the block text for the default sink.

        The text is `<sink>: <left>%`, with `/<right>%` appended when the
        channels differ and a muted-speaker sign when the sink is muted.
        Any failure is logged and rendered as `snd: ???`.
        """
        try:
            info = subprocess.check_output('pactl -f json info'.split())
            info = json.loads(info.decode('utf-8'))
            snddev = info['default_sink_name']
            sinks = {s['name']: s for s in TaskSound.pactl_sinks()}
            sink = sinks[snddev]
            # Replace the long sink name with a short label when one matches.
            for s, sh in TaskSound.KNOWN_SINKS.items():
                if s in snddev:
                    snddev = sh
                    break
            logging.debug(f'{cls.__name__}: sink = {json.dumps(sink)}')
            lperc = sink['volume']['front-left']['value_percent']
            rperc = sink['volume']['front-right']['value_percent']
            vol = f'{snddev}: {lperc}'
            if lperc != rperc:
                vol += f'/{rperc}'
            if sink['mute']:
                vol += ' \U0001F507'  # "muted speaker"
            return vol
        except Exception as e:
            # Deliberate best effort: the bar keeps running with a placeholder.
            logging.error(f'task_sound: {str(e)}')
            return 'snd: ???'

    async def run(self, eventq: Queue):
        """Publish the current volume, then refresh on every sink event."""
        event_re = re.compile(r"Event '(?P<event>.*)' on sink #(?P<id>\d+)")
        # initial value
        await eventq.put({'sound': self.sound_info()})
        # Bound before the `try` so `finally` is safe even if the spawn fails.
        proc = None
        try:
            proc = await asyncio.create_subprocess_exec(
                # TODO: pactl -f json subscribe ?
                'pactl', 'subscribe',
                stdout=subprocess.PIPE,
            )
            while proc.returncode is None:
                output = await proc.stdout.readline()
                if not output:
                    # EOF: stop instead of re-reading an exhausted stream.
                    break
                if event_re.match(output.decode('utf-8')):
                    await eventq.put({'sound': self.sound_info()})
            logging.info(f'pactl exited ({proc.returncode})')
        finally:
            if proc is not None and proc.returncode is None:
                proc.terminate()

    @staticmethod
    def _pactl(*args):
        """Run `pactl` with the given arguments and log its return code."""
        cmd = ['pactl', *args]
        proc = subprocess.run(cmd, capture_output=True)
        logging.info(f'{" ".join(cmd)} => {proc.returncode}')

    def on_click(self, event: ClickEvent):
        """Handle mixer launch, mute toggle and volume steps for the default sink."""
        logging.info(f'sound.on_click: {event.button}')
        if event.button == Mouse.BTN_RIGHT:
            asyncio.create_task(
                self.run_subprocess('pavucontrol'),
            )
        elif event.button == Mouse.BTN_MIDDLE:
            self._pactl('set-sink-mute', self.DEFAULT_SINK, 'toggle')
        elif event.button == Mouse.WHEEL_UP:
            self._pactl('set-sink-volume', self.DEFAULT_SINK, '+5%')
        elif event.button == Mouse.WHEEL_DOWN:
            self._pactl('set-sink-volume', self.DEFAULT_SINK, '-5%')
class TaskNetwork(Tracker):
    """Display short networking info.

    - `wifi: <name>` for the first active wireless connection
    - `<N> net` otherwise, where N counts active non-loopback connections

    On right click: run `nmmenu`.
    """
    NAME = 'net'
    # TODO: use `org.freedesktop.NetworkManager` on DBus
    #   o.f.NM /o/f/NM o.f.NM.NetworkingEnabled
    #   o.f.NM /o/f/NM o.f.NM.Connectivity
    #   o.f.NM /o/f/NM o.f.NM.PrimaryConnection -> /o/f/NM/ActiveConnection/<N>
    #   o.f.NM /o/f/NM o.f.NM.PrimaryConnectionType
    #   o.f.NM /o/f/NM/ActiveConnection/<N> o.f.NM.Connection.Active.Ip4Config
    #     \> o.f.NM /o/f/NM/IP4Config/<C> o.f.NM.IP4Config.AddressData
    #   o.f.NM /o/f/NM/ActiveConnection/<N> o.f.NM.Connection.Active.Ip6Config
    #     \> o.f.NM /o/f/NM/IP6Config/<C> o.f.NM.IP6Config.AddressData
    #   Subscribe to /o/f/NM o.f.NM.Connection.StateChanged

    @staticmethod
    def network_info() -> str:
        """Summarise the active connections reported by `nmcli`.

        Raises `subprocess.CalledProcessError` (or `OSError`) when `nmcli`
        fails or cannot be started.
        """
        nmcli_output = subprocess.check_output(
            'nmcli --terse con show --active'.split(),
        ).decode('utf-8')
        others = 0
        for row in nmcli_output.splitlines():
            # Rows are NAME:UUID:TYPE:DEVICE. Only the name can contain a
            # colon (escaped as `\:`), so split from the right.
            fields = row.rsplit(':', 3)
            if len(fields) != 4:
                continue
            name, _uuid, typ, _iface = fields
            if typ == '802-11-wireless':
                # Undo nmcli's backslash escaping of ':' and '\'.
                return 'wifi: ' + re.sub(r'\\(.)', r'\1', name)
            if typ == 'loopback':
                continue
            others += 1
        # TODO: get routes and ips via
        #   nmcli -t con show $names | grep -Ei '^(ip4|ip6|connection.id)'
        return f'{others} net'

    async def run(self, eventq: Queue):
        """Publish the network summary and refresh it on every `nmcli monitor` line."""
        # Bound before the `try` so `finally` is safe even if the spawn fails.
        proc = None
        try:
            # Start the monitor first so a change during the initial query
            # still produces a line to react to.
            proc = await asyncio.create_subprocess_exec(
                *'nmcli monitor'.split(),
                stdout=subprocess.PIPE,
            )
            # initial information
            await eventq.put({'net': TaskNetwork.network_info()})
            while proc.returncode is None:
                output = await proc.stdout.readline()
                if not output:
                    # EOF: stop instead of re-reading an exhausted stream.
                    break
                # The line's content is irrelevant; any output means
                # "something changed", so re-query the state.
                await eventq.put({'net': TaskNetwork.network_info()})
            logging.info('`nmcli monitor` exited')
        finally:
            if proc is not None and proc.returncode is None:
                proc.terminate()

    def on_click(self, event: ClickEvent):
        """Open `nmmenu` on right click."""
        if event.button == Mouse.BTN_RIGHT:
            asyncio.create_task(
                self.run_subprocess(*'nmmenu --wrap -pnet'.split()),
            )
class TaskPower(Tracker):
    """Display power supplies.

    - `ac: XX%` when neither a time-to-empty nor a time-to-full is reported
    - `ac: XX%, <time to full>` when a time-to-full is reported
    - `bat: XX%, <time to empty>` when a time-to-empty is reported

    Text color changes to reflect the active power profile:
    - `balanced`: default color
    - `power-saver`: green text
    - `performance`: orange text (`#ffa000`)
    - anything else (including a failed lookup): red background
    """
    NAME = 'power'
    DBUS_PWRPROF_SVC = 'net.hadess.PowerProfiles'
    DBUS_PWRPROF_PATH = '/net/hadess/PowerProfiles'
    DBUS_UPOWER_SVC = 'org.freedesktop.UPower'
    DBUS_UPOWER_PATH = '/org/freedesktop/UPower'
    DBUS_UPOWER_DEV_IF = 'org.freedesktop.UPower.Device'
    # TODO: discover these:
    DBUS_DEV_BAT0 = '/org/freedesktop/UPower/devices/battery_BAT0'
    DBUS_DEV_AC = '/org/freedesktop/UPower/devices/line_power_AC'

    async def get_powerprofile(self) -> str:
        """Return the active power profile, creating the D-Bus proxy on first use."""
        if not hasattr(self, 'dbus_pwrprof'):
            # The interface name is the same string as the service name.
            self.dbus_pwrprof = await self.dbus_proxy(
                self.DBUS_PWRPROF_SVC,
                self.DBUS_PWRPROF_PATH,
                self.DBUS_PWRPROF_SVC,
            )
        return await self.dbus_pwrprof.get_active_profile()

    async def _get_upower_bat0(self) -> ProxyInterface:
        """Return the cached UPower device proxy for the battery."""
        if not hasattr(self, 'dbus_bat0'):
            self.dbus_bat0 = await self.dbus_proxy(
                self.DBUS_UPOWER_SVC,
                self.DBUS_DEV_BAT0,
                self.DBUS_UPOWER_DEV_IF,
            )
        return self.dbus_bat0

    @staticmethod
    def _format_duration(seconds) -> str:
        """Format a duration in seconds as `H:MM`, truncated to whole minutes."""
        # divmod keeps hours an integer; formatting `minutes / 60` with
        # `.0f` would round 1.5 hours up and show 90 minutes as `2:30`.
        hours, mins = divmod(int(seconds) // 60, 60)
        return f'{hours}:{mins:02d}'

    async def acpi_info(self) -> Union[str, Dict[str, str]]:
        """Return the block for the battery: `full_text` plus optional colors.

        Raises the underlying exception when any battery property cannot be
        read. A failed power-profile lookup is tolerated and shown as a red
        background.
        """
        bat0 = await self._get_upower_bat0()
        energy, energy_full, time_empty, time_full, profile = await asyncio.gather(
            bat0.get_energy(),
            bat0.get_energy_full(),
            bat0.get_time_to_empty(),
            bat0.get_time_to_full(),
            self.get_powerprofile(),
            return_exceptions=True,
        )
        # With return_exceptions=True failures come back as values; surface
        # the real battery error instead of a TypeError from the arithmetic.
        for value in (energy, energy_full, time_empty, time_full):
            if isinstance(value, BaseException):
                raise value

        # Guard against a battery that reports a zero full-charge energy.
        perc = int(100 * energy / energy_full) if energy_full else 0
        if time_empty != 0:
            full_text = f'bat: {perc}%, {self._format_duration(time_empty)}'
        elif time_full != 0:
            full_text = f'ac: {perc}%, {self._format_duration(time_full)}'
        else:
            full_text = f'ac: {perc}%'

        ret = {'full_text': full_text}
        if profile == 'balanced':
            pass
        elif profile == 'power-saver':
            ret['color'] = '#00ff00'
        elif profile == 'performance':
            ret['color'] = '#ffa000'
        else:
            # Unknown profile, or `profile` is the exception from the lookup.
            ret['background'] = '#ff0000'
        return ret

    async def run(self, eventq: Queue):
        """Publish the battery block and refresh it on relevant D-Bus property changes."""
        # Set by the D-Bus callbacks below to request a refresh.
        sync = asyncio.Event()

        def on_upower_changed(interface_name, changed_properties, invalidated_properties):
            on_battery = changed_properties.get('OnBattery')
            if on_battery is not None:
                logging.info(f'UPower.OnBattery = {on_battery.value}')
                sync.set()

        def on_battery_update(interface_name, changed_properties, invalidated_properties):
            tfull = changed_properties.get('TimeToFull')
            tempty = changed_properties.get('TimeToEmpty')
            if tfull or tempty:
                sync.set()

        upower = await self.dbus_proxy(self.DBUS_UPOWER_SVC, self.DBUS_UPOWER_PATH, self.DBUS_IF_PROPERTIES)
        upower.on_properties_changed(on_upower_changed)
        bat0 = await self.dbus_proxy(self.DBUS_UPOWER_SVC, self.DBUS_DEV_BAT0, self.DBUS_IF_PROPERTIES)
        bat0.on_properties_changed(on_battery_update)
        pwrprof = await self.dbus_proxy(self.DBUS_PWRPROF_SVC, self.DBUS_PWRPROF_PATH, self.DBUS_IF_PROPERTIES)
        # Any property change on the power-profiles object triggers a refresh.
        pwrprof.on_properties_changed(lambda ifname, chnged, invldted: sync.set())

        while True:
            info = await self.acpi_info()
            await eventq.put({'power': info})
            await sync.wait()
            sync.clear()

    # TODO: add an `on_click` handler that changes the power profile
def setup_input(eventq: Queue):
    """Forward lines read from stdin to `eventq` as `{'input': line}` events.

    Reading happens on a background thread, because reading stdin blocks.
    Each line is stripped of surrounding whitespace and commas. Blank lines
    and the opening `[` are skipped. Must be called while the event loop
    that owns `eventq` is current.
    """
    import fileinput
    from threading import Thread

    loop = asyncio.get_event_loop()

    def on_input(line):
        # Runs on the event loop thread (scheduled via call_soon_threadsafe).
        loop.create_task(eventq.put({'input': line}))

    def thread_input(on_input):
        try:
            for line in fileinput.input(files='-'):
                line = line.strip().strip(',')
                if not line or line == '[':
                    continue
                loop.call_soon_threadsafe(on_input, line)
        except Exception as e:
            # Errors are reported with a desktop notification.
            subprocess.call(["notify-send", "thread_input: ERROR " + str(e)])

    # Daemon thread: a reader blocked on stdin must not keep the process
    # alive once the main loop has finished.
    t = Thread(target=thread_input, args=[on_input], daemon=True)
    t.start()
async def task_main(tasks: Dict[str, Tracker]):
    """Run the status bar: start the trackers, print blocks, dispatch clicks.

    `tasks` maps block names to trackers. Every event taken from the queue
    (a tracker update or a click line from stdin) causes the full list of
    blocks to be printed again as one JSON array.
    """
    # https://man.archlinux.org/man/swaybar-protocol.7
    state = {}
    eventq = Queue(maxsize=1)
    try:
        setup_input(eventq)
        for t in tasks.values():
            state[t.NAME] = ''
            t.start(eventq)

        print(json.dumps({"version": 1, "click_events": True}))
        print('[')

        while True:
            event = await eventq.get()
            if 'input' in event:
                inp = event.get('input')
                try:
                    inp = json.loads(inp)
                    ev = ClickEvent.from_dict(inp)
                    tasks[ev.name].on_click(ev)
                except Exception as e:
                    # A bad click line must not take the whole bar down.
                    logging.error(f'input parse error: {e}')
            else:
                state.update(event)

            # output
            blocks = []
            for name, value in state.items():
                output = {'name': name}
                if isinstance(value, dict):
                    output.update(value)
                    full_text = str(value.get('full_text', ''))
                else:
                    # Plain strings, plus a fallback for any other value so
                    # `full_text` is never left over from a previous block.
                    full_text = str(value)
                output['full_text'] = ' ' + full_text + ' '
                blocks.append(output)
            print(json.dumps(blocks), end=",\n")
            sys.stdout.flush()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logging.error(f'task_main: {e}')

    for t in tasks.values():
        try:
            ret = t.stop()
        except (asyncio.CancelledError, asyncio.InvalidStateError):
            # The task was cancelled or has not finished cancelling yet;
            # there is no outcome to report.
            continue
        if ret:
            logging.warning(f'{t.NAME}: {ret}')
def flags():
    """Build the command-line parser that selects which blocks to show."""
    parser = argparse.ArgumentParser(
        'swaytus',
        description='A status bar implementation for Sway',
    )
    # (short option, long option, help text), in the order shown by --help.
    switches = (
        ('-a', '--all', 'Show everything'),
        ('-t', '--time', 'Show date and time'),
        ('-n', '--net', 'Show network status'),
        ('-k', '--kbd', 'Show and track keyboard layouts'),
        ('-s', '--sound', 'Show and track sound volume'),
        ('-p', '--power', 'Show and track power (battery level, etc)'),
    )
    for short_opt, long_opt, text in switches:
        parser.add_argument(short_opt, long_opt, action='store_true', help=text)
    return parser
def main():
    """Parse the flags, set up file logging and run the selected trackers."""
    args = flags().parse_args()

    logging.basicConfig(
        format='%(asctime)s\t%(levelname)s\t%(module)s:%(message)s',
        level=logging.INFO,
        filename=os.getenv('SWAYBAR_LOG') or "/tmp/swaybar.log",
    )

    # (requested on the command line, tracker class), in bar order.
    selectable = (
        (args.kbd, TaskKeyboard),
        (args.sound, TaskSound),
        (args.power, TaskPower),
        (args.net, TaskNetwork),
        (args.time, TaskTime),
    )
    # A tracker is only constructed when it is actually selected.
    trackers = {
        cls.NAME: cls()
        for wanted, cls in selectable
        if wanted or args.all
    }

    asyncio.run(task_main(trackers), debug=True)
    logging.info('oktnxbye')
# Run the bar only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment