Skip to content

Instantly share code, notes, and snippets.

@snizovtsev
Created July 1, 2021 11:59
Show Gist options
  • Save snizovtsev/c49336265c6f20d36de756eecaf9ea29 to your computer and use it in GitHub Desktop.
Save snizovtsev/c49336265c6f20d36de756eecaf9ea29 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import mmap
import struct
import array
import socket
import time
import os
import ctypes
from pprint import pprint
from dataclasses import dataclass
import dbus
from gi.repository import GLib
from dbus.mainloop.glib import DBusGMainLoop
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
libc = ctypes.cdll.LoadLibrary("libc.so.6")
def eventfd(init_val, flags):
return libc.eventfd(init_val, flags)
def memfd_create(name, flags):
return libc.memfd_create(name, flags)
@dataclass
class BatteryState:
present: int = 0
status: int = 0
rate: int = 0xFFFFFFFF
energy: int = 0xFFFFFFFF
voltage: int = 0xFFFFFFFF
energy_design: int = 0xFFFFFFFF
energy_full: int = 0xFFFFFFFF
voltage_design: int = 0xFFFFFFFF
model: bytearray = b''
serial: bytearray = b''
vendor: bytearray = b''
def update_from_upower(self, props):
for name, value in props.items():
if name == 'State':
self.status = 1 if int(value) in [0,2,3,6] else 2
elif name == 'EnergyRate':
self.rate = round(value * 1000)
elif name == 'Energy':
self.energy = round(value * 1000)
elif name == 'Voltage':
self.voltage = round(value * 1000)
elif name == 'EnergyFullDesign':
self.energy_design = round(value * 1000)
elif name == 'EnergyFull':
self.energy_full = round(value * 1000)
elif name == 'Model':
self.model = bytearray(value, 'ascii')
elif name == 'Serial':
self.serial = bytearray(value, 'ascii')
elif name == 'Vendor':
self.vendor = bytearray(value, 'ascii')
class PeerHandler:
""" A resources of a connected peer """
def __init__(self):
mem_name = ctypes.c_char_p(b"shm")
self.tbl_battery = struct.Struct('<Bxxx4L3L32s32s32s')
self.mem = os.fdopen(memfd_create(mem_name, 0), 'w+b', buffering=0)
self.mem.truncate(4096)
self.irq = os.fdopen(eventfd(0, 0), 'ab', buffering=0)
self.doorbell = os.fdopen(eventfd(0, 0), 'rb', buffering=0)
self.shm = mmap.mmap(self.mem.fileno(), 0)
def interact(self, conn):
fdlist = array.array("i", [-1])
cmsg = (socket.SOL_SOCKET, socket.SCM_RIGHTS, fdlist)
fdlist[0] = self.mem.fileno()
conn.sendmsg([b'm'], [cmsg])
fdlist[0] = self.irq.fileno()
conn.sendmsg([b'i'], [cmsg])
fdlist[0] = self.doorbell.fileno()
conn.sendmsg([b'd'], [cmsg])
def notify(self):
c_one = ctypes.c_ulonglong(1)
self.irq.write(c_one)
def battery_update(self, state):
data = self.tbl_battery.pack(
state.present,
state.status,
state.rate,
state.energy,
state.voltage,
state.energy_design,
state.energy_full,
state.voltage_design,
state.model,
state.serial,
state.vendor
)
self.shm[:len(data)] = data
self.notify()
def changed(name, props, arr):
assert name == 'org.freedesktop.UPower.Device'
state.update_from_upower(props)
pprint(state)
peer.battery_update(state)
if __name__ == '__main__':
bus = dbus.SystemBus()
bat0 = bus.get_object('org.freedesktop.UPower',
'/org/freedesktop/UPower/devices/battery_BAT0')
bat0_dev = dbus.Interface(bat0,
dbus_interface='org.freedesktop.UPower.Device')
bat0_dev.Refresh()
properties = bat0.GetAll('org.freedesktop.UPower.Device',
dbus_interface='org.freedesktop.DBus.Properties')
state = BatteryState()
state.update_from_upower(properties)
state.present = 1
bus.add_signal_receiver(changed,
dbus_interface='org.freedesktop.DBus.Properties',
signal_name="PropertiesChanged",
path='/org/freedesktop/UPower/devices/battery_BAT0')
SOCKFILE = "state/acpi-helper.sock"
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(SOCKFILE)
sock.listen(1)
try:
while True:
peer = PeerHandler()
peer.battery_update(state)
conn, addr = sock.accept()
with conn:
peer.interact(conn)
GLib.MainLoop().run()
finally:
sock.close()
os.unlink(SOCKFILE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment