Created
June 16, 2020 08:10
-
-
Save bojanpotocnik/2049e4914ea581d6814019992850743d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Various utility classes for working with BlueZ via D-Bus | |
For this to work the following most be executed (installed) on most machines: | |
1. `sudo apt install bluez bluez-tools` | |
2. `sudo apt install python3-dbus libgirepository1.0-dev python3-gi` | |
3. `pip install PyGObject` globally or in the virtual environment | |
""" | |
import ctypes | |
import enum | |
import multiprocessing | |
import os | |
import re | |
import time | |
import warnings | |
from pathlib import Path | |
from typing import Optional, Iterable, Tuple, List, Union, Dict, Any, Iterator | |
import dbus | |
import dbus.mainloop.glib | |
import dbus.service | |
try: | |
# noinspection PyUnresolvedReferences,PyPackageRequirements | |
from gi.repository import GLib | |
except ModuleNotFoundError: | |
raise ModuleNotFoundError(f"Read the docstring in {os.path.realpath(__file__)}") | |
class _Singleton(type): | |
# noinspection PyInitNewSignature | |
def __new__(mcs, class_name: str, parents: tuple, attributes: dict, instance_name: str) -> Any: | |
# Create the original class | |
cls = super().__new__(mcs, class_name, parents, attributes) | |
# Patch the __new__ method of the created class | |
original_new = cls.__new__ | |
def __singleton_new__(*args, **kwargs): | |
existing = globals().get(instance_name) | |
if existing: | |
warnings.warn(f"The existing instance of this class shall be used" | |
f", `{Path(__file__).stem}.bluez`", stacklevel=2) | |
return existing | |
return original_new(*args, **kwargs) | |
cls.__new__ = __singleton_new__ | |
return cls | |
class MainLoop(GLib.MainLoop): | |
pass | |
class BlueZ(metaclass=_Singleton, instance_name='bluez'): | |
""" | |
Utility class for interfacing BlueZ via D-Bus | |
This is singleton because it is important that all related operations get the same instance if dbus.Bus. | |
""" | |
SERVICE = "org.bluez" | |
class Rejected(dbus.DBusException): | |
_dbus_error_name = "org.bluez.Error.Rejected" | |
class Canceled(dbus.DBusException): | |
_dbus_error_name = "org.bluez.Error.Canceled" | |
# noinspection PyTypeChecker | |
def __init__(self): | |
# Most of the interfaces cannot be used before mainloop is set and initialized, | |
# therefore initialization cannot be done here, but lazily when used. | |
# Also do not set default loop here because user might set its own loop or do | |
# some special initialization later (note - this class is instantiated when | |
# this module is imported). | |
self._bus: dbus.Bus = None | |
# Used for manual mainloop iterations | |
self._mainloop_context: GLib.MainContext = None | |
@property | |
def bus(self) -> dbus.Bus: | |
if not self._bus: | |
# Implicitly initialize mainloop if not yet initialized. If user would need a reference or | |
# custom initialization, then the mainloop was most likely initialized prior to this call. | |
if not dbus.get_default_main_loop(): | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
if not self._mainloop_context: | |
# noinspection PyUnresolvedReferences | |
self._mainloop_context = GLib.MainContext.default() | |
self._bus = dbus.SystemBus() | |
return self._bus | |
def object(self, path: str) -> dbus.proxies.ProxyObject: | |
return self.bus.get_object(self.SERVICE, path) | |
def interface(self, path: str, interface: str) -> dbus.Interface: | |
return dbus.Interface(self.object(path), interface) | |
@property | |
def interfaces(self) -> Dict[dbus.ObjectPath, dbus.Dictionary]: | |
""" | |
Return all interfaces of this service | |
:return: Dictionary {path: interfaces} | |
""" | |
return self.interface("/", "org.freedesktop.DBus.ObjectManager").GetManagedObjects() | |
def property(self, of: dbus.Interface, which: str, allow_missing: bool = False) -> Optional[Any]: | |
try: | |
return self.interface(of.object_path, "org.freedesktop.DBus.Properties").Get(of.INTERFACE, which) | |
except dbus.exceptions.DBusException as e: | |
if allow_missing and (e.get_dbus_message() == f"No such property '{which}'"): | |
return None | |
raise | |
def mainloop_iterate(self, timeout: Union[None, float], until_idle: bool = False) -> Iterator[float]: | |
""" | |
Run GLib.MainLoop | |
Some operations require running mainloop (e.g. BLE device scan/discovery) to operate, | |
but basic GLib.MainLoop.run() does not implement a timeout or check for arbitrary | |
conditions, which is frequently required. This method extends the default mainloop | |
operation in a form of iterator to enable additional checks on every loop. | |
:param timeout: If not None then the iterations will stop after this many seconds, | |
irrespective of the other parameters. If 0 then one iteration will be | |
performed, yielding -1 and iterations stopped. | |
:param until_idle: If True then stop the iterations as soon as none of the associated sources | |
have pending events (as soon as GLib.MainContext::pending() returns False), | |
even if timeout not reached. This is useful to execute some work queued for | |
execution in the main loop. It is usually False when waiting for some I/O | |
event to happen because if iterations are fast there can be a lot of idle | |
time - time with no events pending. | |
:return: Iterator yielding: | |
Positive value representing remaining time [seconds] until timeout, | |
-1 if timeout was reached (no time remaining), | |
0 if `timeout` was None and therefore looping forever, | |
-2 if `until_idle` was True and there was no pending events (ignoring `timeout`). | |
The iterator will return (will be exhausted) after yielding negative value. | |
""" | |
# One way to do it would be to run the mainloop in a separate thread: | |
# import threading | |
# | |
# finished = threading.Event() | |
# ... | |
# loop = GLib.MainLoop() | |
# thread = threading.Thread(target=loop.run, daemon=False) | |
# thread.start() | |
# # Non-busy wait for event, condition, timeout, ... | |
# finished.wait(timeout) | |
# loop.quit() | |
# thread.join() | |
# but there is not much sense in spawning another thread if we are then blocking here anyway. | |
# Manually perform iterations of the main loop as it would be done in GLib.MainLoop.run(). | |
end_time = (time.perf_counter() + timeout) if (timeout is not None) else None | |
while True: | |
# noinspection PyUnresolvedReferences | |
self._mainloop_context.iteration(False) | |
# Yield value of 0 means infinite iteration (no timeout) | |
ret = 0 | |
# If timeout is used then remaining time is yielded or -1 for timeout | |
if end_time: | |
ret = end_time - time.perf_counter() | |
if ret <= 0: | |
ret = -1 | |
# noinspection PyUnresolvedReferences | |
if until_idle and (not self._mainloop_context.pending()): | |
ret = -2 | |
yield ret | |
if ret < 0: | |
break | |
def mainloop_run(self, until_idle: bool = False, timeout: Optional[float] = None) -> int: | |
"""Blocking version of :meth:`mainloop_iterate`""" | |
ret = 0 | |
for ret in self.mainloop_iterate(timeout=timeout, until_idle=until_idle): | |
pass | |
return ret | |
bluez = BlueZ() | |
"""Single global instance of a BlueZ singleton class""" | |
class Device(dbus.Interface): | |
INTERFACE = BlueZ.SERVICE + ".Device1" | |
def __init__(self, path: str): | |
super().__init__(bluez.object(path), self.INTERFACE) | |
# This class can be instantiated many times (e.g. during scan) - do not register listener for every instance! | |
self._property_change_listener_added: bool = False | |
# States changed from PropertiesChanged listener | |
self._connected = multiprocessing.Value(ctypes.c_bool) | |
self._paired = multiprocessing.Value(ctypes.c_bool) | |
def __del__(self): | |
if self._property_change_listener_added: | |
bluez.bus.remove_signal_receiver(self._on_properties_changed) | |
self._property_change_listener_added = True | |
def _listen_to_property_changes(self) -> None: | |
if self._property_change_listener_added: | |
return | |
bluez.bus.add_signal_receiver(self._on_properties_changed, | |
dbus_interface="org.freedesktop.DBus.Properties", | |
signal_name="PropertiesChanged", | |
arg0=self.INTERFACE, | |
path_keyword="path") | |
self._property_change_listener_added = True | |
def _on_properties_changed(self, interface: str, changed_properties: Dict[str, GLib.Variant], | |
_invalidated_properties: List[str], path: dbus.ObjectPath): | |
if (interface != self.INTERFACE) or (path != self.object_path): | |
return | |
# This is called for every received advertisement packet and usually contains at least RSSI key. However if | |
# other values are present in the advertisement packet (e.g. ManufacturerData) they will also be reported as | |
# changed for every received packet - even if they are the same. Therefore only act on actually relevant | |
# changes. | |
if 'Connected' in changed_properties: | |
self._connected.value = changed_properties['Connected'] | |
print("Connected = ", self._connected.value) | |
del changed_properties['Connected'] | |
if 'Paired' in changed_properties: | |
self._paired.value = changed_properties['Paired'] | |
print("Paired = ", self._paired.value) | |
del changed_properties['Paired'] | |
if 'ServicesResolved' in changed_properties: | |
print("Resolved:", changed_properties['ServicesResolved']) | |
del changed_properties['ServicesResolved'] | |
try: | |
del changed_properties['RSSI'] | |
del changed_properties['ManufacturerData'] | |
except KeyError: | |
pass | |
if changed_properties: | |
print(changed_properties) | |
@property | |
def address(self) -> str: | |
return bluez.property(self, "Address") | |
@property | |
def name(self) -> Optional[str]: | |
return bluez.property(self, "Name", allow_missing=True) | |
@property | |
def rssi(self) -> Optional[int]: | |
return bluez.property(self, "RSSI", allow_missing=True) | |
@property | |
def tx_power(self) -> Optional[int]: | |
return bluez.property(self, "TxPower", allow_missing=True) | |
@property | |
def manufacturer_data(self) -> Dict[int, bytearray]: | |
mds_raw = bluez.property(self, "ManufacturerData", allow_missing=True) | |
if not mds_raw: | |
return {} | |
mds = {} | |
# Keys are 16 bits Manufacturer ID followed by its byte array value. | |
for md_id, md_value in mds_raw.items(): | |
mds[md_id] = bytearray(md_value) | |
return mds | |
@property | |
def connected(self) -> str: | |
return bluez.property(self, "Connected") | |
@property | |
def uuids(self) -> str: | |
return bluez.property(self, "UUIDs") | |
@property | |
def services(self) -> str: | |
return bluez.property(self, "UUIDs") | |
@property | |
def services_resolved(self) -> str: | |
return bluez.property(self, "ServicesResolved") | |
@property | |
def paired(self) -> str: | |
return bluez.property(self, "Paired") | |
def connect(self, timeout: float = 30, authentication_delay: Optional[float] = 1) -> Tuple[bool, Optional[str]]: | |
""" | |
Connect to BLE device | |
:param timeout: Timeout [s] for all operations. | |
:param authentication_delay: If this is not None, it is assumed that this device possibly has pairing | |
enabled and connection will be rejected if not paired. However even if | |
authentication is mandatory the devices will connect first - that's why | |
this method waits `authentication_delay` seconds before performing | |
check for connectable again. | |
:return: Success status, optional error code if success is False. | |
""" | |
finish = [] | |
self._listen_to_property_changes() | |
self.Connect( | |
reply_handler=lambda: finish.append("OK"), | |
error_handler=lambda error: finish.append(error.get_dbus_message()), | |
timeout=timeout | |
) | |
# First, wait for connect operation to finish | |
for _ in bluez.mainloop_iterate(timeout=timeout): | |
if finish: | |
break | |
if not finish: | |
return False, "Timeout" | |
if finish[0] != "OK": | |
return False, finish[0] | |
if not self.connected: | |
return False, "Unknown" | |
# Device can be connected here, but if pairing is mandatory it will be disconnected in few moments | |
if authentication_delay: | |
bluez.mainloop_run(timeout=authentication_delay) | |
if not self.connected: | |
return False, "Rejected" # Authentication Failed | |
return True, None | |
def pair(self, timeout: float = 100) -> Tuple[bool, Optional[str]]: | |
""" | |
This method will connect to the remote device, initiate pairing and then retrieve all SDP records | |
(or GATT primary services). If the application has registered its own agent, then that specific | |
agent will be used. Otherwise it will use the default agent. | |
This method will block until pairing finishes (successfully or with error) or timeout is reached. | |
Possible errors: org.bluez.Error.InvalidArguments | |
org.bluez.Error.Failed | |
org.bluez.Error.AlreadyExists | |
org.bluez.Error.AuthenticationCanceled | |
org.bluez.Error.AuthenticationFailed | |
org.bluez.Error.AuthenticationRejected | |
org.bluez.Error.AuthenticationTimeout | |
org.bluez.Error.ConnectionAttemptFailed | |
:param timeout: Pairing timeout in seconds. | |
:return: Success status, optional error code if success is False. | |
""" | |
# Even if blocking, do not use self.Pair() without reply and error handles as this also | |
# blocks the execution of the custom Agent methods (e.g. PasskeyRequest). This one | |
# call dbus.connection.call_async(...) internally. | |
reply = [] | |
self._listen_to_property_changes() | |
self.Pair( | |
reply_handler=lambda: reply.append("OK"), | |
error_handler=lambda error: reply.append(error.get_dbus_message()), | |
timeout=timeout | |
) | |
# First, wait for connect operation to finish | |
for _ in bluez.mainloop_iterate(timeout=timeout): | |
if reply: | |
break | |
if not reply: | |
return False, "Timeout" | |
if reply[0] != "OK": | |
return False, reply[0] | |
if not (self.connected and self.paired): | |
return False, "Unknown" | |
# Wait a bit more for device to disconnect if rejected | |
bluez.mainloop_run(timeout=1) | |
if not (self.connected and self.paired): | |
return False, "Rejected" | |
return True, None | |
class Adapter(dbus.Interface): | |
INTERFACE = BlueZ.SERVICE + ".Adapter1" | |
def __init__(self, pattern: str = None): | |
""" | |
Get the BlueZ Adapter interface | |
:param pattern: Path (e.g. "hci0" or just "0") or MAC address ("xx:xx:xx:xx:xx:xx") of the desired | |
adapter or None to use the firs one. | |
""" | |
obj = None | |
for path, interfaces in bluez.interfaces.items(): | |
adapter = interfaces.get(self.INTERFACE) | |
if not adapter: | |
continue | |
if pattern and (adapter["Address"].lower() != pattern.lower()) and (not path.endswith(pattern)): | |
continue | |
obj = bluez.object(path) | |
break | |
if not obj: | |
raise dbus.DBusException("Adapter " + (pattern + " " if pattern else "") + "not found") | |
super().__init__(obj, self.INTERFACE) | |
@property | |
def discovering(self) -> bool: | |
return bluez.property(self, "Discovering") | |
@discovering.setter | |
def discovering(self, enable: bool) -> None: | |
"""Start or stop discovering""" | |
try: | |
if enable: | |
self.StartDiscovery() | |
else: | |
self.StopDiscovery() | |
except dbus.exceptions.DBusException as e: | |
if enable and (e.get_dbus_name() == "org.bluez.Error.InProgress"): # Operation already in progress | |
return | |
elif enable and (e.get_dbus_name() == "org.bluez.Error.Failed"): # No discovery started | |
return | |
raise | |
@property | |
def devices(self) -> Iterable[Tuple[dbus.ObjectPath, dbus.Dictionary]]: | |
for path, interface in bluez.interfaces.items(): | |
if not path.startswith(self.object_path): | |
continue | |
device = interface.get(Device.INTERFACE) | |
if not device: | |
continue | |
yield path, device | |
def remove_device(self, device: Device) -> bool: | |
""" | |
This removes the remote device object and also it's the pairing information | |
:return: Whether the device was removed. | |
""" | |
try: | |
self.RemoveDevice(device.object_path) | |
return True | |
except dbus.exceptions.DBusException as e: | |
if e.get_dbus_name() == "org.bluez.Error.DoesNotExist": | |
return False | |
raise | |
def scan(self, uuid: Union[None, int, str] = None, | |
mfd: Union[None, int] = None, | |
rssi: Union[None, int] = None, | |
pattern: Union[None, str, re.Pattern] = None, | |
timeout: Union[None, float] = 30, | |
duplicates: bool = False) -> Iterator[Device]: | |
""" | |
Scan for BLE devices | |
:param uuid: Only report devices advertising specified service UUID. | |
:param mfd: Only report devices advertising manufacturer data longer or equal to specified number | |
of bytes (0 is valid, use None to disable this filter). | |
:param rssi: Only report devices with received RSSI higher than this threshold value. | |
:param pattern: String or regex pattern to match the device, tested in the following sequence: | |
- Device MAC address | |
- Device name, if advertised | |
String is compared with '==', compiled regex patter with fullmatch(). | |
:param timeout: Scan timeout in seconds or None for no timeout. | |
:param duplicates: Whether to report every advertisement packet (True) or only once for new devices (False). | |
:return: Iterator returning advertisement packets as Devices, exhausted after timeout. | |
""" | |
if not isinstance(pattern, (str, re.Pattern, type(None))): | |
raise TypeError(f"Pattern can only be string, compiled Regex pattern or None, not {type(pattern)}") | |
devices: Dict[str, Device] = {} | |
new_devices: List[str] = [] | |
def on_properties_changed(interface: str, _changed_properties: Dict[str, GLib.Variant], | |
_invalidated_properties: List[str], path: dbus.ObjectPath): | |
if interface != Device.INTERFACE: | |
return | |
if path in new_devices: | |
# D-Bus paths are unique (per bus) so if this device is still pending to be yielded it makes no | |
# sense to instantiate it again as it would only create another Python object using the exact | |
# identical device and its existing properties. | |
return | |
# Just get an actual DBus device instance instead of parsing the data (it is already parsed internally). | |
# If this device (path) is already cached do not just add it to new_devices again, check if it still | |
# passes trough all the filters. | |
dev = devices.get(path, Device(path)) | |
matches = True | |
# UUID and RSSI are already filtered by discovery filter | |
# Manufacturer Data | |
if matches and (mfd is not None): | |
dev_mfd = dev.manufacturer_data | |
if (dev_mfd is None) or all(len(v) < mfd for v in dev_mfd.values()): | |
matches = False | |
# Address/Name | |
if matches and (pattern is not None): | |
addr = dev.address | |
name = dev.name | |
if isinstance(pattern, str): | |
if (pattern != addr) and (pattern != name): | |
matches = False | |
elif isinstance(pattern, re.Pattern): | |
if (not pattern.fullmatch(addr)) and name and (not pattern.fullmatch(name)): | |
matches = False | |
if matches: | |
if path not in devices: | |
devices[path] = dev | |
new_devices.append(path) | |
elif duplicates: | |
new_devices.append(path) | |
elif path in devices: | |
del devices[path] | |
bluez.bus.add_signal_receiver(on_properties_changed, | |
dbus_interface="org.freedesktop.DBus.Properties", | |
signal_name="PropertiesChanged", | |
arg0=Device.INTERFACE, | |
path_keyword="path") | |
discovery_filter = { | |
# Only scan BLE devices | |
"Transport": 'le', | |
# Trigger PropertiesChanged signal for every received advertisement packet, use custom filter | |
# for duplicates as this one would not report devices already seen before on the system | |
"DuplicateData": True, | |
} | |
if uuid: | |
discovery_filter["UUIDs"] = [f"0x{uuid:04x}" if isinstance(uuid, int) else uuid] | |
if rssi is not None: | |
discovery_filter["RSSI"] = dbus.Int16(rssi) | |
self.SetDiscoveryFilter(discovery_filter) | |
# Note: GLib.MainLoop must run for discovery to work | |
self.discovering = True | |
# Spin a mainloop until iterator is destroyed or timeout reached | |
for _ in bluez.mainloop_iterate(timeout=timeout): | |
if new_devices: | |
try: | |
yield devices[new_devices.pop()] | |
except GeneratorExit: | |
break | |
bluez.bus.remove_signal_receiver(on_properties_changed) | |
# BlueZ on Ubuntu removes device when advertising is stopped if it does not advertise Flags | |
# with TX Power, because it cannot calculate RSSI to show it in the UI ('RSSI is nil'). But | |
# it shows the RSSI while discovering, so device.rssi cannot be checked here. | |
# Until better fix is found, leave discovery running in such case to prevent removal of devices. | |
# self.discovering = False | |
# # Spin a bit more to handle StopDiscovery properly | |
# bluez.mainloop_run(until_idle=True, timeout=5) | |
class AgentManager(dbus.Interface, metaclass=_Singleton, instance_name='agentManager'): | |
INTERFACE = BlueZ.SERVICE + ".AgentManager1" | |
def __init__(self): | |
super().__init__(bluez.object("/org/bluez"), self.INTERFACE) | |
def register_agent(self, agent: 'Agent', capability: 'Agent.Capability') -> None: | |
""" | |
This registers an agent handler. | |
The object path defines the path of the agent | |
that will be called when user input is needed. | |
Every application can register its own agent and | |
for all actions triggered by that application its | |
agent is used. | |
It is not required by an application to register | |
an agent. If an application does chooses to not | |
register an agent, the default agent is used. This | |
is on most cases a good idea. Only application | |
like a pairing wizard should register their own | |
agent. | |
An application can only register one agent. Multiple | |
agents per application is not supported. | |
The capability parameter can have the values | |
"DisplayOnly", "DisplayYesNo", "KeyboardOnly", | |
"NoInputNoOutput" and "KeyboardDisplay" which | |
reflects the input and output capabilities of the | |
agent. | |
If an empty string is used it will fallback to | |
"KeyboardDisplay". | |
Possible errors: org.bluez.Error.InvalidArguments | |
org.bluez.Error.AlreadyExists | |
""" | |
self.RegisterAgent(agent, capability.value) | |
def unregister_agent(self, agent: 'Agent') -> bool: | |
""" | |
This unregisters the agent that has been previously | |
registered. The object path parameter must match the | |
same value that has been used on registration. | |
Possible errors: org.bluez.Error.DoesNotExist | |
""" | |
try: | |
self.UnregisterAgent(agent) | |
return True | |
except dbus.exceptions.DBusException as e: | |
if e.get_dbus_name() != "org.bluez.Error.DoesNotExist": | |
raise | |
return False | |
agentManager = AgentManager() | |
"""Single global instance of a AgentManager singleton class""" | |
class Agent(dbus.service.Object): | |
INTERFACE = BlueZ.SERVICE + ".Agent1" | |
@enum.unique | |
class Capability(enum.Enum): | |
DISPLAY_ONLY = "DisplayOnly" | |
DISPLAY_YES_NO = "DisplayYesNo" | |
KEYBOARD_ONLY = "KeyboardOnly" | |
NO_IO = "NoInputNoOutput" | |
KEYBOARD_DISPLAY = "KeyboardDisplay" | |
def __init__(self, io_capabilities: Capability = Capability.KEYBOARD_DISPLAY, register: bool = True): | |
self.io_capabilities = io_capabilities | |
super().__init__(bluez.bus, "/test/agent", None) | |
self.passkey: Optional[int] = None | |
"""6-digit passkey for Passkey Entry pairing method, if supported""" | |
if register: | |
self.register() | |
def __del__(self): | |
self.unregister() | |
def register(self) -> None: | |
agentManager.register_agent(self, self.io_capabilities) | |
def unregister(self) -> None: | |
agentManager.unregister_agent(self) | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="", out_signature="") | |
def Release(self) -> None: | |
""" | |
This method gets called when the service daemon | |
unregisters the agent. An agent can use it to do | |
cleanup tasks. There is no need to unregister the | |
agent, because when this method gets called it has | |
already been unregistered. | |
""" | |
print("Release()") | |
# Agent was released - eventually quit mainloop? | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="o", out_signature="s") | |
def RequestPinCode(self, device: dbus.service.Object) -> dbus.String: | |
""" | |
This method gets called when the service daemon | |
needs to get the passkey for an authentication. | |
The return value should be a string of 1-16 characters | |
length. The string can be alphanumeric. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
pincode = "1234" | |
print(f"RequestPinCode({device})->{pincode}") | |
return dbus.String(pincode) | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="os", out_signature="") | |
def DisplayPinCode(self, device: dbus.service.Object, pincode: dbus.String) -> None: | |
""" | |
This method gets called when the service daemon | |
needs to display a pincode for an authentication. | |
An empty reply should be returned. When the pincode | |
needs no longer to be displayed, the Cancel method | |
of the agent will be called. | |
This is used during the pairing process of keyboards | |
that don't support Bluetooth 2.1 Secure Simple Pairing, | |
in contrast to DisplayPasskey which is used for those | |
that do. | |
This method will only ever be called once since | |
older keyboards do not support typing notification. | |
Note that the PIN will always be a 6-digit number, | |
zero-padded to 6 digits. This is for harmony with | |
the later specification. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
print(f"DisplayPinCode({device}, pincode={pincode})") | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="o", out_signature="u") | |
def RequestPasskey(self, device: dbus.service.Object) -> dbus.UInt32: | |
""" | |
This method gets called when the service daemon | |
needs to get the passkey for an authentication. | |
The return value should be a numeric value | |
between 0-999999. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
print(f"RequestPasskey({device})->{self.passkey}") | |
if self.passkey is None: | |
raise bluez.Rejected("Connection rejected by user") | |
return dbus.UInt32(self.passkey) | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="ouq", out_signature="") | |
def DisplayPasskey(self, device: dbus.service.Object, passkey: dbus.UInt32, entered: dbus.UInt16) -> None: | |
""" | |
This method gets called when the service daemon | |
needs to display a passkey for an authentication. | |
The entered parameter indicates the number of already | |
typed keys on the remote side. | |
An empty reply should be returned. When the passkey | |
needs no longer to be displayed, the Cancel method | |
of the agent will be called. | |
During the pairing process this method might be | |
called multiple times to update the entered value. | |
Note that the passkey will always be a 6-digit number, | |
so the display should be zero-padded at the start if | |
the value contains less than 6 digits. | |
""" | |
print(f"DisplayPasskey({device}, passkey={passkey}, entered={entered})") | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="ou", out_signature="") | |
def RequestConfirmation(self, device: dbus.service.Object, passkey: dbus.UInt32) -> None: | |
""" | |
This method gets called when the service daemon | |
needs to confirm a passkey for an authentication. | |
To confirm the value it should return an empty reply | |
or an error in case the passkey is invalid. | |
Note that the passkey will always be a 6-digit number, | |
so the display should be zero-padded at the start if | |
the value contains less than 6 digits. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
confirm = True | |
print(f"RequestConfirmation({device}, passkey={passkey})->{confirm}") | |
if not confirm: | |
raise bluez.Rejected("Passkey does not match") | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="o", out_signature="") | |
def RequestAuthorization(self, device: dbus.service.Object) -> None: | |
""" | |
This method gets called to request the user to | |
authorize an incoming pairing attempt which | |
would in other circumstances trigger the just-works | |
model, or when the user plugged in a device that | |
implements cable pairing. In the latter case, the | |
device would not be connected to the adapter via | |
Bluetooth yet. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
authorize = True | |
print(f"RequestAuthorization({device})->{authorize}") | |
if not authorize: | |
raise bluez.Rejected("Pairing rejected") | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="os", out_signature="") | |
def AuthorizeService(self, device: dbus.service.Object, uuid: dbus.String) -> None: | |
""" | |
This method gets called when the service daemon | |
needs to authorize a connection/service request. | |
Possible errors: org.bluez.Error.Rejected | |
org.bluez.Error.Canceled | |
""" | |
authorize = True | |
print(f"AuthorizeService({device}, uuid={uuid})->{authorize}") | |
if not authorize: | |
raise bluez.Rejected("Connection rejected by user") | |
# noinspection PyPep8Naming | |
@dbus.service.method(INTERFACE, in_signature="", out_signature="") | |
def Cancel(self) -> None: | |
""" | |
This method gets called to indicate that the agent | |
request failed before a reply was returned. | |
""" | |
print("Cancel()") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment