Created
July 1, 2021 11:59
-
-
Save snizovtsev/c49336265c6f20d36de756eecaf9ea29 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import mmap | |
import struct | |
import array | |
import socket | |
import time | |
import os | |
import ctypes | |
from pprint import pprint | |
from dataclasses import dataclass | |
import dbus | |
from gi.repository import GLib | |
from dbus.mainloop.glib import DBusGMainLoop | |
# Make the GLib main loop the default for dbus-python so that signal
# receivers registered later are dispatched by GLib.MainLoop().run().
DBusGMainLoop(set_as_default=True)

# Handle to the C library; eventfd() and memfd_create() below call into it.
libc = ctypes.CDLL("libc.so.6")
def eventfd(init_val, flags):
    """Create an eventfd object through libc and return its descriptor.

    Args:
        init_val: Initial counter value handed to ``eventfd(2)``.
        flags: Flag bits handed to ``eventfd(2)`` unchanged.

    Returns:
        The new file descriptor (a non-negative int).

    Raises:
        OSError: If the libc call reports failure (negative return value).
    """
    fd = libc.eventfd(init_val, flags)
    if fd < 0:
        # libc signals failure with -1; fail here rather than hand an
        # invalid descriptor to the caller.
        raise OSError("eventfd() failed")
    return fd
def memfd_create(name, flags):
    """Create an anonymous memory-backed file through libc.

    Args:
        name: Name for the memory file. ``bytes`` or a ``ctypes.c_char_p``
            are passed through unchanged; a ``str`` is encoded first so that
            ctypes does not marshal it as a wide-character string.
        flags: Flag bits handed to ``memfd_create(2)`` unchanged.

    Returns:
        The new file descriptor (a non-negative int).

    Raises:
        OSError: If the libc call reports failure (negative return value).
    """
    if isinstance(name, str):
        name = name.encode()
    fd = libc.memfd_create(name, flags)
    if fd < 0:
        # libc signals failure with -1; fail here rather than hand an
        # invalid descriptor to the caller.
        raise OSError("memfd_create() failed")
    return fd
@dataclass
class BatteryState:
    """Battery properties held as plain integers and byte strings.

    Numeric fields default to 0xFFFFFFFF and text fields to an empty byte
    string until :meth:`update_from_upower` supplies a value.
    NOTE(review): 0xFFFFFFFF presumably means "unknown" to the consumer of
    the packed record -- confirm against the peer side.
    """

    present: int = 0
    status: int = 0
    rate: int = 0xFFFFFFFF
    energy: int = 0xFFFFFFFF
    voltage: int = 0xFFFFFFFF
    energy_design: int = 0xFFFFFFFF
    energy_full: int = 0xFFFFFFFF
    voltage_design: int = 0xFFFFFFFF
    # Defaults are immutable empty bytes; updates store bytearray values.
    model: bytearray = b''
    serial: bytearray = b''
    vendor: bytearray = b''

    # The tables below are un-annotated class attributes, so the dataclass
    # machinery does not treat them as fields.

    # UPower ``State`` values mapped to status 1; any other value maps to 2.
    _STATUS_ONE_STATES = frozenset({0, 2, 3, 6})

    # UPower property name -> attribute that stores the value times 1000.
    _SCALED_FIELDS = {
        'EnergyRate': 'rate',
        'Energy': 'energy',
        'Voltage': 'voltage',
        'EnergyFullDesign': 'energy_design',
        'EnergyFull': 'energy_full',
    }

    # UPower property name -> attribute that stores the ASCII-encoded text.
    _TEXT_FIELDS = {
        'Model': 'model',
        'Serial': 'serial',
        'Vendor': 'vendor',
    }

    def update_from_upower(self, props):
        """Copy the recognised entries of a UPower property mapping.

        Args:
            props: Mapping of UPower property names to values. Unrecognised
                names are ignored, so partial updates are fine.

        Numeric values are multiplied by 1000 and rounded to an integer.
        Text values are encoded as ASCII; characters outside ASCII become
        ``?`` instead of raising ``UnicodeEncodeError``.
        """
        for name, value in props.items():
            if name == 'State':
                self.status = 1 if int(value) in self._STATUS_ONE_STATES else 2
            elif name in self._SCALED_FIELDS:
                setattr(self, self._SCALED_FIELDS[name], round(value * 1000))
            elif name in self._TEXT_FIELDS:
                setattr(self, self._TEXT_FIELDS[name],
                        bytearray(value, 'ascii', 'replace'))
class PeerHandler:
    """Resources shared with one connected peer.

    Holds a 4096-byte memory file that is also mapped into this process,
    plus two eventfd-backed file objects (``irq`` and ``doorbell``).
    """

    def __init__(self):
        """Create the memory file, both eventfds and the memory mapping."""
        # Packed layout of the battery record stored at the start of the map.
        self.tbl_battery = struct.Struct('<Bxxx4L3L32s32s32s')

        mem_fd = memfd_create(ctypes.c_char_p(b"shm"), 0)
        self.mem = os.fdopen(mem_fd, 'w+b', buffering=0)
        self.mem.truncate(4096)

        irq_fd = eventfd(0, 0)
        self.irq = os.fdopen(irq_fd, 'ab', buffering=0)
        doorbell_fd = eventfd(0, 0)
        self.doorbell = os.fdopen(doorbell_fd, 'rb', buffering=0)

        # Length 0 maps the whole file.
        self.shm = mmap.mmap(self.mem.fileno(), 0)

    def interact(self, conn):
        """Send the three descriptors to the peer over ``conn``.

        One message is sent per descriptor: a single tag byte (``m``, ``i``
        or ``d``) with the descriptor attached as SCM_RIGHTS ancillary data.
        """
        handoff = (
            (b'm', self.mem),
            (b'i', self.irq),
            (b'd', self.doorbell),
        )
        for tag, fileobj in handoff:
            fds = array.array("i", [fileobj.fileno()])
            ancillary = (socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)
            conn.sendmsg([tag], [ancillary])

    def notify(self):
        """Write the 8-byte native-endian integer 1 to the ``irq`` eventfd."""
        self.irq.write(struct.pack('=Q', 1))

    def battery_update(self, state):
        """Pack ``state`` into the start of the shared mapping, then notify.

        Args:
            state: Object exposing the battery attributes read below.
        """
        record = self.tbl_battery.pack(
            state.present,
            state.status,
            state.rate,
            state.energy,
            state.voltage,
            state.energy_design,
            state.energy_full,
            state.voltage_design,
            state.model,
            state.serial,
            state.vendor,
        )
        self.shm[0:len(record)] = record
        self.notify()
def changed(name, props, arr):
    """Handle a D-Bus ``PropertiesChanged`` signal for the battery object.

    Args:
        name: Interface whose properties changed.
        props: Mapping of changed property names to their new values.
        arr: Third signal argument; not used here.

    Signals for any interface other than ``org.freedesktop.UPower.Device``
    are ignored. An explicit check is used instead of ``assert`` because
    assertions are stripped under ``python -O``, which would let properties
    of unrelated interfaces be applied to the battery state.
    """
    if name != 'org.freedesktop.UPower.Device':
        return
    # ``state`` and ``peer`` are module-level objects set up by the script.
    state.update_from_upower(props)
    pprint(state)
    peer.battery_update(state)
if __name__ == '__main__':
    # Names for the literals that are used more than once below.
    BATTERY_PATH = '/org/freedesktop/UPower/devices/battery_BAT0'
    DEVICE_IFACE = 'org.freedesktop.UPower.Device'
    PROPS_IFACE = 'org.freedesktop.DBus.Properties'
    SOCKFILE = "state/acpi-helper.sock"

    # Ask UPower to refresh BAT0, then read all of its device properties.
    bus = dbus.SystemBus()
    bat0 = bus.get_object('org.freedesktop.UPower', BATTERY_PATH)
    bat0_dev = dbus.Interface(bat0, dbus_interface=DEVICE_IFACE)
    bat0_dev.Refresh()
    properties = bat0.GetAll(DEVICE_IFACE, dbus_interface=PROPS_IFACE)

    # Module-level battery snapshot; changed() updates it on every signal.
    state = BatteryState()
    state.update_from_upower(properties)
    state.present = 1

    bus.add_signal_receiver(
        changed,
        dbus_interface=PROPS_IFACE,
        signal_name="PropertiesChanged",
        path=BATTERY_PATH,
    )

    # Unix stream socket on which a peer connects to receive descriptors.
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(SOCKFILE)
    sock.listen(1)
    try:
        while True:
            # Publish the current state before the peer gets the descriptors.
            peer = PeerHandler()
            peer.battery_update(state)
            conn, addr = sock.accept()
            with conn:
                peer.interact(conn)
                # NOTE(review): the source's indentation was lost; the main
                # loop is assumed to run while the connection is still open.
                GLib.MainLoop().run()
    finally:
        sock.close()
        os.unlink(SOCKFILE)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment