Created
August 16, 2025 17:21
-
-
Save mik-laj/4c1c363391115ccb14ee856a9c1c12a1 to your computer and use it in GitHub Desktop.
A script that sends a ring request to all nearby beacons. https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import struct | |
| import hmac, hashlib | |
| from bleak import BleakScanner, BleakClient, uuids as buuids | |
| from dataclasses import dataclass | |
# Fast Pair service UUID, given in its 16-bit short form and normalized by bleak.
FAST_PAIR_SERVICE_UUID = buuids.normalize_uuid_str("fe2c")

# Service UUID under which the advertisement frame is carried (Table 8 of the spec).
# The Eddystone protocol uses the same UUID; the frames are told apart by the
# frame type byte, which is 0x40/0x41 here.
ADVERTISE_FRAME_SERVICE_UUID = buuids.normalize_uuid_str("feaa")

# GATT characteristic used for beacon actions.
BEACON_ACTIONS_UUID = buuids.normalize_uuid_str("FE2C1238-8366-4814-8EB0-01DE32100BEA")

# Data IDs written as the first byte of a beacon-actions request.
DATA_ID_RING = 0x05
DATA_ID_READ_RING_STATE = 0x06

# Candidate ring keys; each one is tried in turn against every discovered device.
RING_KEYS = [
    bytes.fromhex(key_hex)
    for key_hex in (
        "c438999d5a8cb500",
        "130c1c87733c203e",
        "f93ff77957a40434",
    )
]
| @dataclass | |
| class FHNFrame: | |
| utp_mode: bool # unwanted tracking protection flag | |
| eid_hex: str # 20B or 32B | |
| hashed_flags: int | None | |
def parse_feaa_fhn(payload: bytes) -> FHNFrame | None:
    """Decode the service data found under the 0xFEAA UUID into an ``FHNFrame``.

    Expected layout: ``[0x40 or 0x41][EID: 20 or 32 bytes][optional hashed flags: 1 byte]``.
    See https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn#advertised-frames

    Args:
        payload: Raw service-data bytes.

    Returns:
        The decoded frame, or ``None`` when the payload is empty, has a
        different frame type, or has a length that matches neither EID size.
    """
    accepted_frame_types = (0x40, 0x41)
    if not payload or payload[0] not in accepted_frame_types:
        return None

    frame_type = payload[0]
    body = payload[1:]

    # The 20-byte EID layout is tried before the 32-byte one.
    for eid_len in (20, 32):
        if len(body) == eid_len:
            flags = None
        elif len(body) == eid_len + 1:
            flags = body[eid_len]
        else:
            continue
        return FHNFrame(
            utp_mode=bool(frame_type & 0x01),
            eid_hex=body[:eid_len].hex(),
            hashed_flags=flags,
        )

    return None
# Size in bytes of the one-time authentication key carried in every request.
_AUTH_KEY_LEN = 8


def make_auth(ring_key: bytes, proto_major: int, nonce8: bytes, data_id: int, addl: bytes) -> bytes:
    """Compute the one-time authentication key for a beacon-actions request.

    The key is the first 8 bytes of
    ``HMAC-SHA256(ring_key, proto_major || nonce || data_id || data_len || addl)``.

    ``data_len`` is the value of the request's "data length" byte. In the FMDN
    request tables that byte counts the 8-byte authentication key together
    with the additional data (0x0C for Ring, 0x08 for Read ring state), so it
    is ``8 + len(addl)`` rather than ``len(addl)``.

    Args:
        ring_key: Key used for the HMAC.
        proto_major: Protocol major version read from the characteristic.
        nonce8: Nonce read from the characteristic (8 bytes).
        data_id: Data ID of the request being authenticated.
        addl: Additional data that follows the authentication key.

    Returns:
        The 8-byte authentication key.
    """
    data_len = _AUTH_KEY_LEN + len(addl)
    msg = bytes([proto_major]) + nonce8 + bytes([data_id, data_len]) + addl
    mac = hmac.new(ring_key, msg, hashlib.sha256).digest()
    return mac[:_AUTH_KEY_LEN]


def build_ring_message(ring_key, addr, nonce8, proto_major, op_mask=0xFF, timeout_seconds=60.0, volume=0x00):
    """Build the bytes of a Ring request for the beacon-actions characteristic.

    Ring operation parameters are documented at
    https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn#ring

    Args:
        ring_key: Key used to authenticate the request.
        addr: Device address; only used to prefix the log line.
        nonce8: Nonce read from the characteristic (8 bytes).
        proto_major: Protocol major version read from the characteristic.
        op_mask: Ring operation byte (default 0xFF).
        timeout_seconds: Ring duration in seconds; capped at 600 s.
        volume: Volume byte (default 0x00).

    Returns:
        ``[data_id, data_len, auth (8B), op_mask, timeout (uint16 BE), volume]``.
    """
    # Timeout is sent in tenths of a second as a big-endian uint16, max 600 s.
    t_ds = min(int(timeout_seconds * 10), 6000)
    # Additional data: ring operation, timeout, volume.
    addl = bytes([op_mask]) + struct.pack(">H", t_ds) + bytes([volume])
    auth8 = make_auth(ring_key, proto_major, nonce8, DATA_ID_RING, addl)
    print(f"[{addr}] Auth: {auth8.hex()}")
    # The data length byte covers the authentication key plus the additional
    # data, and must match the value that make_auth fed into the HMAC.
    payload = bytes([DATA_ID_RING, _AUTH_KEY_LEN + len(addl)]) + auth8 + addl
    return payload


def build_read_ring_state_message(ring_key, addr, nonce8, proto_major):
    """Build the bytes of a Read ring state request.

    Args:
        ring_key: Key used to authenticate the request.
        addr: Device address; only used to prefix the log line.
        nonce8: Nonce read from the characteristic (8 bytes).
        proto_major: Protocol major version read from the characteristic.

    Returns:
        ``[data_id, data_len, auth (8B)]``; this request has no additional data.
    """
    addl = b""  # No additional data for read state
    auth8 = make_auth(ring_key, proto_major, nonce8, DATA_ID_READ_RING_STATE, addl)
    print(f"[{addr}] Auth: {auth8.hex()}")
    # Data length is just the authentication key here (0x08).
    payload = bytes([DATA_ID_READ_RING_STATE, _AUTH_KEY_LEN + len(addl)]) + auth8
    return payload
async def try_ring(addr: str, ring_key: bytes):
    """Connect to ``addr`` and send one ring request authenticated with ``ring_key``.

    Steps: read the beacon-actions characteristic to obtain the protocol major
    version and nonce, subscribe to notifications (best effort), write the
    ring request, then wait five seconds for notifications before the
    connection is closed.

    Failures while subscribing or writing are printed and do not abort the
    attempt. Errors raised while connecting or reading the header are not
    caught here.

    Raises:
        RuntimeError: If the value read from the characteristic is shorter
            than 9 bytes.
    """
    print(f"[{addr}] Ringing. Using key: {ring_key.hex()}...")

    def report_notification(_sender, data):
        # Dump every notification payload as hex.
        print(f"[{addr}] Notify:", data.hex())

    async with BleakClient(addr, timeout=30.0) as cli:
        print(f"[{addr}] Connected to", addr)

        # Challenge-response: the characteristic returns
        # [proto_major (1B), nonce (8B)].
        print(f"[{addr}] Reading header...")
        header = await cli.read_gatt_char(BEACON_ACTIONS_UUID)
        if len(header) < 9:
            raise RuntimeError("Beacon actions header too short")
        print(f"[{addr}] Header: {header.hex()}")
        proto_major = header[0]
        nonce8 = header[1:9]
        print(f"[{addr}] Proto major: {proto_major}, Nonce: {nonce8.hex()}")

        # Subscribing is optional; carry on with the write if it fails.
        try:
            print(f"[{addr}] Subscribing to notifications...")
            await cli.start_notify(BEACON_ACTIONS_UUID, report_notification)
            print(f"[{addr}] Subscribed to notifications.")
        except Exception as subscribe_error:
            print(f"[{addr}] Failed to subscribe to notifications: {subscribe_error}")

        # Build and write the request. Use build_read_ring_state_message(...)
        # here instead to query the ring state rather than ring.
        message = build_ring_message(
            ring_key,
            addr,
            nonce8,
            proto_major,
            op_mask=0xFF,
            timeout_seconds=10.0,
            volume=0x00,
        )
        print(f"[{addr}] Writing message: {message.hex()}")
        try:
            response = await cli.write_gatt_char(BEACON_ACTIONS_UUID, message, response=True)
            shown = response.hex() if response else 'No response'
            print(f"[{addr}] Response: {shown}")
        except Exception as write_error:
            # Print as much detail as possible to help diagnose rejected writes.
            print(f"[{addr}] Write failed: {write_error}")
            print(f"[{addr}] type(e): {type(write_error)}")
            print(f"[{addr}] args: {write_error.args}")
            print(f"[{addr}] message: {write_error}")

        print(f"[{addr}] Waiting for notifications...")
        await asyncio.sleep(5)
        print(f"[{addr}] Finished waiting.")

    print(f"[{addr}] Disconnected.")
# Addresses that have already been handled, so each device is processed once.
already_known = set()


async def on_advertisement_data(device, advertisement_data):
    """Scanner detection callback: report a new device and try to ring it.

    Devices are considered only when their service data contains the Fast Pair
    UUID or the advertisement-frame UUID, and only the first time their
    address is seen. Every key in ``RING_KEYS`` is tried in order; the first
    exception that escapes ``try_ring`` is printed and ends the attempts for
    that device.
    """
    service_data = advertisement_data.service_data
    if FAST_PAIR_SERVICE_UUID not in service_data and ADVERTISE_FRAME_SERVICE_UUID not in service_data:
        return

    addr = device.address
    if addr in already_known:
        return
    already_known.add(addr)
    print(f"Found new device: {addr} ({device.name})")

    # Decode the advertisement frame, if the device sent one.
    fhn_frame = parse_feaa_fhn(service_data.get(ADVERTISE_FRAME_SERVICE_UUID, b''))
    if fhn_frame:
        print(f" UTP Mode: {fhn_frame.utp_mode}")
        print(f" EID Hex: {fhn_frame.eid_hex}")
        if fhn_frame.hashed_flags is not None:
            print(f" Hashed Flags: 0x{fhn_frame.hashed_flags:02x}")

    try:
        print(f"[{addr}] Attempting to ring...")
        for candidate_key in RING_KEYS:
            await try_ring(addr, candidate_key)
    except Exception as error:
        print(f"[{addr}] FAIL: {error}")
async def main():
    """Scan for devices for a fixed period, ringing each new one as it appears.

    The scanner runs in active mode with ``on_advertisement_data`` as its
    detection callback, and is always stopped on the way out, even when the
    wait is interrupted.
    """
    # Single source of truth for the scan duration: the same value is shown
    # to the user and used for the wait below.
    timeout = 60.0 * 5
    scanner = BleakScanner(detection_callback=on_advertisement_data, scanning_mode="active")
    print(f"Start looking for devices for {timeout}s. Click on the tags or headphones to wake them up.")
    await scanner.start()
    try:
        await asyncio.sleep(timeout)
    finally:
        await scanner.stop()
    # "Koniec." is Polish for "The end."
    print("Koniec.")


if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment