Created
October 1, 2023 02:50
-
-
Save JCloudYu/4d54d52d80a041191807a5d803a06467 to your computer and use it in GitHub Desktop.
ibeacon.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
The MIT License (MIT)
Copyright © 2020 Walkline Wang (https://walkline.wang)
Gitee: https://gitee.com/walkline/esp32-ble-uart
Original repo: https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_uart_peripheral.py
"""
import ubluetooth as bt

# 128-bit UUIDs of the UART-over-BLE service and of its RX / TX characteristics.
__UART_UUID = bt.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
__RX_UUID = bt.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
__TX_UUID = bt.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")

# Service description passed to BLE.gatts_register_services():
#     (service_uuid, ((characteristic_uuid, flags), ...))
# The characteristic order matters: BLEUART unpacks the returned value handles
# as (tx_handle, rx_handle), so TX must stay first and RX second.
__UART_SERVICE = (
    __UART_UUID,
    (
        (__TX_UUID, bt.FLAG_NOTIFY,),  # peripheral -> central, via notifications
        (__RX_UUID, bt.FLAG_WRITE,),   # central -> peripheral, via writes
    ),
)
class BLEUART:
    """UART-like BLE peripheral built on top of a ``BLE`` object.

    On construction the radio is restarted, the UART service is registered
    and advertising begins. Data written by the central to the RX
    characteristic is handed to ``rx_callback``; ``send()`` pushes data to the
    central through the TX characteristic.

    NOTE(review): the methods below reference the module-level names
    ``__UART_UUID`` / ``__UART_SERVICE``. CPython would name-mangle such
    references inside a class body; this code relies on MicroPython not
    doing so -- confirm before porting.
    """

    def __init__(self, ble, rx_callback=None, name="mpy-uart", rxbuf=100):
        """Activate BLE, register the UART service and start advertising.

        Args:
            ble: BLE interface object (``ubluetooth.BLE()`` in the demo).
            rx_callback: optional callable invoked with the value read from
                the RX characteristic each time the connected central
                writes to it.
            name: device name placed in the scan-response payload.
            rxbuf: value forwarded to ``ble.config(rxbuf=...)``.
        """
        self.__ble = ble
        self.__rx_cb = rx_callback
        self.__conn_handle = None  # None while no central is connected

        # Cache the bound GATT server methods used by send() and the IRQ handler.
        self.__write = self.__ble.gatts_write
        self.__read = self.__ble.gatts_read
        self.__notify = self.__ble.gatts_notify

        # Deactivate first so activation always starts from a known state.
        self.__ble.active(False)
        print("activating ble...")
        self.__ble.active(True)
        print("ble activated")

        self.__ble.config(rxbuf=rxbuf)
        self.__ble.irq(self.__irq)
        self.__register_services()

        # The service UUID and appearance go into the advertising packet;
        # the device name goes into the separate scan-response packet.
        self.__adv_payload = BLETools.advertising_generic_payload(
            services=(__UART_UUID,),
            appearance=BLEConst.Appearance.GENERIC_COMPUTER,
        )
        self.__resp_payload = BLETools.advertising_resp_payload(
            name=name
        )

        self.__advertise()

    def __register_services(self):
        """Register the UART service and remember its TX / RX value handles."""
        # One service with two characteristics yields ((tx_handle, rx_handle),).
        (
            (
                self.__tx_handle,
                self.__rx_handle,
            ),
        ) = self.__ble.gatts_register_services((__UART_SERVICE,))

    def __advertise(self, interval_us=500000):
        """(Re)start advertising with the prepared payloads.

        Args:
            interval_us: advertising interval passed to ``gap_advertise``.
        """
        # Passing None stops any advertising already in progress.
        self.__ble.gap_advertise(None)
        self.__ble.gap_advertise(interval_us, adv_data=self.__adv_payload, resp_data=self.__resp_payload)
        print("advertising...")

    # Interrupt request handler
    def __irq(self, event, data):
        """Handle events delivered by the BLE stack.

        Args:
            event: event code, compared against ``BLEConst.IRQ`` values.
            data: event-specific tuple; its layout depends on ``event``.
        """
        if event == BLEConst.IRQ.IRQ_CENTRAL_CONNECT:
            self.__conn_handle, addr_type, addr, = data
            print("[{}] connected, handle: {}".format(BLETools.decode_mac(addr), self.__conn_handle))

            # Stop advertising while a central is connected.
            self.__ble.gap_advertise(None)
        elif event == BLEConst.IRQ.IRQ_CENTRAL_DISCONNECT:
            self.__conn_handle, _, addr, = data
            print("[{}] disconnected, handle: {}".format(BLETools.decode_mac(addr), self.__conn_handle))

            # Mark as disconnected and become discoverable again.
            self.__conn_handle = None
            self.__advertise()
        elif event == BLEConst.IRQ.IRQ_GATTS_WRITE:
            conn_handle, value_handle = data

            # Only react to writes from the current central to the RX characteristic.
            if conn_handle == self.__conn_handle and value_handle == self.__rx_handle:
                if self.__rx_cb:
                    self.__rx_cb(self.__read(self.__rx_handle))

    def send(self, data):
        """
        Write data to the local TX characteristic value and, when a central is
        connected, push it to that central with a notification.
        """
        self.__write(self.__tx_handle, data)

        if self.__conn_handle is not None:
            self.__notify(self.__conn_handle, self.__tx_handle, data)
# ---- Excerpted from tools.py ------------------------------------------------
import struct
from ubluetooth import UUID

# Short aliases for the struct helpers used when building BLE payloads.
PACK = struct.pack
UNPACK = struct.unpack
class BLETools(object):
    """
    Payload Generator Functions

    Advertising payloads are repeated packets of the following form:
        1 byte   data length (N + 1)
        1 byte   type (see BLEConst.ADType)
        N bytes  type-specific data
    """

    @staticmethod
    def _append_field(payload, adv_type, value):
        """Append one length/type/value structure to ``payload`` in place.

        ``value`` may be bytes-like or ``str``. Strings are UTF-8 encoded
        first so that the length byte counts encoded bytes rather than
        characters (the two differ for non-ASCII names).
        """
        if isinstance(value, str):
            value = value.encode("utf-8")

        payload.extend(PACK('BB', len(value) + 1, adv_type))
        payload.extend(value)

    @staticmethod
    def _append_services(payload, services):
        """Append one service-UUID structure per entry of ``services``.

        The AD type is chosen from the UUID's byte length (2, 4 or 16);
        UUIDs of any other length are skipped.
        """
        for uuid in services:
            b = bytes(uuid)

            if len(b) == 2:
                adv_type = BLEConst.ADType.AD_TYPE_16BIT_SERVICE_UUID_COMPLETE
            elif len(b) == 4:
                adv_type = BLEConst.ADType.AD_TYPE_32BIT_SERVICE_UUID_COMPLETE
            elif len(b) == 16:
                adv_type = BLEConst.ADType.AD_TYPE_128BIT_SERVICE_UUID_COMPLETE
            else:
                continue

            BLETools._append_field(payload, adv_type, b)

    @staticmethod
    def advertising_generic_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
        """
        Generate a payload to be passed to gap_advertise(adv_data=...).

        Args:
            limited_disc: selects flag bit 0x01 when true, 0x02 otherwise.
            br_edr: when false, flag bit 0x04 is added as well.
            name: optional device name (``str`` or bytes-like).
            services: optional iterable of UUID objects to list.
            appearance: 16-bit appearance value; always appended.

        Returns:
            A ``bytearray`` holding the fields in this order: flags,
            name, services, appearance.
        """
        payload = bytearray()

        flags = (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)
        BLETools._append_field(payload, BLEConst.ADType.AD_TYPE_FLAGS, PACK('B', flags))

        if name:
            BLETools._append_field(payload, BLEConst.ADType.AD_TYPE_COMPLETE_LOCAL_NAME, name)

        if services:
            BLETools._append_services(payload, services)

        # See org.bluetooth.characteristic.gap.appearance.xml
        # Packed as an unsigned 16-bit little-endian value so the full
        # 0..65535 range is representable.
        BLETools._append_field(payload, BLEConst.ADType.AD_TYPE_APPEARANCE, PACK('<H', appearance))

        return payload

    @staticmethod
    def advertising_resp_payload(name=None, services=None):
        """
        Generate payload for Scan Response

        Args:
            name: optional device name (``str`` or bytes-like).
            services: optional iterable of UUID objects to list.

        Returns:
            A ``bytearray`` holding the name field followed by the
            service fields; empty when neither argument is given.
        """
        payload = bytearray()

        if name:
            BLETools._append_field(payload, BLEConst.ADType.AD_TYPE_COMPLETE_LOCAL_NAME, name)

        if services:
            BLETools._append_services(payload, services)

        return payload

    @staticmethod
    def decode_mac(addr):
        """
        Decode readable mac address from advertising addr

        Args:
            addr: 6-byte address as ``bytes``, ``bytearray`` or ``memoryview``.

        Returns:
            The address as upper-case hex pairs joined by ``:``,
            e.g. ``"00:01:02:AB:CD:EF"``.

        Raises:
            ValueError: if ``addr`` is not bytes-like or not 6 bytes long.
        """
        if isinstance(addr, (memoryview, bytearray)):
            addr = bytes(addr)

        # Raised explicitly (not via assert) so the check survives
        # optimised builds and callers get a ValueError.
        if not isinstance(addr, bytes) or len(addr) != 6:
            raise ValueError("mac address value error")

        return ":".join(['%02X' % byte for byte in addr])
# ---- Excerpted from const.py ------------------------------------------------
from micropython import const


class BLEConst(object):
    """Numeric constants used by the BLE helpers in this file."""

    class IRQ(object):
        """Event codes compared against in the BLE IRQ handler."""
        IRQ_CENTRAL_CONNECT = const(1)
        IRQ_CENTRAL_DISCONNECT = const(2)
        IRQ_GATTS_WRITE = const(3)

    class Appearance(object):
        """Appearance values placed in the advertising payload."""
        Unknown = const(0)  # None
        GENERIC_PHONE = const(64)  # Generic category
        GENERIC_COMPUTER = const(128)  # Generic category

    class ADType(object):
        '''
        Advertising Data Type
        '''
        AD_TYPE_FLAGS = const(0x01)  # Flags for discoverability.
        AD_TYPE_16BIT_SERVICE_UUID_COMPLETE = const(0x03)  # Complete list of 16 bit service UUIDs.
        AD_TYPE_32BIT_SERVICE_UUID_COMPLETE = const(0x05)  # Complete list of 32 bit service UUIDs.
        AD_TYPE_128BIT_SERVICE_UUID_COMPLETE = const(0x07)  # Complete list of 128 bit service UUIDs.
        AD_TYPE_COMPLETE_LOCAL_NAME = const(0x09)  # Complete local device name.
        AD_TYPE_APPEARANCE = const(0x19)  # Appearance.
def demo():
    """Run the BLE UART demo: mirror an LED state over the UART service.

    Writing ``b'on\\0'`` to the RX characteristic switches the LED on; any
    other value switches it off. A rising edge on the button pin toggles
    the LED. After either event the new state ("on" / "off") is sent to
    the central.
    """
    from machine import Pin

    def rx_callback(data):
        # Data written by the central decides the LED state.
        print("rx received: {}".format(data))

        led.value(1 if data == b'on\0' else 0)
        uart.send("on" if led.value() else "off")

    def button_callback(pin):
        # Toggle the LED and report the new state.
        led.value(not led.value())
        uart.send("on" if led.value() else "off")

    # Create the LED before the UART so rx_callback can never run while
    # `led` is still unbound.
    led = Pin(2, Pin.OUT, value=0)

    ble = bt.BLE()
    uart = BLEUART(ble, rx_callback)

    button = Pin(0, Pin.IN, Pin.PULL_UP)
    button.irq(button_callback, Pin.IRQ_RISING)


# The demo starts as soon as this file is executed or imported.
demo()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment