Skip to content

Instantly share code, notes, and snippets.

@wipfli
Last active August 3, 2023 07:17
Show Gist options
  • Save wipfli/c75bb178d75c3e91b6eb7ed0992c4d16 to your computer and use it in GitHub Desktop.
Save wipfli/c75bb178d75c3e91b6eb7ed0992c4d16 to your computer and use it in GitHub Desktop.
iBBQ Python Bleak Script
# iBBQ sensor readout script using python bleak
#
# Source code was copied from https://github.com/8none1/pybq which is licensed under the GPLv3.0.
# For the license of the original script see https://github.com/8none1/pybq/blob/master/LICENSE
#
# Was adapted to use the python bleak package instead of bluepy using ChatGPT.
#
# Thanks to Will Cooke (https://github.com/8none1/) for providing the original version!
import asyncio
from bleak import BleakScanner, BleakClient
# Payloads written to the device by main(). The byte values are opaque
# protocol constants; their meaning is only implied by the names below
# (NOTE(review): confirm against the iBBQ protocol notes before changing).

# Written to PAIR_UUID before anything else is sent.
CREDENTIALS_MESSAGE = bytearray([
    0x21, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01,
    0xB8, 0x22, 0x00, 0x00, 0x00, 0x00, 0x00,
])
# The three commands below are written to CMD_UUID.
REALTIME_DATA_ENABLE = bytearray([0x0B, 0x01, 0x00, 0x00, 0x00, 0x00])
UNITS_CELSIUS = bytearray([0x02, 0x00, 0x00, 0x00, 0x00, 0x00])
BATTERY_LEVEL = bytearray([0x08, 0x24, 0x00, 0x00, 0x00, 0x00])

# GATT characteristic UUIDs used by main().
PAIR_UUID = "0000FFF2-0000-1000-8000-00805F9B34FB"  # target of CREDENTIALS_MESSAGE
REALTIMEDATA_UUID = "0000FFF4-0000-1000-8000-00805F9B34FB"  # notification source
CMD_UUID = "0000FFF5-0000-1000-8000-00805F9B34FB"  # target of the command payloads
async def handle_notification(sender, data):
    """Print the probe readings carried by one GATT notification.

    Args:
        sender: The notifying characteristic. Either an object exposing a
            ``handle`` attribute or the integer handle itself; both forms
            are accepted because the callback signature differs between
            bleak releases.
        data: Raw notification payload (bytes-like), decoded as consecutive
            little-endian 16-bit unsigned integers.

    Notifications from handle 47 are decoded and printed one line per probe
    as ``bbq/temperature/<n> <value>``, where ``<n>`` is the 1-based position
    in the payload and ``<value>`` is the raw reading divided by 10 and
    truncated to an int. Anything else is reported as unknown data.
    """
    # Accept both a characteristic object and a bare integer handle.
    handle = getattr(sender, "handle", sender)

    if handle != 47:
        print("Unknown data received from handle %s: %s" % (handle, data))
        return

    readings = [
        int.from_bytes(data[offset:offset + 2], "little")
        for offset in range(0, len(data), 2)
    ]
    for probe, raw in enumerate(readings, start=1):
        # 65526 (0xFFF6) is skipped; presumably the "no probe attached"
        # marker -- NOTE(review): confirm against the device protocol.
        if raw != 65526:
            print("bbq/temperature/" + str(probe), int(raw / 10))
async def find_bbq_hwaddr():
    """Scan for BLE devices and pick the iBBQ with the strongest signal.

    Every discovered device is printed with its RSSI. Devices whose name
    contains ``"iBBQ"`` are candidates; the one with the highest RSSI wins
    (the first one discovered wins a tie).

    Returns:
        The address of the chosen device, or ``None`` if no iBBQ device was
        discovered.
    """
    candidates = []
    for dev in await BleakScanner.discover():
        # NOTE(review): ``dev.rssi`` is kept as in the original script; newer
        # bleak releases expose RSSI through advertisement data instead --
        # confirm against the installed bleak version.
        print("Device %s, RSSI=%sdB" % (dev.address, dev.rssi))
        # Devices may be discovered without a name, in which case
        # ``dev.name`` is None and a bare ``in`` test would raise TypeError
        # and abort the whole scan.
        if dev.name and "iBBQ" in dev.name:
            candidates.append(dev)
            print("Found iBBQ device %s at address %s. RSSI %s" % (dev.name, dev.address, dev.rssi))

    if not candidates:
        return None

    # Highest (least negative) RSSI means the strongest signal.
    bbq = max(candidates, key=lambda candidate: candidate.rssi)
    print("Using hwaddr %s" % bbq.address)
    return bbq.address
async def connect_to_bbq(hwid):
    """Create a ``BleakClient`` for *hwid*, connect it, and return it.

    Args:
        hwid: Address of the device, as returned by ``find_bbq_hwaddr()``.

    Returns:
        The ``BleakClient`` on which ``connect()`` has completed.
    """
    client = BleakClient(hwid)
    await client.connect()
    return client
async def main():
    """Find an iBBQ device, configure it, and stream readings until stopped.

    Steps: scan for a device, connect, write the credentials message,
    subscribe to real-time data notifications, send the configuration
    commands, then idle so notifications keep arriving.

    Raises:
        NameError: If no iBBQ device was discovered. (The exception type is
            kept from the original script for backward compatibility.)
    """
    hwid = await find_bbq_hwaddr()
    if hwid is None:
        raise NameError("No devices found")
    bbq = await connect_to_bbq(hwid)

    # Everything after a successful connect runs under try/finally so the
    # link is released on every exit path: a failed GATT write, task
    # cancellation, or Ctrl-C -- not only a KeyboardInterrupt that happens
    # to be raised inside this coroutine.
    try:
        await bbq.write_gatt_char(PAIR_UUID, CREDENTIALS_MESSAGE, response=True)
        await bbq.start_notify(REALTIMEDATA_UUID, handle_notification)
        await bbq.write_gatt_char(CMD_UUID, REALTIME_DATA_ENABLE, response=True)
        await bbq.write_gatt_char(CMD_UUID, UNITS_CELSIUS, response=True)
        await bbq.write_gatt_char(CMD_UUID, BATTERY_LEVEL, response=True)

        # Nothing to do here; readings are printed by the notification
        # callback. Sleeping just keeps the coroutine (and connection) alive.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("Caught ctrl-c. Disconnecting from device.")
    finally:
        await bbq.disconnect()
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion,
    # cancels any tasks still pending on exit, and closes the loop. It
    # replaces asyncio.get_event_loop(), which is deprecated when no loop is
    # running and left the loop unclosed.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment