Created
September 20, 2024 11:59
-
-
Save arkq/a8fdba8494ef9286f0aed381d086c087 to your computer and use it in GitHub Desktop.
GATT client/server for Matter BLE
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # Copyright 2024 Arkadiusz Bokowy | |
| # | |
| # Simple GATT client/server which emulates a Matter device. This application can | |
| # be used to test Matter devices and their interaction over BLE, e.g., to | |
| # test the Matter commissioning process or to search for potential security | |
| # vulnerabilities in the Matter BLE stack. | |
| import logging | |
| import os.path | |
| from argparse import ArgumentParser | |
| from gi.repository import Gio, GLib | |
class DBusObject:
    """Helper base class which exports one D-Bus interface on the system bus.

    Subclasses set ``INTERFACE`` to the interface description handed to
    ``register_object()`` and provide one ``_<Name>`` attribute per D-Bus
    member: a callable taking ``(params, invocation)`` for each method and
    an attribute/property holding the value for each D-Bus property.
    """

    # Interface description object; it must expose ``name`` and
    # ``properties`` (both are read by get_properties()).
    INTERFACE = None

    def __init__(self, path):
        """Connect to the system bus and register this object at *path*."""
        self.bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
        self.bus.register_object(
            path, self.INTERFACE, self.on_method_call, self.get_property
        )
        self.path = path

    def on_method_call(self, conn, sender, path, iface, name, params, inv):
        """Dispatch an incoming method call to the ``_<name>`` handler.

        The handler is called as ``handler(params, inv)`` and is responsible
        for completing the invocation. When no such handler exists, the
        caller receives a D-Bus error reply instead of the call being left
        unanswered by an AttributeError raised inside the callback.
        """
        logging.debug("Called: %s.%s", iface, name)
        handler = getattr(self, f"_{name}", None)
        if handler is None:
            logging.warning("Method not implemented: %s.%s", iface, name)
            inv.return_dbus_error(
                "org.freedesktop.DBus.Error.UnknownMethod",
                f"Method {name} is not implemented",
            )
            return
        handler(params, inv)

    def get_property(self, conn, sender, path, iface, name):
        """Return the value of D-Bus property *name* (attribute ``_<name>``)."""
        logging.debug("Get property: %s.%s", iface, name)
        return getattr(self, f"_{name}")

    def get_properties(self):
        """Return a dict mapping every property name of INTERFACE to its value."""
        return {
            prop.name: self.get_property(
                None, None, None, self.INTERFACE.name, prop.name
            )
            for prop in self.INTERFACE.properties
        }

    def call(self, service, path, iface, method, params, callback):
        """Start an asynchronous D-Bus method call; *callback* gets the result."""
        self.bus.call(service, path, iface, method, params,
                      None, 0, -1, None, callback)
class Advertisement(DBusObject):
    """LE advertisement for Matter device.

    Exports an ``org.bluez.LEAdvertisement1`` object and registers it with
    the ``org.bluez.LEAdvertisingManager1`` interface of the given adapter.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
        <node>
            <interface name="org.bluez.LEAdvertisement1">
                <method name="Release" />
                <property name="Type" type="s" access="read" />
                <property name="ServiceUUIDs" type="as" access="read" />
                <property name="ServiceData" type="a{sv}" access="read" />
                <property name="Discoverable" type="b" access="read" />
                <property name="Includes" type="as" access="read" />
                <property name="LocalName" type="s" access="read" />
            </interface>
        </node>
    """).interfaces[0]

    @property
    def _Type(self):
        """Advertisement type: always "peripheral"."""
        return GLib.Variant.new_string("peripheral")

    @property
    def _ServiceUUIDs(self):
        """List of advertised service UUIDs (only 0xFFF6)."""
        return GLib.Variant.new_strv(["0xFFF6"])

    @property
    def _ServiceData(self):
        """Service data dictionary: a byte array keyed by the 0xFFF6 UUID."""
        # NOTE(review): presumably the Matter commissionable-device payload
        # (opcode, discriminator, vendor/product ID, flags) - confirm the
        # byte layout against the Matter specification.
        data = [0x00, 0x00, 0x0F, 0xF1, 0xFF, 0x01, 0x80, 0x00]
        props = GLib.VariantDict.new()
        props.insert_value(
            "0xFFF6",
            GLib.Variant.new_array(
                GLib.VariantType("y"),
                [GLib.Variant.new_byte(byte) for byte in data],
            ),
        )
        return props.end()

    @property
    def _Discoverable(self):
        """The advertisement is always discoverable."""
        return GLib.Variant.new_boolean(True)

    @property
    def _Includes(self):
        """No additional fields are included in the advertisement."""
        return GLib.Variant.new_strv([])

    @property
    def _LocalName(self):
        """Local name, as given to the constructor."""
        return GLib.Variant.new_string(self.name)

    def _Release(self, params, inv):
        """Handle the Release method declared in the interface.

        Without this handler the method declared in INTERFACE has no
        implementation for the dispatcher to call.
        """
        logging.info("Advertisement released")
        inv.return_value(None)

    def __init__(self, adapter, path, name):
        """Export the advertisement object.

        adapter -- adapter name used to build the manager object path
        path    -- D-Bus object path at which the advertisement is exported
        name    -- local name reported by the LocalName property
        """
        super().__init__(path)
        self.adapter = adapter
        self.name = name

    def on_register_ready(self, conn, result):
        """Finish the asynchronous RegisterAdvertisement call and log outcome."""
        # Collect the result first, so success is reported only when the
        # registration has actually succeeded.
        try:
            conn.call_finish(result)
        except GLib.Error as err:
            logging.error("Advertisement registration failed: %s", err)
            return
        logging.info("Advertisement registered")

    def register(self):
        """Asynchronously register this advertisement on the adapter."""
        self.call(
            "org.bluez",
            f"/org/bluez/{self.adapter}",
            "org.bluez.LEAdvertisingManager1",
            "RegisterAdvertisement",
            GLib.Variant("(oa{sv})", (self.path, {})),
            self.on_register_ready,
        )
class GattService(DBusObject):
    """GATT service exported as an ``org.bluez.GattService1`` object."""

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
        <node>
            <interface name="org.bluez.GattService1">
                <property name="UUID" type="s" access="read" />
                <property name="Primary" type="b" access="read" />
            </interface>
        </node>
    """).interfaces[0]

    def __init__(self, path, uuid):
        """Export the service at D-Bus object *path* with the given *uuid*."""
        super().__init__(path)
        self.uuid = uuid

    @property
    def _UUID(self):
        """Service UUID as a string variant."""
        return GLib.Variant("s", self.uuid)

    @property
    def _Primary(self):
        """The service is always reported as primary."""
        return GLib.Variant("b", True)
class GattCharacteristic(DBusObject):
    """GATT characteristic exported as ``org.bluez.GattCharacteristic1``.

    Every method declared in the interface has a handler which completes
    the invocation, so the caller always receives a reply.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
        <node>
            <interface name="org.bluez.GattCharacteristic1">
                <method name="ReadValue">
                    <arg name="options" type="a{sv}" direction="in"/>
                    <arg name="value" type="ay" direction="out" />
                </method>
                <method name="WriteValue">
                    <arg name="value" type="ay" direction="in" />
                    <arg name="options" type="a{sv}" direction="in"/>
                </method>
                <method name="StartNotify" />
                <method name="StopNotify" />
                <method name="Confirm" />
                <property name="UUID" type="s" access="read" />
                <property name="Service" type="o" access="read" />
                <property name="Value" type="ay" access="read" />
                <property name="Flags" type="as" access="read" />
            </interface>
        </node>
    """).interfaces[0]

    @property
    def _UUID(self):
        """Characteristic UUID as a string variant."""
        return GLib.Variant.new_string(self.uuid)

    @property
    def _Service(self):
        """Object path of the service this characteristic belongs to."""
        return GLib.Variant.new_object_path(self.service.path)

    @property
    def _Value(self):
        """Current value: a byte array with zero elements."""
        return GLib.Variant.new_fixed_array(GLib.VariantType("y"), None, 0, 1)

    @property
    def _Flags(self):
        """Characteristic flags as given to the constructor."""
        return GLib.Variant.new_strv(self.flags)

    def _ReadValue(self, params, inv):
        """Reply to ReadValue with the current Value property."""
        logging.info("Read options: %s", params.get_child_value(0))
        inv.return_value(GLib.Variant.new_tuple(self._Value))

    def _WriteValue(self, params, inv, *argc, **kw):
        """Log the written value and options, then acknowledge the write."""
        logging.info("Write value: %s", params.get_child_value(0))
        logging.info("Write options: %s", params.get_child_value(1))
        # Complete the invocation; otherwise the caller never gets a reply.
        inv.return_value(None)

    def _StartNotify(self, params, inv):
        """Acknowledge the StartNotify request."""
        logging.info("Start notify: %s", self.uuid)
        inv.return_value(None)

    def _StopNotify(self, params, inv):
        """Acknowledge the StopNotify request."""
        logging.info("Stop notify: %s", self.uuid)
        inv.return_value(None)

    def _Confirm(self, params, inv):
        """Acknowledge the Confirm call."""
        logging.info("Confirm: %s", self.uuid)
        inv.return_value(None)

    def __init__(self, service, path, uuid, flags: list):
        """Export the characteristic.

        service -- parent service object; its ``path`` is reported as Service
        path    -- D-Bus object path at which the characteristic is exported
        uuid    -- characteristic UUID string
        flags   -- list of flag strings reported by the Flags property
        """
        super().__init__(path)
        self.service = service
        self.uuid = uuid
        self.flags = flags
class Application(DBusObject):
    """GATT application for Matter device.

    Acts as the ``org.freedesktop.DBus.ObjectManager`` root of one GATT
    service with two characteristics (C1 and C2) and registers itself with
    the ``org.bluez.GattManager1`` interface of the given adapter.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
        <node>
            <interface name="org.freedesktop.DBus.ObjectManager">
                <method name="GetManagedObjects">
                    <arg type="a{oa{sa{sv}}}" direction="out" />
                </method>
            </interface>
        </node>
    """).interfaces[0]

    def _GetManagedObjects(self, params, inv):
        """Reply with the properties of the service and both characteristics."""
        objects = GLib.Variant(
            "a{oa{sa{sv}}}",
            {
                self.service.path: {
                    "org.bluez.GattService1": self.service.get_properties()
                },
                self.c1.path: {
                    "org.bluez.GattCharacteristic1": self.c1.get_properties()
                },
                self.c2.path: {
                    "org.bluez.GattCharacteristic1": self.c2.get_properties()
                },
            },
        )
        inv.return_value(GLib.Variant.new_tuple(objects))

    def __init__(self, adapter, path, service_uuid,
                 c1_uuid, c1_flags, c2_uuid, c2_flags):
        """Export the application together with its service/characteristics.

        adapter      -- adapter name used to build the manager object path
        path         -- D-Bus object path of the application; the service
                        and the characteristics are exported below it as
                        "service", "c1" and "c2"
        service_uuid -- UUID of the GATT service
        c1_uuid      -- UUID of the C1 characteristic
        c1_flags     -- list of flags of the C1 characteristic
        c2_uuid      -- UUID of the C2 characteristic
        c2_flags     -- list of flags of the C2 characteristic
        """
        super().__init__(path)
        self.service = GattService(os.path.join(path, "service"), service_uuid)
        self.c1 = GattCharacteristic(self.service, os.path.join(path, "c1"),
                                     c1_uuid, c1_flags)
        self.c2 = GattCharacteristic(self.service, os.path.join(path, "c2"),
                                     c2_uuid, c2_flags)
        self.adapter = adapter

    def on_register_ready(self, conn, result):
        """Finish the asynchronous RegisterApplication call and log outcome."""
        # Collect the result first, so success is reported only when the
        # registration has actually succeeded.
        try:
            conn.call_finish(result)
        except GLib.Error as err:
            logging.error("GATT application registration failed: %s", err)
            return
        logging.info("GATT application registered")

    def register(self):
        """Asynchronously register this application on the adapter."""
        self.call(
            "org.bluez",
            f"/org/bluez/{self.adapter}",
            "org.bluez.GattManager1",
            "RegisterApplication",
            GLib.Variant("(oa{sv})", (self.path, {})),
            self.on_register_ready,
        )
# Command line interface.
parser = ArgumentParser(description="GATT client/server for Matter BLE")
parser.add_argument(
    "--adapter",
    default="hci0",
    help="set the adapter to use; default is '%(default)s'",
)
parser.add_argument(
    "--role",
    choices=("central", "peripheral"),
    default="central",
    help="set the role of the device; default is '%(default)s'",
)
parser.add_argument(
    "--advertise",
    default="PyMatter",
    help="advertise the device with a given name; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-service",
    default="0xfff6",
    help="UUID of the Matter service; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c1",
    default="18ee2ef5-263d-4559-959f-4f9c429f9d11",
    help="UUID of the C1 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c1-flags",
    nargs="+",
    default=["write"],
    help="flags for the C1 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c2",
    default="18ee2ef5-263d-4559-959f-4f9c429f9d12",
    help="UUID of the C2 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c2-flags",
    nargs="+",
    default=["indicate"],
    help="flags for the C2 characteristic; default is '%(default)s'",
)

args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG)

# Only the peripheral role sets anything up: the GATT application and the
# advertisement. With any other role the script just runs the main loop.
if args.role == "peripheral":
    app = Application(args.adapter, "/py/gatt/matter/app", args.gatt_service,
                      args.gatt_c1, args.gatt_c1_flags,
                      args.gatt_c2, args.gatt_c2_flags)
    app.register()
    adv = Advertisement(args.adapter, "/py/gatt/matter/adv", args.advertise)
    adv.register()

# Registration replies and incoming method calls are delivered from here.
GLib.MainLoop().run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment