Created
August 1, 2025 12:48
-
-
Save pikesley/7a90358177baeffdd239ecd5363ae7d7 to your computer and use it in GitHub Desktop.
Bluetooth controller
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import bluetooth | |
import machine | |
import aioble | |
# Name passed to aioble.advertise() as the advertised device name.
device_name = "Pico"

# UUID strings for the custom GATT service and its "mode" characteristic.
bluetooth_uuids = {
    "mode": "1f99225b-be9e-4b8a-86dd-6c92033453d8",
    "service": "3a48f6c5-22e4-4719-82dc-0abd1af1fd84",
}

_BLE_SERVICE_UUID = bluetooth.UUID(bluetooth_uuids["service"])
_BLE_MODE_UUID = bluetooth.UUID(bluetooth_uuids["mode"])

# NOTE(review): despite the _MS suffix, this value is handed to
# aioble.advertise() as its first positional argument, which aioble documents
# as an interval in microseconds (so 250_000 would be 250 ms) -- confirm.
# The name is kept because other code in this file refers to it.
_ADV_INTERVAL_MS = 250_000

# One service exposing a single characteristic. capture=True makes
# written() hand back the written data together with the connection.
ble_service = aioble.Service(_BLE_SERVICE_UUID)
mode_write_characteristic = aioble.Characteristic(
    ble_service,
    _BLE_MODE_UUID,
    read=True,
    write=True,
    notify=True,
    capture=True,
)
async def await_connection():
    """Advertise the service and handle one central connection at a time.

    Loops forever: advertises until a central connects, prints the peer
    device, waits for that connection to end, pauses briefly, then starts
    advertising again. Ordinary errors are printed and the loop continues.

    Raises:
        asyncio.CancelledError: re-raised after logging so that cancelling
            this task actually stops it instead of being swallowed by the
            retry loop.
    """
    while True:
        try:
            async with await aioble.advertise(
                _ADV_INTERVAL_MS,
                name=device_name,
                services=[_BLE_SERVICE_UUID],
            ) as connection:
                print("Connection from", connection.device)
                # Wait with no timeout. NOTE(review): aioble's default
                # timeout for disconnected() is finite, which would end a
                # healthy connection via the error path below -- confirm
                # against the aioble version in use.
                await connection.disconnected(timeout_ms=None)
        except asyncio.CancelledError:
            print("Peripheral task cancelled")
            # Propagate: swallowing this inside `while True` would make the
            # task impossible to cancel.
            raise
        except Exception as e:  # noqa: BLE001
            print("Error in peripheral_task:", e)
        # Short pause before advertising again. Deliberately not in a
        # `finally`, so cancellation is not delayed by the sleep.
        await asyncio.sleep_ms(100)
async def wait_for_write():
    """Print every value a central writes to the mode characteristic.

    Loops forever: waits for a write on ``mode_write_characteristic``,
    decodes the received bytes to text and prints them. Ordinary errors
    (for example a payload that cannot be decoded) are printed and the loop
    continues.

    Raises:
        asyncio.CancelledError: re-raised after logging so that cancelling
            this task actually stops it instead of being swallowed by the
            retry loop.
    """
    while True:
        try:
            # written() yields a (connection, data) pair here; only the
            # data is used.
            _, payload = await mode_write_characteristic.written()
            text = payload.decode()
            print(f"Received '{text}'")
        except asyncio.CancelledError:
            print("Write listener task cancelled")
            # Propagate: swallowing this inside `while True` would make the
            # task impossible to cancel.
            raise
        except Exception as e:  # noqa: BLE001
            # Message names this function so its errors can be told apart
            # from those of the advertising task.
            print("Error in wait_for_write:", e)
        # Short pause between writes. Deliberately not in a `finally`, so
        # cancellation is not delayed by the sleep.
        await asyncio.sleep_ms(100)
async def main():
    """Start the advertising and write-listener tasks and wait on both."""
    advertiser = asyncio.create_task(await_connection())
    listener = asyncio.create_task(wait_for_write())
    await asyncio.gather(advertiser, listener)
# Register the GATT service with aioble, then run main() on the event loop.
aioble.register_services(ble_service)

asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment