Created
June 25, 2019 23:16
-
-
Save dlech/71344ceb5aa66862ad8932064bfe1a22 to your computer and use it in GitHub Desktop.
MicroPython bindings for libudev
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SPDX-License-Identifier: MIT | |
# Copyright (c) 2019 David Lechner <[email protected]> | |
import ffi | |
from uerrno import ENOENT | |
# Shared-library handles that the bindings below resolve their symbols from.
_udev = ffi.open("libudev.so.1")
_libc = ffi.open("libc.so.6")

# Accessor for the C library's ``errno`` symbol, read as an int with
# ``_errno.get()`` whenever a libudev call hands back a NULL handle.
# NOTE(review): glibc normally exposes errno per thread via
# __errno_location(); confirm that this symbol reflects the value set by
# the failing call.
_errno = _libc.var("i", "errno")
class Udev:
    """Owns a libudev context handle created with ``udev_new``.

    The handle is released through ``udev_unref`` when ``__del__`` runs.
    ``__exit__`` invokes ``__del__`` explicitly, so the intended usage is
    ``with Udev() as udev: ...``.
    """

    _new = _udev.func('p', 'udev_new', '')
    _unref = _udev.func('p', 'udev_unref', 'p')

    def __init__(self):
        """Create the context; raises OSError(errno) if udev_new returns NULL."""
        handle = Udev._new()
        self._handle = handle
        if not handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference once; later calls see a falsy handle and do nothing."""
        if not self._handle:
            return
        # Store whatever udev_unref returns so the stale handle is not reused.
        self._handle = Udev._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    def scan(self, subsystem, **kwargs):
        """Scans for matching devices.

        Parameters
        ----------
        subsystem : string
            The subsystem (name of class or bus) to scan.
        **kwargs
            Property name/value pairs to match.

        Returns
        -------
        iterator of Device
            Lazily creates one ``Device`` per matching syspath. The scan
            itself runs before this method returns; only the ``Device``
            objects are created on demand.

        Raises
        ------
        OSError
            If adding a match, scanning (other than ENOENT) or listing the
            results fails.
        """
        with Enumerate(self) as enumerator:
            enumerator.add_match_subsystem(subsystem)
            for prop_name, prop_value in kwargs.items():
                enumerator.add_match_property(prop_name, prop_value)

            try:
                enumerator.scan_devices()
            except OSError as err:
                # not sure why we get ENOENT even on success
                if err.args[0] != ENOENT:
                    raise err

            # NB: copy the syspaths out now; the enumerator is freed by the
            # context manager on the way out, so the returned iterator must
            # not hold on to it.
            syspaths = list(enumerator.list())

        return (Device.from_syspath(self, path) for path in syspaths)
class Enumerate():
    """Wrapper around a libudev ``udev_enumerate`` handle.

    Created from a ``Udev`` context. The handle is released through
    ``udev_enumerate_unref`` when ``__del__`` runs; ``__exit__`` calls
    ``__del__`` explicitly, so use it as a context manager.
    """

    _new = _udev.func('p', 'udev_enumerate_new', 'p')
    _unref = _udev.func('p', 'udev_enumerate_unref', 'p')
    # udev_enumerate_add_match_subsystem() returns a C int (negative errno
    # on failure), so it has to be declared 'i' like the other int-returning
    # calls below. Declared as a pointer, the result is never negative and
    # the ``err < 0`` check in add_match_subsystem() could not detect errors.
    _add_match_subsystem = _udev.func('i', 'udev_enumerate_add_match_subsystem', 'ps')
    _add_match_property = _udev.func('i', 'udev_enumerate_add_match_property', 'pss')
    _scan_devices = _udev.func('i', 'udev_enumerate_scan_devices', 'p')
    _get_list_entry = _udev.func('p', 'udev_enumerate_get_list_entry', 'p')

    def __init__(self, udev):
        """Create an enumerator for ``udev`` (an object with a ``_handle``).

        Raises OSError(errno) if udev_enumerate_new returns NULL.
        """
        self._handle = Enumerate._new(udev._handle)
        if not self._handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference once; later calls see a falsy handle and do nothing."""
        if self._handle:
            # Keep what unref returns so the stale handle is not reused.
            self._handle = Enumerate._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    def add_match_subsystem(self, subsystem):
        """Restrict the scan to ``subsystem``; raises OSError on a negative result."""
        err = Enumerate._add_match_subsystem(self._handle, subsystem)
        if err < 0:
            raise OSError(-err)

    def add_match_property(self, name, value):
        """Restrict the scan to devices whose property ``name`` matches ``value``.

        Raises OSError on a negative result.
        """
        err = Enumerate._add_match_property(self._handle, name, value)
        if err < 0:
            raise OSError(-err)

    def scan_devices(self):
        """Run the scan; raises OSError(-result) when libudev returns a negative value."""
        err = Enumerate._scan_devices(self._handle)
        if err < 0:
            raise OSError(-err)

    def list(self):
        """Return a lazy iterator over the names of the list entries.

        The first entry is fetched immediately; the remaining entries are
        walked only as the iterator is consumed, so consume it before this
        enumerator is released.

        Raises OSError(errno) if libudev returns no first entry.
        NOTE(review): libudev may also return NULL for an empty result set;
        confirm whether callers should treat that case as "no devices".
        """
        handle = Enumerate._get_list_entry(self._handle)
        if not handle:
            raise OSError(_errno.get())

        def names(entry):
            while entry:
                yield entry.name
                entry = entry.next

        return names(ListEntry(handle))
class Device:
    """Wrapper around a libudev ``udev_device`` handle.

    Build instances with ``Device.from_syspath``. The handle is released
    through ``udev_device_unref`` when ``__del__`` runs; ``__exit__`` calls
    ``__del__`` explicitly, so use it as a context manager.
    """

    _new_from_syspath = _udev.func('p', 'udev_device_new_from_syspath', 'ps')
    _unref = _udev.func('p', 'udev_device_unref', 'p')
    _get_devnode = _udev.func('s', 'udev_device_get_devnode', 'p')

    @staticmethod
    def from_syspath(udev, syspath):
        """Create a Device for ``syspath`` using the context ``udev``.

        Raises OSError(errno) if udev_device_new_from_syspath returns NULL.
        """
        raw = Device._new_from_syspath(udev._handle, syspath)
        if raw:
            return Device(raw)
        raise OSError(_errno.get())

    def __init__(self, handle):
        """Take ownership of an already created device ``handle``."""
        self._handle = handle

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference once; later calls see a falsy handle and do nothing."""
        if not self._handle:
            return
        # Store whatever unref returns so the stale handle is not reused.
        self._handle = Device._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    @property
    def devnode(self):
        """Result of ``udev_device_get_devnode`` for this device (string-typed)."""
        return Device._get_devnode(self._handle)
class Monitor:
    """Wrapper around a libudev ``udev_monitor`` handle.

    The monitor is created with ``udev_monitor_new_from_netlink`` using the
    name ``'udev'``. The handle is released through ``udev_monitor_unref``
    when ``__del__`` runs; ``__exit__`` calls ``__del__`` explicitly, so use
    it as a context manager.
    """

    _new_from_netlink = _udev.func('p', 'udev_monitor_new_from_netlink', 'ps')
    _unref = _udev.func('p', 'udev_monitor_unref', 'p')

    def __init__(self, udev):
        """Create the monitor for ``udev``; raises OSError(errno) on a NULL handle."""
        handle = Monitor._new_from_netlink(udev._handle, 'udev')
        self._handle = handle
        if not handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference once; later calls see a falsy handle and do nothing."""
        if not self._handle:
            return
        # Store whatever unref returns so the stale handle is not reused.
        self._handle = Monitor._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()
class ListEntry:
    """Thin view over one libudev ``udev_list_entry`` handle.

    The class only reads from the handle; it never unrefs anything.
    """

    _get_next = _udev.func('p', 'udev_list_entry_get_next', 'p')
    _get_name = _udev.func('s', 'udev_list_entry_get_name', 'p')
    _get_value = _udev.func('s', 'udev_list_entry_get_value', 'p')

    def __init__(self, handle):
        """Wrap an existing list-entry ``handle``."""
        self._handle = handle

    @property
    def next(self):
        """The following entry as a ListEntry, or None at the end of the list."""
        following = ListEntry._get_next(self._handle)
        if not following:
            return None
        return ListEntry(following)

    @property
    def name(self):
        """Result of ``udev_list_entry_get_name`` for this entry."""
        return ListEntry._get_name(self._handle)

    @property
    def value(self):
        """Result of ``udev_list_entry_get_value`` for this entry."""
        return ListEntry._get_value(self._handle)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment