Skip to content

Instantly share code, notes, and snippets.

@infirit
Last active February 8, 2023 09:34
Show Gist options
  • Select an option

  • Save infirit/5de882fe27fe52a45d0af6a86ccbb102 to your computer and use it in GitHub Desktop.

Select an option

Save infirit/5de882fe27fe52a45d0af6a86ccbb102 to your computer and use it in GitHub Desktop.
gdbus_connection_proxy
from gi.repository import Gio, GLib, GObject
import logging
import random
class Base(Gio.DBusProxy):
    """Gio.DBusProxy for an object owned by ``org.bluez`` on the system bus.

    Adds dict-style access to the remote object's D-Bus properties
    (``proxy[name]``, ``proxy[name] = value``, ``name in proxy``) and
    re-emits each changed property through the ``property-changed`` signal
    as ``(name, value, object_path)``.

    The property helpers read ``self._interface_name``, which this class does
    not define itself; subclasses are expected to provide it.
    """

    # Aliases so callers can (dis)connect GObject signals under explicit names.
    connect_signal = GObject.GObject.connect
    disconnect_signal = GObject.GObject.disconnect

    __name = 'org.bluez'
    __bus_type = Gio.BusType.SYSTEM

    __gsignals__ = {
        'property-changed': (GObject.SignalFlags.NO_HOOKS, None,
                             (GObject.TYPE_PYOBJECT, GObject.TYPE_PYOBJECT, GObject.TYPE_PYOBJECT))
    }

    def __init__(self, interface_name, obj_path, *args, **kwargs):
        """Create the proxy and start its asynchronous initialisation.

        Args:
            interface_name: D-Bus interface name, passed as ``g_interface_name``.
            obj_path: D-Bus object path, passed as ``g_object_path``.
            *args: Forwarded unchanged to ``Gio.DBusProxy``.
            **kwargs: Forwarded unchanged to ``Gio.DBusProxy``.
        """
        def finish(proxy, res):
            # Completes the init_async() request started below.
            proxy.init_finish(res)

        # Values returned for these properties when the remote object does
        # not supply them (see get() and get_properties()).
        self.__fallback = {'Icon': 'blueman', 'Class': 0, 'Appearance': 0}
        # Python type -> GVariant type string used by set().
        self.__variant_map = {str: 's', int: 'u', bool: 'b'}

        super().__init__(
            g_name=self.__name,
            g_interface_name=interface_name,
            g_object_path=obj_path,
            g_bus_type=self.__bus_type,
            # FIXME See issue 620
            g_flags=Gio.DBusProxyFlags.GET_INVALIDATED_PROPERTIES,
            *args, **kwargs)

        self.init_async(GLib.PRIORITY_DEFAULT, None, finish)

    def do_g_properties_changed(self, changed_properties, _invalidated_properties):
        """Re-emit every changed property as a ``property-changed`` signal.

        Args:
            changed_properties: Variant that unpacks to a ``{name: value}`` dict.
            _invalidated_properties: Unused.
        """
        changed = changed_properties.unpack()
        object_path = self.get_object_path()
        # Pass arguments to the logger so formatting only happens when the
        # DEBUG level is actually enabled.
        logging.debug("%s %s", object_path, changed)
        for key, value in changed.items():
            self.emit("property-changed", key, value, object_path)

    def _call(self, method, param=None, reply_handler=None, error_handler=None):
        """Invoke ``method`` on the remote object asynchronously.

        Args:
            method: Name of the D-Bus method to call.
            param: Optional ``GLib.Variant`` holding the call arguments.
            reply_handler: Optional callable invoked with the unpacked reply
                values as positional arguments.
            error_handler: Optional callable invoked with the ``GLib.Error``
                when the call (or the reply handler) raises one. Without it
                the error is re-raised inside the completion callback.
        """
        def callback(proxy, result, reply, error):
            try:
                value = proxy.call_finish(result).unpack()
                if reply:
                    reply(*value)
            except GLib.Error as e:
                if error:
                    error(e)
                else:
                    raise

        # The two handlers travel as user data and arrive in callback() as
        # its ``reply`` and ``error`` arguments.
        self.call(method, param, Gio.DBusCallFlags.NONE, GLib.MAXINT, None,
                  callback, reply_handler, error_handler)

    def get(self, name):
        """Return the value of property ``name``.

        The property is fetched with a synchronous
        ``org.freedesktop.DBus.Properties.Get`` call. When that call fails the
        proxy's cached value is used, then the built-in fallback value.

        Raises:
            GLib.Error: The original D-Bus error, when the call failed and
                neither a cached nor a fallback value exists for ``name``.
        """
        try:
            prop = self.call_sync(
                'org.freedesktop.DBus.Properties.Get',
                GLib.Variant('(ss)', (self._interface_name, name)),
                Gio.DBusCallFlags.NONE,
                GLib.MAXINT,
                None)
            return prop.unpack()[0]
        except GLib.Error:
            if name in self.get_cached_property_names():
                return self.get_cached_property(name).unpack()
            if name in self.__fallback:
                return self.__fallback[name]
            # Nothing left to fall back on: surface the real D-Bus failure to
            # the caller with its traceback intact.
            raise

    def set(self, name, value):
        """Set property ``name`` to ``value``.

        ``value`` must be exactly a ``str``, ``int`` or ``bool``; it is wrapped
        in a variant of type ``s``, ``u`` or ``b`` respectively.

        The ``org.freedesktop.DBus.Properties.Set`` call is dispatched
        asynchronously without a completion callback, so a failure reported
        by the remote side is not raised here.

        Raises:
            KeyError: If ``type(value)`` has no entry in the variant map.
        """
        v = GLib.Variant(self.__variant_map[type(value)], value)
        param = GLib.Variant('(ssv)', (self._interface_name, name, v))
        self.call('org.freedesktop.DBus.Properties.Set',
                  param,
                  Gio.DBusCallFlags.NONE,
                  GLib.MAXINT,
                  None)

    def get_properties(self):
        """Return all properties of the interface as a dict.

        Uses a synchronous ``org.freedesktop.DBus.Properties.GetAll`` call and
        fills in the fallback value for every fallback key the reply lacks.

        Raises:
            GLib.Error: If the D-Bus call fails.
        """
        param = GLib.Variant('(s)', (self._interface_name,))
        res = self.call_sync('org.freedesktop.DBus.Properties.GetAll',
                             param,
                             Gio.DBusCallFlags.NONE,
                             GLib.MAXINT,
                             None)
        props = res.unpack()[0]
        for key, default in self.__fallback.items():
            # Only fills the gap; values reported by the remote side win.
            props.setdefault(key, default)
        return props

    def __getitem__(self, key):
        """Return property ``key``; same as :meth:`get`."""
        return self.get(key)

    def __setitem__(self, key, value):
        """Set property ``key``; same as :meth:`set`."""
        self.set(key, value)

    def __contains__(self, key):
        """Return whether ``key`` is among :meth:`get_properties` (fallbacks included)."""
        return key in self.get_properties()
class Adapter(Base):
    """Proxy for the ``org.bluez.Adapter1`` interface of an adapter object."""

    _interface_name = 'org.bluez.Adapter1'

    def __init__(self, obj_path=None):
        """Create a proxy for the adapter object at ``obj_path``."""
        interface = self._interface_name
        super().__init__(interface, obj_path=obj_path)

    def start_discovery(self):
        """Call the adapter's ``StartDiscovery`` method."""
        self._call('StartDiscovery')

    def stop_discovery(self):
        """Call the adapter's ``StopDiscovery`` method."""
        self._call('StopDiscovery')

    def remove_device(self, device):
        """Call ``RemoveDevice`` with the object path of ``device``."""
        device_path = device.get_object_path()
        self._call('RemoveDevice', GLib.Variant('(o)', (device_path,)))

    # FIXME in BlueZ 5.31 getting and setting Alias appears to never fail
    def get_name(self):
        """Return the ``Alias`` property if present, otherwise ``Name``."""
        return self['Alias'] if 'Alias' in self else self['Name']

    def set_name(self, name):
        """Set ``Alias`` to ``name``; on ``GLib.Error`` set ``Name`` instead."""
        try:
            outcome = self.set('Alias', name)
        except GLib.Error:
            outcome = self.set('Name', name)
        return outcome
def timeout(proxy):
    """Print all properties of ``proxy``, then change its ``Alias`` and read it back.

    The new alias is ``'gdbus test <n>'`` with ``n`` a random integer in
    ``[0, 100]``. Nothing is returned.
    """
    print('All properties')
    all_properties = proxy.get_properties()
    print(all_properties)

    suffix = random.randint(0, 100)
    new_alias = 'gdbus test %s' % suffix
    print('Setting Alias to %s' % new_alias)
    proxy['Alias'] = new_alias

    # Read the property back through the proxy to show the value now reported.
    print('New Alias is "%s"' % proxy['Alias'])
# Demo driver: build a proxy for the first adapter, schedule the property
# round-trip two seconds from now, then hand control to the GLib main loop.
adapter = Adapter('/org/bluez/hci0')
loop = GLib.MainLoop()
GLib.timeout_add_seconds(2, timeout, adapter)
loop.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment