Skip to content

Instantly share code, notes, and snippets.

@arkq
Created September 20, 2024 11:59
Show Gist options
  • Save arkq/a8fdba8494ef9286f0aed381d086c087 to your computer and use it in GitHub Desktop.
Save arkq/a8fdba8494ef9286f0aed381d086c087 to your computer and use it in GitHub Desktop.
GATT client/server for Matter BLE
#!/usr/bin/env python3
#
# Copyright 2024 Arkadiusz Bokowy
#
# Simple GATT client/server which emulates a Matter device. This application
# can be used to test Matter devices and their interaction over BLE, e.g., to
# test the Matter commissioning process or to search for potential security
# vulnerabilities in the Matter BLE stack.
import logging
import os.path
from argparse import ArgumentParser
from gi.repository import Gio, GLib
class DBusObject:
    """Helper class to simplify DBus object registration.

    Subclasses set ``INTERFACE`` to a ``Gio.DBusInterfaceInfo`` and expose
    each D-Bus method ``Foo`` as a ``_Foo(params, inv)`` method and each
    D-Bus property ``Bar`` as a ``_Bar`` attribute (the subclasses in this
    file use Python properties returning a ``GLib.Variant``).
    """

    INTERFACE = None

    def __init__(self, path):
        """Connect to the system bus and register this object at *path*."""
        self.bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
        self.bus.register_object(
            path, self.INTERFACE, self.on_method_call, self.get_property
        )
        self.path = path

    def on_method_call(self, conn, sender, path, iface, name, params, inv):
        """Dispatch an incoming method call to the ``_<name>`` handler.

        The handler receives ``(params, inv)`` and is responsible for
        completing the invocation. If no callable handler exists, the call
        is answered with ``org.freedesktop.DBus.Error.UnknownMethod`` so the
        caller gets a reply instead of the dispatch raising AttributeError
        and leaving the invocation unanswered.
        """
        logging.debug("Called: %s.%s", iface, name)
        handler = getattr(self, f"_{name}", None)
        if not callable(handler):
            logging.warning("Unhandled method: %s.%s", iface, name)
            inv.return_dbus_error(
                "org.freedesktop.DBus.Error.UnknownMethod",
                f"Method {name} is not implemented on {iface}",
            )
            return
        handler(params, inv)

    def get_property(self, conn, sender, path, iface, name):
        """Return the value of the ``_<name>`` attribute for property *name*.

        Raises:
            AttributeError: if the object has no ``_<name>`` attribute.
        """
        logging.debug("Get property: %s.%s", iface, name)
        return getattr(self, f"_{name}")

    def get_properties(self):
        """Return a dict mapping every property declared in ``INTERFACE``
        to its current value, as obtained from :meth:`get_property`."""
        return {
            prop.name: self.get_property(
                None, None, None, self.INTERFACE.name, prop.name
            )
            for prop in self.INTERFACE.properties
        }

    def call(self, service, path, iface, method, params, callback):
        """Start an asynchronous D-Bus method call on the object's bus.

        *callback* is passed to ``Gio.DBusConnection.call`` as the async
        ready callback; it is expected to finish the call.
        """
        # Positional arguments after ``params``: reply type (None), call
        # flags (0), timeout in ms (-1), cancellable (None).
        self.bus.call(service, path, iface, method, params,
                      None, 0, -1, None, callback)
class Advertisement(DBusObject):
    """LE advertisement for Matter device.

    Exports an ``org.bluez.LEAdvertisement1`` object and registers it with
    the ``org.bluez.LEAdvertisingManager1`` of the selected adapter.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
<node>
<interface name="org.bluez.LEAdvertisement1">
<method name="Release" />
<property name="Type" type="s" access="read" />
<property name="ServiceUUIDs" type="as" access="read" />
<property name="ServiceData" type="a{sv}" access="read" />
<property name="Discoverable" type="b" access="read" />
<property name="Includes" type="as" access="read" />
<property name="LocalName" type="s" access="read" />
</interface>
</node>
""").interfaces[0]

    def __init__(self, adapter, path, name):
        """Export the advertisement object.

        Args:
            adapter: adapter name used to build ``/org/bluez/<adapter>``.
            path: D-Bus object path at which the advertisement is exported.
            name: value reported by the ``LocalName`` property.
        """
        super().__init__(path)
        self.adapter = adapter
        self.name = name

    @property
    def _Type(self):
        return GLib.Variant.new_string("peripheral")

    @property
    def _ServiceUUIDs(self):
        return GLib.Variant.new_strv(["0xFFF6"])

    @property
    def _ServiceData(self):
        # NOTE(review): presumably the Matter commissionable-device payload
        # (opcode, discriminator, vendor/product ID, flags) -- verify the
        # byte layout against the Matter specification.
        data = [0x00, 0x00, 0x0F, 0xF1, 0xFF, 0x01, 0x80, 0x00]
        props = GLib.VariantDict.new()
        props.insert_value(
            "0xFFF6",
            GLib.Variant.new_array(
                GLib.VariantType("y"),
                [GLib.Variant.new_byte(octet) for octet in data],
            ),
        )
        return props.end()

    @property
    def _Discoverable(self):
        return GLib.Variant.new_boolean(True)

    @property
    def _Includes(self):
        return GLib.Variant.new_strv([])

    @property
    def _LocalName(self):
        return GLib.Variant.new_string(self.name)

    def _Release(self, params, inv):
        """Handle the ``Release`` method declared in the interface.

        Without this handler the declared method could not be dispatched.
        """
        logging.info("Advertisement released")
        inv.return_value(None)

    def on_register_ready(self, conn, result):
        """Finish the ``RegisterAdvertisement`` call and log its outcome."""
        # Finish the call first: success must only be reported once the
        # reply is known not to be an error.
        try:
            conn.call_finish(result)
        except GLib.Error as err:
            logging.error("Advertisement registration failed: %s", err.message)
            return
        logging.info("Advertisement registered")

    def register(self):
        """Asynchronously register this advertisement with BlueZ."""
        self.call(
            "org.bluez",
            f"/org/bluez/{self.adapter}",
            "org.bluez.LEAdvertisingManager1",
            "RegisterAdvertisement",
            GLib.Variant("(oa{sv})", (self.path, {})),
            self.on_register_ready,
        )
class GattService(DBusObject):
    """D-Bus object implementing ``org.bluez.GattService1``.

    Reports the UUID given at construction time and always declares itself
    as a primary service.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
<node>
<interface name="org.bluez.GattService1">
<property name="UUID" type="s" access="read" />
<property name="Primary" type="b" access="read" />
</interface>
</node>
""").interfaces[0]

    def __init__(self, path, uuid):
        """Export the service at *path* and remember its *uuid*."""
        super().__init__(path)
        self.uuid = uuid

    @property
    def _Primary(self):
        # Value of the "Primary" property.
        return GLib.Variant("b", True)

    @property
    def _UUID(self):
        # Value of the "UUID" property.
        return GLib.Variant("s", self.uuid)
class GattCharacteristic(DBusObject):
    """GATT characteristic object.

    Exports ``org.bluez.GattCharacteristic1``. Every method declared in the
    interface has a handler that completes the invocation, so callers
    always receive a reply.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
<node>
<interface name="org.bluez.GattCharacteristic1">
<method name="ReadValue">
<arg name="options" type="a{sv}" direction="in"/>
<arg name="value" type="ay" direction="out" />
</method>
<method name="WriteValue">
<arg name="value" type="ay" direction="in" />
<arg name="options" type="a{sv}" direction="in"/>
</method>
<method name="StartNotify" />
<method name="StopNotify" />
<method name="Confirm" />
<property name="UUID" type="s" access="read" />
<property name="Service" type="o" access="read" />
<property name="Value" type="ay" access="read" />
<property name="Flags" type="as" access="read" />
</interface>
</node>
""").interfaces[0]

    def __init__(self, service, path, uuid, flags: list):
        """Export the characteristic.

        Args:
            service: owning service object; its ``path`` is reported by
                the ``Service`` property.
            path: D-Bus object path at which the characteristic is exported.
            uuid: value reported by the ``UUID`` property.
            flags: list of strings reported by the ``Flags`` property.
        """
        super().__init__(path)
        self.service = service
        self.uuid = uuid
        self.flags = flags

    @property
    def _UUID(self):
        return GLib.Variant.new_string(self.uuid)

    @property
    def _Service(self):
        return GLib.Variant.new_object_path(self.service.path)

    @property
    def _Value(self):
        # Zero-length byte array: no characteristic value is stored.
        return GLib.Variant.new_fixed_array(GLib.VariantType("y"), None, 0, 1)

    @property
    def _Flags(self):
        return GLib.Variant.new_strv(self.flags)

    def _ReadValue(self, params, inv):
        """Reply to ``ReadValue`` with the current ``Value`` property."""
        logging.info("Read options: %s", params.get_child_value(0))
        inv.return_value(GLib.Variant.new_tuple(self._Value))

    def _WriteValue(self, params, inv, *argc, **kw):
        """Log the written value and options, then acknowledge the write."""
        logging.info("Write value: %s", params.get_child_value(0))
        logging.info("Write options: %s", params.get_child_value(1))
        # Complete the invocation; otherwise the caller never gets a reply.
        inv.return_value(None)

    def _StartNotify(self, params, inv):
        """Acknowledge ``StartNotify``; no notifications are generated."""
        logging.info("Start notify: %s", self.uuid)
        inv.return_value(None)

    def _StopNotify(self, params, inv):
        """Acknowledge ``StopNotify``."""
        logging.info("Stop notify: %s", self.uuid)
        inv.return_value(None)

    def _Confirm(self, params, inv):
        """Acknowledge ``Confirm``."""
        logging.info("Confirm: %s", self.uuid)
        inv.return_value(None)
class Application(DBusObject):
    """GATT application for Matter device.

    Acts as the ``org.freedesktop.DBus.ObjectManager`` root for one
    :class:`GattService` and two :class:`GattCharacteristic` objects
    (``c1`` and ``c2``) and registers itself with
    ``org.bluez.GattManager1``.
    """

    INTERFACE = Gio.DBusNodeInfo.new_for_xml(r"""
<node>
<interface name="org.freedesktop.DBus.ObjectManager">
<method name="GetManagedObjects">
<arg type="a{oa{sa{sv}}}" direction="out" />
</method>
</interface>
</node>
""").interfaces[0]

    def __init__(self, adapter, path, service_uuid,
                 c1_uuid, c1_flags, c2_uuid, c2_flags):
        """Export the application and its child objects below *path*.

        Args:
            adapter: adapter name used to build ``/org/bluez/<adapter>``.
            path: D-Bus object path of the application root.
            service_uuid: UUID of the GATT service.
            c1_uuid, c1_flags: UUID and flags of the first characteristic.
            c2_uuid, c2_flags: UUID and flags of the second characteristic.
        """
        super().__init__(path)
        self.service = GattService(os.path.join(path, "service"), service_uuid)
        self.c1 = GattCharacteristic(self.service, os.path.join(path, "c1"),
                                     c1_uuid, c1_flags)
        self.c2 = GattCharacteristic(self.service, os.path.join(path, "c2"),
                                     c2_uuid, c2_flags)
        self.adapter = adapter

    def _GetManagedObjects(self, params, inv):
        """Reply with the service and both characteristics and their
        current property values."""
        managed = {
            self.service.path: {
                "org.bluez.GattService1": self.service.get_properties()
            },
        }
        for characteristic in (self.c1, self.c2):
            managed[characteristic.path] = {
                "org.bluez.GattCharacteristic1":
                    characteristic.get_properties()
            }
        objects = GLib.Variant("a{oa{sa{sv}}}", managed)
        inv.return_value(GLib.Variant.new_tuple(objects))

    def on_register_ready(self, conn, result):
        """Finish the ``RegisterApplication`` call and log its outcome."""
        # Finish the call first: success must only be reported once the
        # reply is known not to be an error.
        try:
            conn.call_finish(result)
        except GLib.Error as err:
            logging.error("GATT application registration failed: %s",
                          err.message)
            return
        logging.info("GATT application registered")

    def register(self):
        """Asynchronously register this application with BlueZ."""
        self.call(
            "org.bluez",
            f"/org/bluez/{self.adapter}",
            "org.bluez.GattManager1",
            "RegisterApplication",
            GLib.Variant("(oa{sv})", (self.path, {})),
            self.on_register_ready,
        )
# Command-line interface and script entry point.
parser = ArgumentParser(description="GATT client/server for Matter BLE")
parser.add_argument(
    "--adapter",
    default="hci0",
    help="set the adapter to use; default is '%(default)s'",
)
parser.add_argument(
    "--role",
    choices=("central", "peripheral"),
    default="central",
    help="set the role of the device; default is '%(default)s'",
)
parser.add_argument(
    "--advertise",
    default="PyMatter",
    help="advertise the device with a given name; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-service",
    default="0xfff6",
    help="UUID of the Matter service; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c1",
    default="18ee2ef5-263d-4559-959f-4f9c429f9d11",
    # Help text previously repeated the service description.
    help="UUID of the C1 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c1-flags",
    nargs="+",
    default=["write"],
    help="flags for the C1 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c2",
    default="18ee2ef5-263d-4559-959f-4f9c429f9d12",
    # Help text previously repeated the service description.
    help="UUID of the C2 characteristic; default is '%(default)s'",
)
parser.add_argument(
    "--gatt-c2-flags",
    nargs="+",
    default=["indicate"],
    help="flags for the C2 characteristic; default is '%(default)s'",
)

args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG)

# NOTE(review): the nesting below is reconstructed; it assumes both the GATT
# application and the advertisement belong to the peripheral role -- confirm.
# Nothing is set up for the "central" role; only the main loop runs.
if args.role == "peripheral":
    app = Application(args.adapter, "/py/gatt/matter/app", args.gatt_service,
                      args.gatt_c1, args.gatt_c1_flags,
                      args.gatt_c2, args.gatt_c2_flags)
    app.register()
    adv = Advertisement(args.adapter, "/py/gatt/matter/adv", args.advertise)
    adv.register()

# Run the GLib main loop so D-Bus calls and callbacks are dispatched.
GLib.MainLoop().run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment