Skip to content

Instantly share code, notes, and snippets.

@nanase
Last active November 24, 2024 15:46
Show Gist options
  • Save nanase/988a31cc505e0ccf7c1fca9e5cef2b31 to your computer and use it in GitHub Desktop.
Save nanase/988a31cc505e0ccf7c1fca9e5cef2b31 to your computer and use it in GitHub Desktop.
M5AtomS3R -> BLE -> Python (FastAPI & Bleak) - [Fragmentary code] The first code that successfully retrieved the BLE Characteristic for me in Windows.
import asyncio
import logging
import logging.config
import uuid
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.device import BLEDevice
from uvicorn.config import LOGGING_CONFIG
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("uvicorn.BLEScan")
BLE_DEVICE_NAME = "ACObservator"
BLE_SERVICE_UUID = uuid.UUID("0000181a-0000-1000-8000-00805f9b34fb")
BLE_SERVICE_DATA_UUID = uuid.UUID("0000855b-0000-1000-8000-00805f9b34fb")
BLE_CHARACTERISTIC_VOLTAGE_UUID = uuid.UUID("00002b18-0000-1000-8000-00805f9b34fb")
BLE_CHARACTERISTIC_FREQ_UUID = uuid.UUID("00002be8-0000-1000-8000-00805f9b34fb")
async def find_device() -> BLEDevice | None:
advertisement_data_store = {}
def callback(device, advertisement_data):
if device.name == BLE_DEVICE_NAME:
advertisement_data_store[device.address] = advertisement_data
scanner = BleakScanner(detection_callback=callback)
await scanner.start()
device = await BleakScanner.find_device_by_name(BLE_DEVICE_NAME, timeout=5)
await scanner.stop()
if device:
advertisement_data = advertisement_data_store.get(device.address)
if advertisement_data:
value = advertisement_data.service_data.get(str(BLE_SERVICE_DATA_UUID))
if value:
return device
return None
def has_matched_service(client: BleakClient) -> bool:
for service in client.services:
logger.debug(f"[Service] {service.handle} {service}, {service.uuid}")
for characteristic in service.characteristics:
logger.debug(
f" - [Characteristic] {characteristic.handle} - {characteristic.description},"
f" {characteristic.uuid} | props: {characteristic.properties}"
)
service = client.services.get_service(BLE_SERVICE_UUID)
if service:
characteristic_voltage = service.get_characteristic(BLE_CHARACTERISTIC_VOLTAGE_UUID)
characteristic_freq = service.get_characteristic(BLE_CHARACTERISTIC_FREQ_UUID)
if (
characteristic_voltage
and characteristic_freq
and "notify" in characteristic_voltage.properties
and "notify" in characteristic_freq.properties
):
return True
return False
async def notification_handler(sender: BleakGATTCharacteristic, data: bytearray) -> None:
"""
sender: CharacteristicのUUID
data: 通知データ(バイト列)
"""
print(f"Notify from {sender}: {data.hex()}") # データを16進数で表示
async def bleScanTask(stop_event) -> None:
logger.info("BLE Scan Task Started")
while not stop_event.is_set():
device = await find_device()
if not device:
await asyncio.sleep(30)
continue
else:
logger.info(f"device found: {device.name} ({device.address})")
logger.info(f"connecting: {device.name} ({device.address})")
async with BleakClient(device, timeout=10.0, winrt={"use_cached_services": False}) as client:
if client.is_connected and has_matched_service(client):
logger.info(f"connected: {device.name} ({device.address})")
logger.info(f"start notify: {device.name} ({device.address})")
await client.start_notify(BLE_CHARACTERISTIC_VOLTAGE_UUID, notification_handler)
await client.start_notify(BLE_CHARACTERISTIC_FREQ_UUID, notification_handler)
while not stop_event.is_set() and client.is_connected:
await asyncio.sleep(1)
await client.stop_notify(BLE_CHARACTERISTIC_VOLTAGE_UUID)
await client.stop_notify(BLE_CHARACTERISTIC_FREQ_UUID)
logger.info("finished notify: {device.name} ({device.address})")
logger.info(f"disconnected: {device.name} ({device.address})")
logger.info("BLE Scan Task Finished")
#include <Arduino.h>
#include <BLE2902.h>
#include <BLEDevice.h>
#include <BLEServer.h>
#include <BLEUtils.h>
#include <M5AtomS3.h>
#include <cmath>
#include <cstring>
#include "HLW8032.h"
#include "config.h"
#include "freqcount.h"
#define BLE_SERVICE_UUID "0000181a-0000-1000-8000-00805f9b34fb"
#define BLE_CHARACTERISTIC_VOLTAGE_UUID "00002b18-0000-1000-8000-00805f9b34fb"
#define BLE_CHARACTERISTIC_FREQ_UUID "00002be8-0000-1000-8000-00805f9b34fb"
M5GFX lcd;
M5Canvas canvas(&M5.Lcd);
FreqCountIRQ<PIN_IN_FREQ> freq_count;
HLW8032 sensor;
BLEServer *pServer = nullptr;
BLECharacteristic *pCharacteristicVoltage = nullptr;
BLECharacteristic *pCharacteristicFreq = nullptr;
BLEAdvertising *pAdvertising = nullptr;
bool deviceConnected = false;
int32_t deviceCount = 0;
class BLECallbacks : public BLEServerCallbacks {
void onConnect(BLEServer *pBLEServer) override {
deviceConnected = true;
deviceCount++;
Serial.println("Device connected");
};
void onDisconnect(BLEServer *pBLEServer) override {
deviceConnected = false;
deviceCount--;
Serial.println("Device disconnected");
}
};
void updateDisplay() {
char buff[20];
static uint8_t seq;
bool freqIsValid = freq_count.update();
double freq = (double)(freq_count.get_observed_frequency() * 0.9999724008 + 0.0000109997);
double voltage = sensor.getEffectiveVoltage();
if (std::isnan(freq)) {
freq = 0.0;
}
if (std::isnan(voltage)) {
voltage = 0.0;
}
canvas.clear(BLACK);
canvas.setTextFont(4);
canvas.setTextColor(GREENYELLOW, BLACK);
dtostrf(voltage, 3, 3, buff);
canvas.drawRightString(buff, 100, 0);
canvas.drawString("V", 100, 0);
if (freqIsValid) {
canvas.setTextColor(ORANGE, BLACK);
dtostrf(freq, 2, 3, buff);
canvas.drawRightString(buff, 100, 20);
canvas.drawString("Hz", 100, 20);
} else {
canvas.setTextColor(ORANGE, BLACK);
canvas.drawRightString("--.---", 100, 20);
canvas.drawString("Hz", 100, 20);
}
// canvas.setTextFont(2);
if (deviceConnected) {
canvas.setTextColor(RED, BLACK);
ltoa(deviceCount, buff, 10);
canvas.drawRightString(buff, 100, 40);
canvas.drawString("CN", 100, 40);
std::string strNotifyDataVoltage = "";
strNotifyDataVoltage.append(reinterpret_cast<const char *>(&voltage), sizeof(double));
pCharacteristicVoltage->setValue(strNotifyDataVoltage);
pCharacteristicVoltage->notify();
std::string strNotifyDataFreq = "";
strNotifyDataFreq.append(reinterpret_cast<const char *>(&freq), sizeof(double));
pCharacteristicFreq->setValue(strNotifyDataFreq);
pCharacteristicFreq->notify();
} else {
canvas.setTextColor(RED, BLACK);
ltoa(deviceCount, buff, 10);
canvas.drawRightString(buff, 100, 40);
canvas.drawString("CN", 100, 40);
BLEAdvertisementData advertisementData = BLEAdvertisementData();
advertisementData.setFlags(0x06); // BR_EDR_NOT_SUPPORTED | General Discoverable Mode
std::string strServiceData = "";
strServiceData += (char)0x04; // 長さ(4Byte)
strServiceData += (char)0x16; // Type 0x16: Service Data
strServiceData += (char)0x5b; // Service Data UUID
strServiceData += (char)0x85; // Service Data UUID
strServiceData += (char)seq++;
// strServiceData.append(reinterpret_cast<const char *>(&voltage), sizeof(float));
// strServiceData.append(reinterpret_cast<const char *>(&freq), sizeof(float));
advertisementData.addData(strServiceData);
pAdvertising->setAdvertisementData(advertisementData);
}
canvas.pushSprite(0, 0);
}
void setup() {
auto config = M5.config();
AtomS3.begin(config);
Serial.begin(115200);
M5.Lcd.init();
BLEDevice::init("ACObservator");
pServer = BLEDevice::createServer();
pServer->setCallbacks(new BLECallbacks());
BLEService *pService = pServer->createService(BLE_SERVICE_UUID);
pCharacteristicVoltage =
pService->createCharacteristic(BLE_CHARACTERISTIC_VOLTAGE_UUID, BLECharacteristic::PROPERTY_NOTIFY);
pCharacteristicVoltage->addDescriptor(new BLE2902());
pCharacteristicFreq =
pService->createCharacteristic(BLE_CHARACTERISTIC_FREQ_UUID, BLECharacteristic::PROPERTY_NOTIFY);
pCharacteristicFreq->addDescriptor(new BLE2902());
pService->start();
pAdvertising = BLEDevice::getAdvertising();
pAdvertising->addServiceUUID(BLE_SERVICE_UUID);
pAdvertising->setScanResponse(true);
pAdvertising->setMinPreferred(0x06);
pAdvertising->setMinPreferred(0x12);
canvas.setColorDepth(8);
canvas.createSprite(M5.Lcd.width(), M5.Lcd.height());
pinMode(PIN_IN_FREQ, INPUT);
pinMode(PIN_IN_SD, INPUT);
pinMode(PIN_IN_PF, INPUT);
Serial1.begin(4800, SERIAL_8E1, PIN_IN_SD);
freq_count.begin();
canvas.setTextSize(1);
}
void loop() {
AtomS3.update();
while (Serial1.available() > 0) {
sensor.processData(Serial1.read());
}
updateDisplay();
if (!deviceConnected) {
pAdvertising->start();
delay(1000);
pAdvertising->stop();
} else {
delay(1000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment