Skip to content

Instantly share code, notes, and snippets.

@mik-laj
Created August 16, 2025 17:21
Show Gist options
  • Select an option

  • Save mik-laj/4c1c363391115ccb14ee856a9c1c12a1 to your computer and use it in GitHub Desktop.

Select an option

Save mik-laj/4c1c363391115ccb14ee856a9c1c12a1 to your computer and use it in GitHub Desktop.
The script that sends ring request to all beacons nearby. https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn
import asyncio
import struct
import hmac, hashlib
from bleak import BleakScanner, BleakClient, uuids as buuids
from dataclasses import dataclass
FAST_PAIR_SERVICE_UUID = buuids.normalize_uuid_str("fe2c")
# UUID for the advertisement frame as defined in the Table 8.
# The same UUID is used by Eddystone Protocol, but the frame type is different (0x40/0x41).
ADVERTISE_FRAME_SERVICE_UUID = buuids.normalize_uuid_str("feaa")
BEACON_ACTIONS_UUID = buuids.normalize_uuid_str("FE2C1238-8366-4814-8EB0-01DE32100BEA") # GATT characteristic
DATA_ID_RING = 0x05
DATA_ID_READ_RING_STATE = 0x06
RING_KEYS = [
bytes.fromhex("c438999d5a8cb500"),
bytes.fromhex("130c1c87733c203e"),
bytes.fromhex("f93ff77957a40434"),
]
@dataclass
class FHNFrame:
utp_mode: bool # unwanted tracking protection flag
eid_hex: str # 20B or 32B
hashed_flags: int | None
def parse_feaa_fhn(payload: bytes) -> FHNFrame | None:
# https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn#advertised-frames
# Expect: [0x40/0x41][EID...][optional hashed_flags]
if not payload or payload[0] not in (0x40, 0x41):
return None
utp = (payload[0] & 0x01) == 0x01
# Try 20B first, else 32B
if len(payload) >= 1+20 and len(payload) <= 1+20+1:
eid = payload[1:1+20]
hf = payload[1+20] if len(payload) == 1+20+1 else None
elif len(payload) >= 1+32 and len(payload) <= 1+32+1:
eid = payload[1:1+32]
hf = payload[1+32] if len(payload) == 1+32+1 else None
else:
return None
return FHNFrame(utp_mode=utp, eid_hex=eid.hex(), hashed_flags=hf)
def make_auth(ring_key: bytes, proto_major: int, nonce8: bytes, data_id: int, addl: bytes) -> bytes:
# Auth: first 8 bytes HMAC-SHA256(RingKey, proto||nonce||data_id||len)
data_len = len(addl)
msg = bytes([proto_major]) + nonce8 + bytes([data_id, data_len]) + addl
mac = hmac.new(ring_key, msg, hashlib.sha256).digest()
return mac[:8]
def build_ring_message(ring_key, addr, nonce8, proto_major, op_mask=0xFF, timeout_seconds=60.0, volume=0x00):
# Documentation for ring operation parameters:
# https://developers.google.com/nearby/fast-pair/specifications/extensions/fmdn#ring
# timeout in tenths of seconds (uint16 big-endian), max 600 s
t_ds = min(int(timeout_seconds * 10), 6000)
# Additional data: ring all, 60 s, default volume
addl = bytes([op_mask]) + struct.pack(">H", t_ds) + bytes([volume])
auth8 = make_auth(ring_key, proto_major, nonce8, DATA_ID_RING, addl)
print(f"[{addr}] Auth: {auth8.hex()}")
# Final write format (Table 4): [data_id, data_len, auth(8B), addl]
payload = bytes([DATA_ID_RING, len(addl)]) + auth8 + addl
return payload
def build_read_ring_state_message(ring_key, addr, nonce8, proto_major):
addl = b"" # No additional data for read state
auth8 = make_auth(ring_key, proto_major, nonce8, DATA_ID_READ_RING_STATE, addl)
print(f"[{addr}] Auth: {auth8.hex()}")
# Final read format (Table 4): [data_id, data_len, auth(8B)]
payload = bytes([DATA_ID_READ_RING_STATE, len(addl)]) + auth8
return payload
async def try_ring(addr: str, ring_key: bytes):
print(f"[{addr}] Ringing. Using key: {ring_key.hex()}...")
async with BleakClient(addr, timeout=30.0) as cli:
print(f"[{addr}] Connected to", addr)
# A challenge-response mechanism: [proto_major (1B), nonce (8B)]
print(f"[{addr}] Reading header...")
header = await cli.read_gatt_char(BEACON_ACTIONS_UUID)
if len(header) < 9:
raise RuntimeError("Beacon actions header too short")
print(f"[{addr}] Header: {header.hex()}")
proto_major, nonce8 = header[0], header[1:9]
print(f"[{addr}] Proto major: {proto_major}, Nonce: {nonce8.hex()}")
# Subscribe notifications
try:
print(f"[{addr}] Subscribing to notifications...")
await cli.start_notify(BEACON_ACTIONS_UUID, lambda h, d: print(f"[{addr}] Notify:", d.hex()))
print(f"[{addr}] Subscribed to notifications.")
except Exception as e:
print(f"[{addr}] Failed to subscribe to notifications: {e}")
# Write the payload
message = build_ring_message(ring_key, addr, nonce8, proto_major, op_mask=0xFF, timeout_seconds=10.0, volume=0x00)
# message = build_read_ring_state_message(ring_key, addr, nonce8, proto_major)
print(f"[{addr}] Writing message: {message.hex()}")
try:
response = await cli.write_gatt_char(BEACON_ACTIONS_UUID, message, response=True)
print(f"[{addr}] Response: {response.hex() if response else 'No response'}")
except Exception as e:
print(f"[{addr}] Write failed: {e}")
print(f"[{addr}] type(e): {type(e)}")
print(f"[{addr}] args: {e.args}")
print(f"[{addr}] message: {e}")
print(f"[{addr}] Waiting for notifications...")
await asyncio.sleep(5)
print(f"[{addr}] Finished waiting.")
print(f"[{addr}] Disconnected.")
already_known = set()
async def on_advertisement_data(device, advertisement_data):
if not (FAST_PAIR_SERVICE_UUID in advertisement_data.service_data or ADVERTISE_FRAME_SERVICE_UUID in advertisement_data.service_data):
return
if device.address in already_known:
return
already_known.add(device.address)
print(f"Found new device: {device.address} ({device.name})")
fhn_data = advertisement_data.service_data.get(ADVERTISE_FRAME_SERVICE_UUID, b'')
fhn_frame = parse_feaa_fhn(fhn_data)
if fhn_frame:
print(f" UTP Mode: {fhn_frame.utp_mode}")
print(f" EID Hex: {fhn_frame.eid_hex}")
if fhn_frame.hashed_flags is not None:
print(f" Hashed Flags: 0x{fhn_frame.hashed_flags:02x}")
addr = device.address
try:
print(f"[{addr}] Attempting to ring...")
for ring_key in RING_KEYS:
await try_ring(addr, ring_key)
except Exception as e:
print(f"[{addr}] FAIL: {e}")
async def main():
timeout = 60.0 * 5
scanner = BleakScanner(detection_callback=on_advertisement_data, scanning_mode="active")
print(f"Start looking for devices for {timeout}s. Click on the tags or headphones to wake them up.")
await scanner.start()
try:
await asyncio.sleep(120)
finally:
await scanner.stop()
print("Koniec.")
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment