Created
February 3, 2023 20:22
-
-
Save arpruss/227e92ee648a5f1bb35817acf198a69e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import asyncio | |
| import threading | |
| from bleak import BleakScanner | |
| #from typing import Any | |
| def uuid16(n): | |
| return "0000%04X-0000-1000-8000-00805F9B34FB" % n | |
| hrAddress = "[deleted]" | |
| from bless import ( | |
| BlessServer, | |
| BlessGATTCharacteristic, | |
| GATTCharacteristicProperties, | |
| GATTAttributePermissions | |
| ) | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(name=__name__) | |
| async def run(loop): | |
| # Instantiate the server | |
| my_service_name = "Heart Service" | |
| server = BlessServer(name=my_service_name, loop=loop) | |
| hr_service_uuid = uuid16(0x180d) | |
| await server.add_new_service(hr_service_uuid) | |
| measurement_char_uuid = uuid16(0x2a37) | |
| measurement_flags = GATTCharacteristicProperties.notify | |
| permissions = ( | |
| GATTAttributePermissions.readable | | |
| GATTAttributePermissions.writeable | |
| ) | |
| logger.info("adding characteristic") | |
| await server.add_new_characteristic( | |
| hr_service_uuid, | |
| measurement_char_uuid, | |
| measurement_flags, | |
| None, | |
| permissions) | |
| logger.info("waiting for start") | |
| await server.start() | |
| logger.info("started") | |
| hr_char = server.get_characteristic(measurement_char_uuid) | |
| def callback(device, advertising_data): | |
| if device.address == hrAddress.upper(): | |
| try: | |
| sd = advertising_data.service_data[uuid16(0xFEE0).lower()] | |
| if len(sd) == 5: | |
| hr = sd[4] | |
| logger.info("hr = %d" % hr) | |
| hr_char.value = bytearray((0,hr)) | |
| server.update_value(hr_service_uuid, measurement_char_uuid) | |
| except: | |
| pass | |
| async with BleakScanner(callback) as scanner: | |
| await stop_event.wait() | |
| await server.stop() | |
| loop = asyncio.get_event_loop() | |
| loop.run_until_complete(run(loop)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment