Created
September 22, 2023 20:31
-
-
Save tausen/d79ebc7f62845f0159d0291533886155 to your computer and use it in GitHub Desktop.
InfiniTime bluetooth attribute logging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import asyncio
import struct
import time
from typing import Optional, TextIO

from bleak import BleakClient, BleakScanner, exc
from bleak.backends.characteristic import BleakGATTCharacteristic
# GATT characteristics this script subscribes to.
# 0x2A37 is the Bluetooth SIG "Heart Rate Measurement" characteristic.
UUID_HEARTRATE = '00002a37-0000-1000-8000-00805f9b34fb'
# Vendor-specific characteristic; its notifications are decoded by
# motion_handler as three little-endian int16 values.
# NOTE(review): believed to be InfiniTime's raw motion (x, y, z)
# characteristic -- confirm against the InfiniTime BLE documentation.
UUID_MOTION = '00030002-78fc-48fe-8e23-433b3a1942d0'

# Output files for the notification handlers. They stay None until main()
# opens them; the handlers do nothing while their file is None.
OUTFILE_HR: Optional[TextIO] = None
OUTFILE_MOTION: Optional[TextIO] = None
def heartrate_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log one heart-rate notification to OUTFILE_HR.

    The payload is decoded following the GATT Heart Rate Measurement layout:
    byte 0 is a flags field, and bit 0 of the flags selects whether the value
    that follows is a uint8 (bit clear) or a little-endian uint16 (bit set).
    For a two-byte payload whose flags byte is 0 this yields the same value
    as reading the whole payload as a big-endian uint16.

    Args:
        characteristic: The characteristic that produced the notification
            (unused).
        data: Raw notification payload.

    Nothing is logged while OUTFILE_HR is None, when the payload is too
    short to contain a value, or when the decoded heart rate is 0.
    """
    if OUTFILE_HR is None:
        return

    # Bit 0 of the flags byte: 0 -> uint8 value, 1 -> uint16 value.
    wide_value = len(data) >= 1 and bool(data[0] & 0x01)
    needed = 3 if wide_value else 2
    if len(data) < needed:
        # Skip rather than raise: an exception here would escape into the
        # BLE notification callback machinery.
        print(f"hr: ignoring short payload ({len(data)} bytes)")
        return

    if wide_value:
        hr = struct.unpack_from("<H", data, 1)[0]
    else:
        hr = data[1]

    if hr == 0:
        return

    print(f"hr: {hr}")
    OUTFILE_HR.write(f"{time.time()}\t{hr}\n")
    # Flush per sample so the log survives an abrupt stop.
    OUTFILE_HR.flush()
def motion_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """Log one motion notification to OUTFILE_MOTION.

    The payload is decoded as three little-endian signed 16-bit integers and
    written as one tab-separated line: timestamp, then the three values.

    Args:
        characteristic: The characteristic that produced the notification
            (unused).
        data: Raw notification payload; at least 6 bytes are required.

    Nothing is logged while OUTFILE_MOTION is None or when the payload is
    shorter than 6 bytes.
    """
    if OUTFILE_MOTION is None:
        return

    if len(data) < struct.calcsize("<hhh"):
        # Skip rather than raise: an exception here would escape into the
        # BLE notification callback machinery.
        print(f"motion: ignoring short payload ({len(data)} bytes)")
        return

    mtn = struct.unpack_from("<hhh", data)
    print(f"motion: {mtn}")
    OUTFILE_MOTION.write(f"{time.time()}\t{mtn[0]}\t{mtn[1]}\t{mtn[2]}\n")
    # Flush per sample so the log survives an abrupt stop.
    OUTFILE_MOTION.flush()
async def main():
    """Connect to the "InfiniTime" device and log its notifications forever.

    Opens hr.tsv and motion.tsv (truncating them), publishes them through the
    OUTFILE_HR / OUTFILE_MOTION globals for the notification handlers, then
    loops: scan for the device on adapter "hci1", connect, subscribe to the
    heart-rate and motion characteristics, wait until the link drops, sleep
    30 seconds and start over.

    Raises:
        RuntimeError: If a scan does not find the device.

    Both files are closed however this coroutine ends (error, cancellation
    or Ctrl-C). The globals are left pointing at the closed file objects, so
    code that inspects their `.closed` attribute afterwards keeps working.
    """
    global OUTFILE_HR, OUTFILE_MOTION

    disconnected_event = asyncio.Event()

    def disconnected_callback(_):
        print("Disconnected")
        disconnected_event.set()

    # `with` closes both files on every exit path, and closes the first file
    # if opening the second one fails.
    with open("hr.tsv", "w", encoding="utf8") as hr_file, \
            open("motion.tsv", "w", encoding="utf8") as motion_file:
        OUTFILE_HR, OUTFILE_MOTION = hr_file, motion_file

        while True:
            device = await BleakScanner.find_device_by_name("InfiniTime", adapter="hci1")
            if device is None:
                raise RuntimeError("could not find device")

            # Reset before every connection: if the callback fired again
            # while the previous connection was being torn down (presumably
            # backend-dependent), a stale flag would end this session
            # immediately.
            disconnected_event.clear()

            async with BleakClient(device, disconnected_callback=disconnected_callback) as client:
                print("Connected")
                await client.start_notify(UUID_HEARTRATE, heartrate_handler)
                await client.start_notify(UUID_MOTION, motion_handler)

                await disconnected_event.wait()

                # Best-effort cleanup; the link is already down here, so a
                # failure is expected and only reported.
                try:
                    await client.stop_notify(UUID_HEARTRATE)
                    await client.stop_notify(UUID_MOTION)
                except exc.BleakError:
                    print("Could not stop notify")

            # Back off before scanning for the device again.
            await asyncio.sleep(30)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop logging. Make sure the output
        # files are closed; the globals are still None if the interrupt
        # arrived before main() opened them.
        for outfile in (OUTFILE_HR, OUTFILE_MOTION):
            if outfile is not None and not outfile.closed:
                outfile.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment