Skip to content

Instantly share code, notes, and snippets.

@vladiant
Created August 14, 2021 20:57
Show Gist options
  • Save vladiant/2d2f2f8eec14fbc7e2d68156e24e92b6 to your computer and use it in GitHub Desktop.
Save vladiant/2d2f2f8eec14fbc7e2d68156e24e92b6 to your computer and use it in GitHub Desktop.
Python DBus
#!/usr/bin/env python3
import dbus
class Client:
def __init__(self):
bus = dbus.SessionBus()
service = bus.get_object("com.example.service", "/com/example/service")
self._message = service.get_dbus_method(
"get_message", "com.example.service.Message"
)
self._quit = service.get_dbus_method("quit", "com.example.service.Quit")
# Introspectable interfaces define a property 'Introspect' that
# will return an XML string that describes the object's interface
introspection_interface = dbus.Interface(service, dbus.INTROSPECTABLE_IFACE)
interface = introspection_interface.Introspect()
print(interface)
def run(self):
print("Mesage from service:", self._message())
self._quit()
if __name__ == "__main__":
Client().run()
#!/usr/bin/env python3
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
class Service(dbus.service.Object):
def __init__(self, message):
self._message = message
def run(self):
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus_name = dbus.service.BusName("com.example.service", dbus.SessionBus())
dbus.service.Object.__init__(self, bus_name, "/com/example/service")
self._loop = GLib.MainLoop()
print("Service running...")
self._loop.run()
print("Service stopped")
@dbus.service.method(
"com.example.service.Message", in_signature="", out_signature="s"
)
def get_message(self):
print(" sending message")
return self._message
@dbus.service.method("com.example.service.Quit", in_signature="", out_signature="")
def quit(self):
print(" shutting down")
self._loop.quit()
if __name__ == "__main__":
Service("This is the service").run()
#!/usr/bin/env python3
import dbus
import xml.etree.ElementTree as ET
# http://pythonhosted.org/txdbus/dbus_overview.html
char_to_simple_data_type_map = {
"y": ("UInt8", dbus.Byte),
"b": ("Boolean", dbus.Boolean),
"n": ("Int16", dbus.Int16),
"q": ("UInt16", dbus.UInt16),
"i": ("Int32", dbus.Int32),
"u": ("UInt32", dbus.UInt32),
"x": ("Int64", dbus.Int64),
"t": ("UInt64", dbus.UInt64),
"d": ("Double", dbus.Double),
"s": ("String", dbus.String),
"o": ("D-Bus Object Path String", dbus.String),
"g": ("D-Bus Signature String", dbus.String),
}
def code_to_simple_data_type(code_symbol):
if not isinstance(code_symbol, str):
raise Exception("Type %s should be str", code_symbol)
code_symbol_length = len(code_symbol)
if code_symbol_length > 1:
res = "("
delimiter_count = code_symbol_length - 1
for single_code_symbol in code_symbol:
res = res + code_to_simple_data_type(single_code_symbol)
if delimiter_count > 0:
res = res + ", "
delimiter_count = delimiter_count - 1
res = res + ")"
return res
if code_symbol in char_to_simple_data_type_map:
return char_to_simple_data_type_map[code_symbol][0]
else:
raise Exception("Type %s isn't convertable", code_symbol)
def process_arguments(object_args):
if "name" in object_args.attrib:
print("Name: ", object_args.attrib["name"])
if "type" in object_args.attrib:
print("Type: ", code_to_simple_data_type(object_args.attrib["type"]))
if "direction" in object_args.attrib:
print("Direction: ", object_args.attrib["direction"])
def process_method(object_method):
if "method" != object_method.tag or "name" not in object_method.attrib:
return
print(object_method.attrib["name"])
for current_args in object_method:
process_arguments(current_args)
print()
def process_interface(object_interface):
if "interface" != object_interface.tag or "name" not in object_interface.attrib:
return
print(object_interface.attrib["name"])
if (
"com.clarion.projectionmanager.sdl.SDLControl"
!= object_interface.attrib["name"]
):
return
for current_method in object_interface:
process_method(current_method)
def process_interfaces(interface_root):
for object_interface in interface_root:
process_interface(object_interface)
class Client:
def __init__(self):
bus = dbus.SessionBus()
service = bus.get_object("com.example.service", "/com/example/service")
self._message = service.get_dbus_method(
"get_message", "com.example.service.Message"
)
self._quit = service.get_dbus_method("quit", "com.example.service.Quit")
# Start introspection
service_interface_introspection = dbus.Interface(
service, dbus.INTROSPECTABLE_IFACE
)
# Introspectable interfaces define a property 'Introspect' that
# will return an XML string that describes the object's interface
remote_object_interfaces = service_interface_introspection.Introspect()
remote_object_interface_root = ET.fromstring(remote_object_interfaces)
if "name" in remote_object_interface_root.attrib:
print(remote_object_interface_root.attrib["name"])
process_interfaces(remote_object_interface_root)
def run(self):
print("Mesage from service:", self._message())
self._quit()
if __name__ == "__main__":
Client().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment