Skip to content

Instantly share code, notes, and snippets.

@agners
Created October 24, 2024 09:28
Show Gist options
  • Save agners/a72788d8166aa68340a8e98e0de059b9 to your computer and use it in GitHub Desktop.
Save agners/a72788d8166aa68340a8e98e0de059b9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import asyncio
import logging
from typing import Any, Optional, cast
from zeroconf import IPVersion, ServiceInfo, ServiceStateChange, Zeroconf, ServiceListener
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf
DNS_SD_TBR_SERVICE_TYPE = "_meshcop._udp.local."
DNS_SD_TREL_SERVICE_TYPE = "_trel._udp.local."
# Converts byte arrays to hex strings
def bytes_to_hex_str(byte_array: Optional[bytes]) -> str:
    """Return the lowercase hex representation of *byte_array*.

    ``None`` and empty byte strings both yield an empty string.
    """
    if not byte_array:
        return ""
    return byte_array.hex()
# Converts binary values to unsigned integers
def bytes_to_uint(byte_array: Optional[bytes]) -> Optional[int]:
    """Interpret *byte_array* as a big-endian unsigned integer.

    Returns ``None`` when the input is ``None`` or empty.
    """
    return int.from_bytes(byte_array, "big") if byte_array else None
# Registry of TREL services seen so far, keyed by hex-encoded extended address (xa)
trel_services: dict = {}


# Asynchronous function to display and process service info for TREL services
async def async_display_trel_info(zeroconf: Zeroconf, service_type: str, name: str) -> None:
    """Resolve a TREL service, record it in ``trel_services`` and print it.

    Args:
        zeroconf: Zeroconf instance used to resolve the service record.
        service_type: DNS-SD service type of the service.
        name: Service instance name to resolve.

    If the record cannot be resolved within the timeout, a warning is logged
    and nothing is stored or printed.
    """
    info = AsyncServiceInfo(service_type, name)
    # Request service info with a 3000ms timeout; the call reports whether
    # the record could be resolved, so bail out instead of printing an
    # entry built from an unresolved (empty) record.
    if not await info.async_request(zeroconf, 3000):
        logging.warning("Failed to get info for service %s", name)
        return

    # Extract the properties
    xa_hex = bytes_to_hex_str(info.properties.get(b'xa'))

    # Build the TREL service entry
    service_info = {
        "addresses": list(info.parsed_scoped_addresses()),
        "port": info.port,
        "Extended Address (xa)": xa_hex,
    }

    # Store the info in the trel_services dictionary
    trel_services[xa_hex] = service_info

    print(f"Discovered TREL Service {name}")
    print(f"TREL Info: {service_info}")
    print(f"TREL Addresses: {service_info['addresses']}")
    print(f"TREL Port: {service_info['port']}")
# Handler for meshcop service state changes
def meshcop_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange) -> None:
    """Schedule a meshcop info lookup when a service is reported as added.

    State changes other than ``ServiceStateChange.Added`` are ignored.
    """
    if state_change != ServiceStateChange.Added:
        return
    # NOTE(review): async_display_meshcop_info is not defined in the visible
    # source; unless it is provided elsewhere this call raises NameError.
    pending_lookup = async_display_meshcop_info(zeroconf, service_type, name)
    asyncio.ensure_future(pending_lookup)
# Handler for TREL service state changes
def trel_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange) -> None:
    """Print every TREL state change and schedule a lookup for added services.

    The state change is always printed; only ``ServiceStateChange.Added``
    triggers ``async_display_trel_info``.
    """
    print(f"Service {name} of type {service_type} changed state to {state_change}")
    if state_change != ServiceStateChange.Added:
        return
    pending_lookup = async_display_trel_info(zeroconf, service_type, name)
    asyncio.ensure_future(pending_lookup)
# Runner class to manage Zeroconf browsing
class ThreadBorderRouterBrowser:
    """Browse for Thread Border Router (meshcop) and TREL services.

    The instance is passed as the ``listener`` of two ``AsyncServiceBrowser``
    objects, so it provides the ``add_service`` / ``update_service`` /
    ``remove_service`` callbacks. Resolved records are kept per service name
    and the meshcop ones are printed whenever something changes.
    """

    # Timeout (milliseconds) for resolving a single service record
    _REQUEST_TIMEOUT_MS = 3000

    def __init__(self, args: Any) -> None:
        """Store the parsed command-line *args* and create empty registries."""
        self.args = args
        self._in_startup = True
        # Resolved service records, keyed by service instance name
        self._meshcop_services: dict[str, AsyncServiceInfo] = {}
        self._meshcop_xa: dict[bytes, dict] = {}
        self._trel_services: dict[str, AsyncServiceInfo] = {}
        # Set whenever a registry changes; consumed by the loop in async_run
        self._update = False
        # Strong references to in-flight lookup tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks: set[asyncio.Task] = set()
        self.aiobrowser_meshcop: Optional[AsyncServiceBrowser] = None
        self.aiobrowser_trel: Optional[AsyncServiceBrowser] = None
        self.aiozc: Optional[AsyncZeroconf] = None

    def _ip_version(self) -> "IPVersion":
        """Map the ``v6`` / ``v6_only`` flags on ``self.args`` to an IPVersion."""
        if getattr(self.args, "v6", False):
            return IPVersion.All
        if getattr(self.args, "v6_only", False):
            return IPVersion.V6Only
        return IPVersion.V4Only

    def _schedule_lookup(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Start ``async_add_service`` as a task and keep it referenced until done."""
        task = asyncio.create_task(self.async_add_service(zeroconf, service_type, name))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    @staticmethod
    def _decode_text(value: Optional[bytes]) -> str:
        """Decode a TXT value as UTF-8; malformed bytes are replaced, not raised."""
        return value.decode("utf-8", errors="replace") if value else ""

    async def async_add_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Resolve *name* and store the record in the matching registry.

        Services that cannot be resolved, or that lack the ``xa`` TXT
        property, are logged and skipped.
        """
        info = AsyncServiceInfo(service_type, name)
        if not await info.async_request(zeroconf, self._REQUEST_TIMEOUT_MS):
            logging.warning("Failed to get info for service %s", name)
            return
        if b"xa" not in info.properties:
            logging.warning("Service %s is missing required property 'xa'", name)
            logging.warning("Service info: %s", info.properties)
            return

        if service_type == DNS_SD_TBR_SERVICE_TYPE:
            action = "Updating" if name in self._meshcop_services else "Adding"
            logging.info("%s Thread BR service %s", action, name)
            self._meshcop_services[name] = info
        elif service_type == DNS_SD_TREL_SERVICE_TYPE:
            action = "Updating" if name in self._trel_services else "Adding"
            logging.info("%s TREL service %s", action, name)
            self._trel_services[name] = info
            logging.info("TREL addresses %s", info.parsed_scoped_addresses())
            logging.info("TREL port %s", info.port)
        self._update = True

    def display_service_info(self, info: "ServiceInfo") -> None:
        """Print the TXT properties of a Border Router record in readable form."""
        props = info.properties
        # Insertion order below is the print order
        service_info = {
            "IP addresses": list(info.parsed_scoped_addresses()),
            "Extended Address (xa)": bytes_to_hex_str(props.get(b"xa")),
            "Extended PAN ID (xp)": bytes_to_hex_str(props.get(b"xp")),
            "Border Router ID (id)": bytes_to_hex_str(props.get(b"id")),
            "Partition ID (pt)": bytes_to_hex_str(props.get(b"pt")),
            "Active Operational Dataset Timestamp (at)": bytes_to_hex_str(props.get(b"at")),
            "Vendor-Specific Data (vd)": bytes_to_hex_str(props.get(b"vd")),
            "Vendor OUI (vo)": bytes_to_hex_str(props.get(b"vo")),
            "Domain Name (dn)": self._decode_text(props.get(b"dn")),
            "Vendor Name (vn)": self._decode_text(props.get(b"vn")),
            "Model Name (mn)": self._decode_text(props.get(b"mn")),
            "BBR Sequence Number (sq)": bytes_to_uint(props.get(b"sq")),
            "BBR Port (bb)": bytes_to_uint(props.get(b"bb")),
            "Off-Mesh Routable Prefix (omr)": bytes_to_hex_str(props.get(b"omr")),
        }
        print(info)
        print(f"Discovered Border Router: {info.name}")
        for label, value in service_info.items():
            print(f" {label}: {value}")

    def _display_meshcop_services(self) -> None:
        """Print every known meshcop service record."""
        for record in self._meshcop_services.values():
            self.display_service_info(record)

    def add_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: a service appeared, resolve it in the background."""
        self._schedule_lookup(zeroconf, service_type, name)

    def remove_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: a service went away, drop it from whichever registry holds it."""
        for registry in (self._meshcop_services, self._trel_services):
            if registry.pop(name, None) is not None:
                logging.info("Removed service %s", name)
                self._update = True

    def update_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: a service changed, resolve it again in the background."""
        self._schedule_lookup(zeroconf, service_type, name)

    async def async_run(self) -> None:
        """Start both browsers, print the initial scan, then print on every change.

        Runs until cancelled; call ``async_close`` afterwards to release the
        zeroconf resources.
        """
        # Initialize AsyncZeroconf with the IP version selected by the arguments
        self.aiozc = AsyncZeroconf(ip_version=self._ip_version())

        print("\nBrowsing for _meshcop._udp and _trel._udp services...\n")

        # Start service browsers for both service types
        self.aiobrowser_meshcop = AsyncServiceBrowser(
            self.aiozc.zeroconf, DNS_SD_TBR_SERVICE_TYPE, listener=self
        )
        self.aiobrowser_trel = AsyncServiceBrowser(
            self.aiozc.zeroconf, DNS_SD_TREL_SERVICE_TYPE, listener=self
        )

        # Let the browsers collect results for 5 seconds before the first report
        await asyncio.sleep(5)
        print(f"\nInitial Scan Complete - Meshcop services: {len(self._meshcop_services)}, TREL services: {len(self._trel_services)}\n")
        self._display_meshcop_services()
        # Everything found so far has just been printed; without this reset
        # the first loop iteration below would print it all a second time.
        self._update = False
        self._in_startup = False

        # Keep running until cancelled (e.g. the user presses Ctrl+C)
        while True:
            await asyncio.sleep(1)
            if self._update:
                self._update = False
                self._display_meshcop_services()

    async def async_close(self) -> None:
        """Cancel both browsers and close the zeroconf instance, if they were started."""
        if self.aiobrowser_meshcop:
            await self.aiobrowser_meshcop.async_cancel()
        if self.aiobrowser_trel:
            await self.aiobrowser_trel.async_cancel()
        if self.aiozc:
            await self.aiozc.async_close()
# Main execution starts here
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Argument parsing for additional options
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true", help="enable zeroconf debug logging")
    version_group = parser.add_mutually_exclusive_group()
    version_group.add_argument("--v6", action="store_true", help="browse over both IPv4 and IPv6")
    version_group.add_argument("--v6-only", action="store_true", help="browse over IPv6 only")
    args = parser.parse_args()

    # Set logging level for debugging
    if args.debug:
        logging.getLogger("zeroconf").setLevel(logging.DEBUG)

    # Configure IP version based on arguments. Kept as a module-level name
    # so code that looks up ``ip_version`` globally keeps working.
    if args.v6:
        ip_version = IPVersion.All
    elif args.v6_only:
        ip_version = IPVersion.V6Only
    else:
        ip_version = IPVersion.V4Only

    # A single browser instance is used for both running and closing, so the
    # browsers and zeroconf instance that were actually started get released.
    browser = ThreadBorderRouterBrowser(args)

    async def _main() -> None:
        """Run the browser and always close it, even when interrupted."""
        try:
            await browser.async_run()
        finally:
            # Runs inside the same event loop that created the zeroconf
            # objects; asyncio.run cancels the main task on Ctrl+C, which
            # lands here before the loop is torn down.
            await browser.async_close()

    # Start the asyncio loop and run the ThreadBorderRouterBrowser
    try:
        asyncio.run(_main())
    except KeyboardInterrupt:
        # Cleanup already happened in _main(); exit quietly on Ctrl+C
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment