# Gist by @arpruss (gist id 227e92ee648a5f1bb35817acf198a69e),
# created February 3, 2023 20:22.
import logging
import asyncio
import threading
from bleak import BleakScanner
#from typing import Any
def uuid16(n):
    """Expand a 16-bit Bluetooth UUID into its full 128-bit string form.

    Args:
        n: Integer in the range 0x0000-0xFFFF.

    Returns:
        The string ``0000XXXX-0000-1000-8000-00805F9B34FB`` where ``XXXX`` is
        ``n`` rendered as four upper-case hexadecimal digits.

    Raises:
        ValueError: If ``n`` is outside 0..0xFFFF; such a value does not fit
            in four hex digits and would produce a malformed UUID string.
    """
    if not 0 <= n <= 0xFFFF:
        raise ValueError("16-bit UUID out of range: %r" % (n,))
    return "0000%04X-0000-1000-8000-00805F9B34FB" % n
# Address of the advertising device whose heart-rate data is relayed; the scan
# callback compares it (upper-cased) against each discovered device's address.
# NOTE(review): "[deleted]" looks like a redacted placeholder - presumably it
# must be replaced with a real device address before running; confirm.
hrAddress = "[deleted]"
from bless import (
BlessServer,
BlessGATTCharacteristic,
GATTCharacteristicProperties,
GATTAttributePermissions
)
# Configure the root logger so INFO-and-above records are emitted.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger used for the progress and heart-rate messages.
logger = logging.getLogger(__name__)
async def run(loop):
    """Relay heart-rate readings from BLE advertisements to a GATT server.

    Starts a ``BlessServer`` exposing service 0x180D with one notifying
    characteristic 0x2A37, then scans with ``BleakScanner``. For every
    advertisement from the device whose address matches ``hrAddress`` that
    carries a 5-byte 0xFEE0 service-data payload, the payload's last byte is
    logged and published as the characteristic value.

    The coroutine runs until it is cancelled; the server is stopped on the
    way out.

    Args:
        loop: The asyncio event loop handed to ``BlessServer``.
    """
    # Instantiate the server
    my_service_name = "Heart Service"
    server = BlessServer(name=my_service_name, loop=loop)

    hr_service_uuid = uuid16(0x180D)
    await server.add_new_service(hr_service_uuid)

    measurement_char_uuid = uuid16(0x2A37)
    measurement_flags = GATTCharacteristicProperties.notify
    permissions = (
        GATTAttributePermissions.readable
        | GATTAttributePermissions.writeable
    )
    logger.info("adding characteristic")
    await server.add_new_characteristic(
        hr_service_uuid,
        measurement_char_uuid,
        measurement_flags,
        None,
        permissions,
    )

    logger.info("waiting for start")
    await server.start()
    logger.info("started")

    hr_char = server.get_characteristic(measurement_char_uuid)

    # Constant for the lifetime of the scan, so compute them once rather than
    # on every received advertisement.
    target_address = hrAddress.upper()
    hr_data_uuid = uuid16(0xFEE0).lower()

    def callback(device, advertising_data):
        """Publish the heart rate carried by one advertisement, if any."""
        # Compare case-insensitively so the match does not depend on how the
        # platform backend capitalises addresses.
        if device.address.upper() != target_address:
            return
        sd = advertising_data.service_data.get(hr_data_uuid)
        if sd is None or len(sd) != 5:
            return
        hr = sd[4]
        logger.info("hr = %d", hr)
        try:
            # First byte is the measurement flags field; 0 is taken to mean
            # the heart rate follows as a single unsigned byte.
            hr_char.value = bytearray((0, hr))
            server.update_value(hr_service_uuid, measurement_char_uuid)
        except Exception:
            # Best effort: one failed notification must not stop the scan,
            # but it should be visible in the log.
            logger.exception("failed to publish heart rate")

    # Nothing in this function sets the event, so the wait below blocks until
    # the task is cancelled (e.g. the script is interrupted).
    stop_event = asyncio.Event()
    try:
        async with BleakScanner(callback):
            await stop_event.wait()
    finally:
        # Stop the server even when the scan ends by cancellation or error.
        await server.stop()
# Create the event loop explicitly: asyncio.get_event_loop() is deprecated
# when no loop is running. Install it as the current loop so library code
# that looks up the current loop sees the same one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
main_task = loop.create_task(run(loop))
try:
    loop.run_until_complete(main_task)
except KeyboardInterrupt:
    # Cancel the task and let it unwind so its cleanup code can run before
    # the loop is closed.
    main_task.cancel()
    try:
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        pass
finally:
    loop.close()
# (GitHub page footer, not part of the script: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")