Last active
August 3, 2023 07:17
-
-
Save wipfli/c75bb178d75c3e91b6eb7ed0992c4d16 to your computer and use it in GitHub Desktop.
iBBQ Python Bleak Script
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# iBBQ sensor readout script using python bleak
#
# Source code was copied from https://github.com/8none1/pybq which is licensed under the GPLv3.0.
# For the license of the original script see https://github.com/8none1/pybq/blob/master/LICENSE
#
# Was adapted to use the python bleak package instead of bluepy using ChatGPT.
#
# Thanks to Will Cooke (https://github.com/8none1/) for providing the original version!
import asyncio | |
from bleak import BleakScanner, BleakClient | |
# Payloads written to the device by main().
# CREDENTIALS_MESSAGE is written to PAIR_UUID; the other three are written to
# CMD_UUID. Their meaning is taken from their names only - the byte layout is
# presumably defined by the iBBQ protocol (TODO confirm against the upstream
# pybq project).
CREDENTIALS_MESSAGE = bytearray.fromhex("21 07 06 05 04 03 02 01 b8 22 00 00 00 00 00")
REALTIME_DATA_ENABLE = bytearray.fromhex("0B 01 00 00 00 00")
UNITS_CELSIUS = bytearray.fromhex("02 00 00 00 00 00")
BATTERY_LEVEL = bytearray.fromhex("08 24 00 00 00 00")

# GATT characteristic UUIDs used by main(): the pairing write, the notification
# subscription, and the command writes respectively.
PAIR_UUID = "0000FFF2-0000-1000-8000-00805F9B34FB"
REALTIMEDATA_UUID = "0000FFF4-0000-1000-8000-00805F9B34FB"
CMD_UUID = "0000FFF5-0000-1000-8000-00805F9B34FB"
# Attribute handle on which temperature notifications are expected.
# NOTE(review): handle numbers can differ between devices/platforms - confirm.
_REALTIME_DATA_HANDLE = 47

# Raw 16-bit value that is skipped instead of printed (0xFFF6).
# Presumably the "no probe attached" marker - TODO confirm against the protocol.
_NO_READING = 65526


async def handle_notification(sender, data):
    """Print the temperature readings contained in a notification payload.

    Args:
        sender: Source of the notification. Either an object exposing a
            ``handle`` attribute or a plain integer handle.
        data: Raw payload; decoded as consecutive little-endian unsigned
            16-bit values, one per probe.

    Each value other than the skip marker is divided by 10, truncated to an
    integer and printed as ``bbq/temperature/<probe number>``. Payloads from
    any other handle are reported as unknown.
    """
    # Accept both an object with a ``handle`` attribute and a bare integer.
    handle = getattr(sender, "handle", sender)
    if handle != _REALTIME_DATA_HANDLE:
        print("Unknown data received from handle %s: %s" % (handle, data))
        return

    # Walk complete 2-byte pairs only; a dangling odd byte would otherwise be
    # decoded as a bogus extra reading.
    for probe, offset in enumerate(range(0, len(data) - 1, 2), start=1):
        raw = int.from_bytes(data[offset:offset + 2], "little")
        if raw == _NO_READING:
            continue
        # Raw values are non-negative, so floor division matches int(raw / 10).
        print("bbq/temperature/" + str(probe), raw // 10)
async def find_bbq_hwaddr():
    """Scan for BLE devices and return the address of the strongest iBBQ.

    Every discovered device is printed. A device is a candidate when its name
    contains "iBBQ".

    Returns:
        The address of the candidate with the highest RSSI, or None when the
        scan found no candidate.
    """
    # return_adv=True yields {address: (BLEDevice, AdvertisementData)}; the
    # RSSI is read from the advertisement data because BLEDevice.rssi is
    # deprecated.
    discovered = await BleakScanner.discover(return_adv=True)

    best = None  # (rssi, device) of the strongest candidate seen so far
    for dev, adv in discovered.values():
        print("Device %s, RSSI=%sdB" % (dev.address, adv.rssi))
        # Devices may advertise without a name; treat that as "not an iBBQ"
        # instead of raising TypeError on the substring test.
        name = dev.name or adv.local_name or ""
        if "iBBQ" not in name:
            continue
        print("Found iBBQ device %s at address %s. RSSI %s" % (name, dev.address, adv.rssi))
        # Track the maximum directly so that two devices reporting the same
        # RSSI cannot overwrite each other in a lookup table.
        if best is None or adv.rssi > best[0]:
            best = (adv.rssi, dev)

    if best is None:
        return None
    print("Using hwaddr %s" % best[1].address)
    return best[1].address
async def connect_to_bbq(hwid):
    """Create a BleakClient for *hwid*, connect it, and return the client."""
    client = BleakClient(hwid)
    await client.connect()
    return client
async def main():
    """Find an iBBQ, connect, enable notifications, and run until stopped.

    Raises:
        NameError: If the scan found no iBBQ device.
    """
    hwid = await find_bbq_hwaddr()
    if hwid is None:
        raise NameError("No devices found")

    bbq = await connect_to_bbq(hwid)
    try:
        await bbq.write_gatt_char(PAIR_UUID, CREDENTIALS_MESSAGE, response=True)
        await bbq.start_notify(REALTIMEDATA_UUID, handle_notification)
        await bbq.write_gatt_char(CMD_UUID, REALTIME_DATA_ENABLE, response=True)
        await bbq.write_gatt_char(CMD_UUID, UNITS_CELSIUS, response=True)
        await bbq.write_gatt_char(CMD_UUID, BATTERY_LEVEL, response=True)
        # Nothing else to do here: readings are printed by the notification
        # callback, so just keep the coroutine (and the connection) alive.
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("Caught ctrl-c. Disconnecting from device.")
    except asyncio.CancelledError:
        # Ctrl-C may arrive as a task cancellation rather than as
        # KeyboardInterrupt; report it the same way but let it propagate.
        print("Caught ctrl-c. Disconnecting from device.")
        raise
    finally:
        # Always release the connection, including when a GATT write fails.
        await bbq.disconnect()
if __name__ == "__main__":
    # asyncio.run creates and closes the event loop itself;
    # asyncio.get_event_loop() without a running loop is deprecated.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the script; exit without a
        # traceback.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment