Last active
February 8, 2023 09:34
-
-
Save infirit/5de882fe27fe52a45d0af6a86ccbb102 to your computer and use it in GitHub Desktop.
gdbus_connection_proxy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from gi.repository import Gio, GLib, GObject | |
| import logging | |
| import random | |
| class Base(Gio.DBusProxy): | |
| connect_signal = GObject.GObject.connect | |
| disconnect_signal = GObject.GObject.disconnect | |
| __name = 'org.bluez' | |
| __bus_type = Gio.BusType.SYSTEM | |
| __gsignals__ = { | |
| 'property-changed': (GObject.SignalFlags.NO_HOOKS, None, | |
| (GObject.TYPE_PYOBJECT, GObject.TYPE_PYOBJECT, GObject.TYPE_PYOBJECT)) | |
| } | |
| def __init__(self, interface_name, obj_path, *args, **kwargs): | |
| def finish(proxy, res): | |
| proxy.init_finish(res) | |
| self.__fallback = {'Icon': 'blueman', 'Class': 0, 'Appearance': 0} | |
| self.__variant_map = {str: 's', int: 'u', bool: 'b'} | |
| super(Base, self).__init__( | |
| g_name=self.__name, | |
| g_interface_name=interface_name, | |
| g_object_path=obj_path, | |
| g_bus_type=self.__bus_type, | |
| # FIXME See issue 620 | |
| g_flags=Gio.DBusProxyFlags.GET_INVALIDATED_PROPERTIES, | |
| *args, **kwargs) | |
| self.init_async(GLib.PRIORITY_DEFAULT, None, finish) | |
| def do_g_properties_changed(self, changed_properties, _invalidated_properties): | |
| changed = changed_properties.unpack() | |
| object_path = self.get_object_path() | |
| logging.debug("%s %s" % (object_path, changed)) | |
| for key, value in changed.items(): | |
| self.emit("property-changed", key, value, object_path) | |
| def _call(self, method, param=None, reply_handler=None, error_handler=None): | |
| def callback(proxy, result, reply, error): | |
| try: | |
| value = proxy.call_finish(result).unpack() | |
| if reply: | |
| reply(*value) | |
| else: | |
| return value | |
| except GLib.Error as e: | |
| if error: | |
| error(e) | |
| else: | |
| raise e | |
| self.call(method, param, Gio.DBusCallFlags.NONE, GLib.MAXINT, None, | |
| callback, reply_handler, error_handler) | |
| def get(self, name): | |
| try: | |
| prop = self.call_sync( | |
| 'org.freedesktop.DBus.Properties.Get', | |
| GLib.Variant('(ss)', (self._interface_name, name)), | |
| Gio.DBusCallFlags.NONE, | |
| GLib.MAXINT, | |
| None) | |
| return prop.unpack()[0] | |
| except GLib.Error as e: | |
| if name in self.get_cached_property_names(): | |
| return self.get_cached_property(name).unpack() | |
| elif name in self.__fallback: | |
| return self.__fallback[name] | |
| else: | |
| raise parse_dbus_error(e) | |
| def set(self, name, value): | |
| v = GLib.Variant(self.__variant_map[type(value)], value) | |
| param = GLib.Variant('(ssv)', (self._interface_name, name, v)) | |
| self.call('org.freedesktop.DBus.Properties.Set', | |
| param, | |
| Gio.DBusCallFlags.NONE, | |
| GLib.MAXINT, | |
| None) | |
| def get_properties(self): | |
| param = GLib.Variant('(s)', (self._interface_name,)) | |
| res = self.call_sync('org.freedesktop.DBus.Properties.GetAll', | |
| param, | |
| Gio.DBusCallFlags.NONE, | |
| GLib.MAXINT, | |
| None) | |
| props = res.unpack()[0] | |
| for k, v in self.__fallback.items(): | |
| if k in props: | |
| continue | |
| else: | |
| props[k] = v | |
| return props | |
| def __getitem__(self, key): | |
| return self.get(key) | |
| def __setitem__(self, key, value): | |
| self.set(key, value) | |
| def __contains__(self, key): | |
| return key in self.get_properties() | |
| class Adapter(Base): | |
| _interface_name = 'org.bluez.Adapter1' | |
| def __init__(self, obj_path=None): | |
| super().__init__(self._interface_name, obj_path=obj_path) | |
| def start_discovery(self): | |
| self._call('StartDiscovery') | |
| def stop_discovery(self): | |
| self._call('StopDiscovery') | |
| def remove_device(self, device): | |
| param = GLib.Variant('(o)', (device.get_object_path(),)) | |
| self._call('RemoveDevice', param) | |
| # FIXME in BlueZ 5.31 getting and setting Alias appears to never fail | |
| def get_name(self): | |
| if 'Alias' in self: | |
| return self['Alias'] | |
| else: | |
| return self['Name'] | |
| def set_name(self, name): | |
| try: | |
| return self.set('Alias', name) | |
| except GLib.Error: | |
| return self.set('Name', name) | |
| def timeout(proxy): | |
| print('All properties') | |
| print(proxy.get_properties()) | |
| new_alias = 'gdbus test %s' % random.randint(0, 100) | |
| print('Setting Alias to %s' % new_alias) | |
| proxy['Alias'] = new_alias | |
| alias = proxy['Alias'] | |
| print('New Alias is "%s"' % alias) | |
| loop = GLib.MainLoop() | |
| adapter = Adapter('/org/bluez/hci0') | |
| GLib.timeout_add_seconds(2, timeout, adapter) | |
| loop.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment