Created
October 24, 2024 09:28
-
-
Save agners/a72788d8166aa68340a8e98e0de059b9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import asyncio | |
import logging | |
from typing import Any, Optional, cast | |
from zeroconf import IPVersion, ServiceInfo, ServiceStateChange, Zeroconf, ServiceListener | |
from zeroconf.asyncio import AsyncServiceBrowser, AsyncServiceInfo, AsyncZeroconf | |
DNS_SD_TBR_SERVICE_TYPE = "_meshcop._udp.local." | |
DNS_SD_TREL_SERVICE_TYPE = "_trel._udp.local." | |
# Converts byte arrays to hex strings | |
def bytes_to_hex_str(byte_array: Optional[bytes]) -> str:
    """Return the hexadecimal text form of *byte_array*.

    ``None`` and empty input both yield an empty string.
    """
    return byte_array.hex() if byte_array else ""
# Converts binary values to unsigned integers | |
def bytes_to_uint(byte_array: Optional[bytes]) -> Optional[int]:
    """Interpret *byte_array* as a big-endian unsigned integer.

    Returns ``None`` when the input is ``None`` or empty.
    """
    if not byte_array:
        return None
    return int.from_bytes(byte_array, byteorder="big")
# Registry of resolved TREL services, keyed by the hex-encoded extended address (xa).
trel_services: dict = {}


# Asynchronous function to display and process service info for TREL services
async def async_display_trel_info(zeroconf: Zeroconf, service_type: str, name: str) -> None:
    """Resolve one TREL service instance, record it in ``trel_services`` and print it.

    Args:
        zeroconf: Zeroconf instance used to perform the lookup.
        service_type: DNS-SD service type of the instance.
        name: Full service instance name to resolve.
    """
    info = AsyncServiceInfo(service_type, name)
    # Request service info with a 3000ms timeout. The info object itself is
    # always truthy, so the outcome has to be taken from the request's result.
    if not await info.async_request(zeroconf, 3000):
        logging.warning("Failed to get info for TREL service %s", name)
        return

    # Extract the properties
    xa_hex = bytes_to_hex_str(info.properties.get(b"xa"))

    # Build the TREL service entry
    service_info = {
        "addresses": list(info.parsed_scoped_addresses()),
        "port": info.port,
        "Extended Address (xa)": xa_hex,
    }

    # Store the info in the trel_services dictionary
    trel_services[xa_hex] = service_info

    print(f"Discovered TREL Service {name}")
    print(f"TREL Info: {service_info}")
    # Addresses and port live inside the entry that was just built; they are
    # not top-level keys of the registry.
    print(f"TREL Addresses: {service_info['addresses']}")
    print(f"TREL Port: {service_info['port']}")
# Handler for meshcop service state changes | |
def meshcop_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange) -> None:
    """Schedule a lookup for a meshcop service that was just added.

    Every state change other than ``ServiceStateChange.Added`` is ignored.
    """
    if state_change != ServiceStateChange.Added:
        return
    # NOTE(review): async_display_meshcop_info is not defined in the visible
    # part of this file - confirm it exists before wiring this handler up.
    lookup = async_display_meshcop_info(zeroconf, service_type, name)
    asyncio.ensure_future(lookup)
# Handler for TREL service state changes | |
def trel_service_state_change(zeroconf: Zeroconf, service_type: str, name: str, state_change: ServiceStateChange) -> None:
    """Report a TREL service state change and resolve newly added services.

    The change is always printed; a lookup is scheduled only for
    ``ServiceStateChange.Added``.
    """
    print(f"Service {name} of type {service_type} changed state to {state_change}")
    if state_change != ServiceStateChange.Added:
        return
    lookup = async_display_trel_info(zeroconf, service_type, name)
    asyncio.ensure_future(lookup)
# Runner class to manage Zeroconf browsing | |
class ThreadBorderRouterBrowser:
    """Browse mDNS for Thread Border Router (meshcop) and TREL services.

    The instance is passed to ``AsyncServiceBrowser`` as its listener, so it
    provides the ``add_service`` / ``update_service`` / ``remove_service``
    callbacks. Resolved records are kept per service type, keyed by the full
    service instance name.
    """

    def __init__(self, args: Any) -> None:
        """Store the parsed command-line options; no network activity happens here.

        Args:
            args: Namespace-like object. The optional boolean attributes
                ``v6`` and ``v6_only`` select the IP version used for browsing.
        """
        self.args = args
        self._in_startup = True
        # Resolved service records keyed by full service instance name.
        self._meshcop_services: dict[str, ServiceInfo] = {}
        self._meshcop_xa: dict[bytes, dict] = {}
        self._trel_services: dict[str, ServiceInfo] = {}
        # Set whenever a stored record changes; polled and cleared by async_run().
        self._update = False
        # Strong references to in-flight lookup tasks. The event loop only
        # keeps weak references, so untracked tasks may be garbage-collected
        # before they finish.
        self._tasks: set = set()
        self.aiobrowser_meshcop: Optional[AsyncServiceBrowser] = None
        self.aiobrowser_trel: Optional[AsyncServiceBrowser] = None
        self.aiozc: Optional[AsyncZeroconf] = None

    def _ip_version(self) -> "IPVersion":
        """Map the ``v6`` / ``v6_only`` options in ``self.args`` to an IPVersion."""
        if getattr(self.args, "v6", False):
            return IPVersion.All
        if getattr(self.args, "v6_only", False):
            return IPVersion.V6Only
        return IPVersion.V4Only

    def _schedule_lookup(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Start async_add_service() as a task and keep it referenced until done."""
        task = asyncio.create_task(self.async_add_service(zeroconf, service_type, name))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _display_meshcop_services(self) -> None:
        """Print every known meshcop service and clear the pending-update flag."""
        self._update = False
        for info in self._meshcop_services.values():
            self.display_service_info(info)

    async def async_add_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Resolve a service instance and store the result under its name.

        Records that cannot be resolved, or that lack the ``xa`` TXT property,
        are logged and dropped.
        """
        info = AsyncServiceInfo(service_type, name)
        # Request service info with a 3000ms timeout
        if not await info.async_request(zeroconf, 3000):
            logging.warning("Failed to get info for service %s", name)
            return

        if b"xa" not in info.properties:
            logging.warning("Service %s is missing required property 'xa'", name)
            logging.warning("Service info: %s", info.properties)
            return

        if service_type == DNS_SD_TBR_SERVICE_TYPE:
            action = "Updating" if name in self._meshcop_services else "Adding"
            logging.info("%s Thread BR service %s", action, name)
            self._meshcop_services[name] = info
        elif service_type == DNS_SD_TREL_SERVICE_TYPE:
            action = "Updating" if name in self._trel_services else "Adding"
            logging.info("%s TREL service %s", action, name)
            self._trel_services[name] = info
            logging.info("TREL addresses %s", info.parsed_scoped_addresses())
            logging.info("TREL port %s", info.port)
        else:
            # Not a service type this browser tracks; nothing was stored.
            return

        self._update = True

    def display_service_info(self, info: "ServiceInfo") -> None:
        """Print the decoded TXT properties and addresses of a border router record."""
        properties = info.properties
        xa = properties.get(b'xa')
        xp = properties.get(b'xp')
        id_ = properties.get(b'id')
        pt = properties.get(b'pt')
        vn = properties.get(b'vn')
        mn = properties.get(b'mn')
        at = properties.get(b'at')
        vd = properties.get(b'vd')
        vo = properties.get(b'vo')
        dn = properties.get(b'dn')
        sq = properties.get(b'sq')
        bb = properties.get(b'bb')
        omr = properties.get(b'omr')

        # Build the service entry: binary values as hex, text values decoded
        # as UTF-8, sq/bb as unsigned integers.
        service_info = {
            "IP addresses": list(info.parsed_scoped_addresses()),
            "Extended Address (xa)": bytes_to_hex_str(xa),
            "Extended PAN ID (xp)": bytes_to_hex_str(xp),
            "Border Router ID (id)": bytes_to_hex_str(id_),
            "Partition ID (pt)": bytes_to_hex_str(pt),
            "Active Operational Dataset Timestamp (at)": bytes_to_hex_str(at),
            "Vendor-Specific Data (vd)": bytes_to_hex_str(vd),
            "Vendor OUI (vo)": bytes_to_hex_str(vo),
            "Domain Name (dn)": dn.decode("utf-8") if dn else "",
            "Vendor Name (vn)": vn.decode("utf-8") if vn else "",
            "Model Name (mn)": mn.decode("utf-8") if mn else "",
            "BBR Sequence Number (sq)": bytes_to_uint(sq),
            "BBR Port (bb)": bytes_to_uint(bb),
            "Off-Mesh Routable Prefix (omr)": bytes_to_hex_str(omr),
        }

        print(info)
        print(f"Discovered Border Router: {info.name}")
        # Dedicated loop names so the ``info`` parameter is not shadowed.
        for label, value in service_info.items():
            print(f" {label}: {value}")

    def add_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: a new service instance appeared."""
        self._schedule_lookup(zeroconf, service_type, name)

    def remove_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: forget a service instance that went away."""
        # Instance names include the service type, so a name can be present in
        # at most one of the two registries.
        for registry in (self._meshcop_services, self._trel_services):
            if registry.pop(name, None) is not None:
                logging.info("Removing service %s", name)
                self._update = True

    def update_service(self, zeroconf: "Zeroconf", service_type: str, name: str) -> None:
        """Listener callback: an existing service instance changed."""
        self._schedule_lookup(zeroconf, service_type, name)

    async def async_run(self) -> None:
        """Start both browsers, print the initial results, then print on every change.

        Runs until cancelled.
        """
        # Initialize AsyncZeroconf with the IP version selected by self.args
        self.aiozc = AsyncZeroconf(ip_version=self._ip_version())

        # Browsing for both _meshcop._udp and _trel._udp services
        print("\nBrowsing for _meshcop._udp and _trel._udp services...\n")

        # Start service browsers for both service types
        self.aiobrowser_meshcop = AsyncServiceBrowser(
            self.aiozc.zeroconf, DNS_SD_TBR_SERVICE_TYPE, listener=self
        )
        self.aiobrowser_trel = AsyncServiceBrowser(
            self.aiozc.zeroconf, DNS_SD_TREL_SERVICE_TYPE, listener=self
        )

        # Run the browsers for 5 seconds, then start printing updates when services are found
        await asyncio.sleep(5)

        print(f"\nInitial Scan Complete - Meshcop services: {len(self._meshcop_services)}, TREL services: {len(self._trel_services)}\n")
        # Also clears the update flag raised during the initial scan, so the
        # same records are not printed a second time one second later.
        self._display_meshcop_services()

        # Keep running until the user presses Ctrl+C
        while True:
            await asyncio.sleep(1)
            if self._update:
                self._display_meshcop_services()

    async def async_close(self) -> None:
        """Stop the browsers, cancel pending lookups and close Zeroconf."""
        # Stop the browsers first so no new lookups get scheduled.
        if self.aiobrowser_meshcop:
            await self.aiobrowser_meshcop.async_cancel()
        if self.aiobrowser_trel:
            await self.aiobrowser_trel.async_cancel()

        pending = list(self._tasks)
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        if self.aiozc:
            await self.aiozc.async_close()
# Main execution starts here | |
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Argument parsing for additional options
    parser = argparse.ArgumentParser()
    parser.add_argument("--debug", action="store_true")
    version_group = parser.add_mutually_exclusive_group()
    version_group.add_argument("--v6", action="store_true")
    version_group.add_argument("--v6-only", action="store_true")
    args = parser.parse_args()

    # Set logging level for debugging
    if args.debug:
        logging.getLogger("zeroconf").setLevel(logging.DEBUG)

    # Configure IP version based on arguments. Kept as a module-level name
    # because ThreadBorderRouterBrowser.async_run() reads ``ip_version``.
    if args.v6:
        ip_version = IPVersion.All
    elif args.v6_only:
        ip_version = IPVersion.V6Only
    else:
        ip_version = IPVersion.V4Only

    async def _run_browser() -> None:
        """Run one browser and always close that same instance afterwards."""
        browser = ThreadBorderRouterBrowser(args)
        try:
            await browser.async_run()
        finally:
            # Closing a freshly constructed browser would be a no-op (all of
            # its handles are None); the instance that was actually started
            # must be the one shut down.
            await browser.async_close()

    # Start the asyncio loop and run the ThreadBorderRouterBrowser
    try:
        asyncio.run(_run_browser())
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop; cleanup already ran in the finally.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment