Skip to content

Instantly share code, notes, and snippets.

@r0x0d
Created June 2, 2025 19:22
Show Gist options
  • Save r0x0d/0ceb293d4ee4881171777cb8228f6fbd to your computer and use it in GitHub Desktop.
Save r0x0d/0ceb293d4ee4881171777cb8228f6fbd to your computer and use it in GitHub Desktop.
Server and client example from dasbus lib
#
# Reply to a message in the chat room.
# Start the server, start the listener and run the client.
#
import threading
from dasbus.loop import EventLoop
from common import CHAT
event_stop = threading.Event()
def callback(event, msg):
"""The callback of the DBus signal MessageReceived."""
print("received %s" % msg)
exit(0)
def t(proxy, event: threading.Event):
proxy.MessageReceived.connect(lambda msg: callback(event, msg))
if __name__ == "__main__":
# Create a proxy of the object /org/example/Chat
# provided by the service org.example.Chat.
chat_proxy = CHAT.get_proxy()
# Find a chat room to monitor.
object_path = chat_proxy.FindRoom("Bob's room")
# Create a proxy of the object /org/example/Chat/Rooms/1
# provided by the service org.example.Chat.
room_proxy = CHAT.get_proxy(object_path)
a = threading.Thread(target=t, args=(room_proxy,event_stop))
a.start()
room_proxy.SendMessage("hi?")
a.join()
# Start the event loop.
loop = EventLoop()
loop.run()
#
# Run the service org.example.Chat.
#
import os
import threading
import time
from dasbus.loop import EventLoop
from dasbus.server.interface import dbus_interface, dbus_signal
from dasbus.server.publishable import Publishable
from dasbus.server.template import InterfaceTemplate
from dasbus.signal import Signal
from dasbus.typing import Str, ObjPath
from dasbus.xml import XMLGenerator
from common import SESSION_BUS, CHAT, ROOM, ROOM_CONTAINER
@dbus_interface(ROOM.interface_name)
class RoomInterface(InterfaceTemplate):
"""The DBus interface of the chat room."""
def connect_signals(self):
"""Connect the signals."""
self.implementation.message_received.connect(self.MessageReceived)
@dbus_signal
def MessageReceived(self, msg: Str):
"""Signal that a message has been received."""
pass
def SendMessage(self, msg: Str):
"""Send a message to the chat room."""
t = threading.Thread(target=self.implementation.send_message, args=(msg,))
print(f"Starting a new thread from {os.getpid()} {t.name}")
t.start()
return True
class Room(Publishable):
"""The implementation of the chat room."""
def __init__(self, name):
self._name = name
self._message_received = Signal()
def for_publication(self):
"""Return a DBus representation."""
return RoomInterface(self)
@property
def message_received(self):
"""Signal that a message has been received."""
return self._message_received
def send_message(self, msg):
"""Send a message to the chat room."""
print("{}: {}".format(self._name, msg))
time.sleep(8)
print(f"Emitting stuff from {os.getpid()}")
self.message_received.emit(msg)
@dbus_interface(CHAT.interface_name)
class ChatInterface(InterfaceTemplate):
"""The DBus interface of the chat service."""
def FindRoom(self, name: Str) -> ObjPath:
"""Find or create a chat room."""
return ROOM_CONTAINER.to_object_path(
self.implementation.find_room(name)
)
class Chat(Publishable):
"""The implementation of the chat."""
def __init__(self):
self._rooms = {}
def for_publication(self):
"""Return a DBus representation."""
return ChatInterface(self)
def find_room(self, name):
"""Find or create a chat room."""
if name not in self._rooms:
self._rooms[name] = Room(name)
return self._rooms[name]
if __name__ == "__main__":
# Print the generated XML specifications.
print(XMLGenerator.prettify_xml(ChatInterface.__dbus_xml__))
print(XMLGenerator.prettify_xml(RoomInterface.__dbus_xml__))
try:
# Create the chat.
chat = Chat()
# Publish the chat at /org/example/Chat.
SESSION_BUS.publish_object(CHAT.object_path, chat.for_publication())
# Register the service name org.example.Chat.
SESSION_BUS.register_service(CHAT.service_name)
# Start the event loop.
loop = EventLoop()
loop.run()
finally:
# Unregister the DBus service and objects.
SESSION_BUS.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment