Last active
November 24, 2024 15:46
-
-
Save nanase/988a31cc505e0ccf7c1fca9e5cef2b31 to your computer and use it in GitHub Desktop.
M5AtomS3R -> BLE -> Python (FastAPI & Bleak) - [Fragmentary code] The first code that successfully retrieved the BLE Characteristic for me on Windows.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import logging.config | |
import uuid | |
from bleak import BleakClient, BleakScanner | |
from bleak.backends.characteristic import BleakGATTCharacteristic | |
from bleak.backends.device import BLEDevice | |
from uvicorn.config import LOGGING_CONFIG | |
# Apply uvicorn's logging configuration and log under a "uvicorn."-prefixed logger name.
logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("uvicorn.BLEScan")

# Advertised name of the peripheral to look for.
BLE_DEVICE_NAME = "ACObservator"

# GATT service that carries the two notify characteristics.
BLE_SERVICE_UUID = uuid.UUID(hex="0000181a-0000-1000-8000-00805f9b34fb")
# Key under which the peripheral's advertisement service data is looked up.
BLE_SERVICE_DATA_UUID = uuid.UUID(hex="0000855b-0000-1000-8000-00805f9b34fb")
# Characteristics subscribed to for voltage and frequency notifications.
BLE_CHARACTERISTIC_VOLTAGE_UUID = uuid.UUID(hex="00002b18-0000-1000-8000-00805f9b34fb")
BLE_CHARACTERISTIC_FREQ_UUID = uuid.UUID(hex="00002be8-0000-1000-8000-00805f9b34fb")
async def find_device() -> BLEDevice | None:
    """Scan for the peripheral named ``BLE_DEVICE_NAME``.

    A scanner with a detection callback runs for the duration of the lookup so
    that the advertisement data of matching devices can be captured; the lookup
    itself is done with ``BleakScanner.find_device_by_name`` (5 second timeout).

    Returns:
        The device, but only when an advertisement captured for its address
        carries a non-empty service-data entry keyed by
        ``BLE_SERVICE_DATA_UUID``; otherwise ``None``.
    """
    advertisement_data_store = {}

    def callback(device, advertisement_data):
        # Keep the most recent advertisement per address for devices with the expected name.
        if device.name == BLE_DEVICE_NAME:
            advertisement_data_store[device.address] = advertisement_data

    scanner = BleakScanner(detection_callback=callback)
    await scanner.start()
    try:
        device = await BleakScanner.find_device_by_name(BLE_DEVICE_NAME, timeout=5)
    finally:
        # Always stop the capture scanner, even if the lookup raises, so it is not left running.
        await scanner.stop()

    if not device:
        return None

    advertisement_data = advertisement_data_store.get(device.address)
    if not advertisement_data:
        return None

    # service_data is looked up by the string form of the UUID.
    if advertisement_data.service_data.get(str(BLE_SERVICE_DATA_UUID)):
        return device
    return None
def has_matched_service(client: BleakClient) -> bool:
    """Check whether the connected peer exposes the expected service.

    Every service and characteristic found on ``client`` is logged at debug
    level first.

    Args:
        client: Client whose ``services`` collection is inspected.

    Returns:
        ``True`` only if ``BLE_SERVICE_UUID`` is present and both the voltage
        and the frequency characteristic exist in it with the ``"notify"``
        property; ``False`` otherwise.
    """
    for svc in client.services:
        logger.debug(f"[Service] {svc.handle} {svc}, {svc.uuid}")
        for char in svc.characteristics:
            logger.debug(
                f" - [Characteristic] {char.handle} - {char.description},"
                f" {char.uuid} | props: {char.properties}"
            )

    target = client.services.get_service(BLE_SERVICE_UUID)
    if not target:
        return False

    voltage_char = target.get_characteristic(BLE_CHARACTERISTIC_VOLTAGE_UUID)
    freq_char = target.get_characteristic(BLE_CHARACTERISTIC_FREQ_UUID)
    if not voltage_char or not freq_char:
        return False

    return "notify" in voltage_char.properties and "notify" in freq_char.properties
async def notification_handler(sender: BleakGATTCharacteristic, data: bytearray) -> None:
    """Print a received notification.

    Args:
        sender: The characteristic the notification came from (the original
            note describes it as the characteristic's UUID).
        data: The notification payload (raw bytes).
    """
    # Show the payload as a hexadecimal string.
    payload_hex = data.hex()
    print(f"Notify from {sender}: {payload_hex}")
async def bleScanTask(stop_event) -> None:
    """Find the peripheral, connect, and print its notifications until stopped.

    Repeats until ``stop_event`` is set: look for the device, connect to it,
    subscribe to the voltage and frequency characteristics, and stay subscribed
    while the link is up. When no device is found the next attempt is made
    after 30 seconds.

    Args:
        stop_event: Object exposing ``is_set()``; the task winds down once it
            returns ``True``.
    """
    logger.info("BLE Scan Task Started")

    while not stop_event.is_set():
        device = await find_device()
        if not device:
            await asyncio.sleep(30)
            continue

        logger.info(f"device found: {device.name} ({device.address})")
        logger.info(f"connecting: {device.name} ({device.address})")

        async with BleakClient(device, timeout=10.0, winrt={"use_cached_services": False}) as client:
            if client.is_connected and has_matched_service(client):
                logger.info(f"connected: {device.name} ({device.address})")
                logger.info(f"start notify: {device.name} ({device.address})")
                await client.start_notify(BLE_CHARACTERISTIC_VOLTAGE_UUID, notification_handler)
                await client.start_notify(BLE_CHARACTERISTIC_FREQ_UUID, notification_handler)

                # Stay subscribed until asked to stop or the peripheral drops the link.
                while not stop_event.is_set() and client.is_connected:
                    await asyncio.sleep(1)

                # Only unsubscribe while the link is still up; after a disconnect there is
                # nothing left to stop.
                # NOTE(review): stop_notify on a dropped link is expected to raise — confirm
                # against the bleak backend in use.
                if client.is_connected:
                    await client.stop_notify(BLE_CHARACTERISTIC_VOLTAGE_UUID)
                    await client.stop_notify(BLE_CHARACTERISTIC_FREQ_UUID)
                logger.info(f"finished notify: {device.name} ({device.address})")

        logger.info(f"disconnected: {device.name} ({device.address})")

    logger.info("BLE Scan Task Finished")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <Arduino.h>
#include <BLE2902.h>
#include <BLEDevice.h>
#include <BLEServer.h>
#include <BLEUtils.h>
#include <M5AtomS3.h>

#include <cmath>
#include <cstdio>
#include <cstring>

#include "HLW8032.h"
#include "config.h"
#include "freqcount.h"
#define BLE_SERVICE_UUID "0000181a-0000-1000-8000-00805f9b34fb" | |
#define BLE_CHARACTERISTIC_VOLTAGE_UUID "00002b18-0000-1000-8000-00805f9b34fb" | |
#define BLE_CHARACTERISTIC_FREQ_UUID "00002be8-0000-1000-8000-00805f9b34fb" | |
M5GFX lcd; | |
M5Canvas canvas(&M5.Lcd); | |
FreqCountIRQ<PIN_IN_FREQ> freq_count; | |
HLW8032 sensor; | |
BLEServer *pServer = nullptr; | |
BLECharacteristic *pCharacteristicVoltage = nullptr; | |
BLECharacteristic *pCharacteristicFreq = nullptr; | |
BLEAdvertising *pAdvertising = nullptr; | |
bool deviceConnected = false; | |
int32_t deviceCount = 0; | |
class BLECallbacks : public BLEServerCallbacks { | |
void onConnect(BLEServer *pBLEServer) override { | |
deviceConnected = true; | |
deviceCount++; | |
Serial.println("Device connected"); | |
}; | |
void onDisconnect(BLEServer *pBLEServer) override { | |
deviceConnected = false; | |
deviceCount--; | |
Serial.println("Device disconnected"); | |
} | |
}; | |
void updateDisplay() { | |
char buff[20]; | |
static uint8_t seq; | |
bool freqIsValid = freq_count.update(); | |
double freq = (double)(freq_count.get_observed_frequency() * 0.9999724008 + 0.0000109997); | |
double voltage = sensor.getEffectiveVoltage(); | |
if (std::isnan(freq)) { | |
freq = 0.0; | |
} | |
if (std::isnan(voltage)) { | |
voltage = 0.0; | |
} | |
canvas.clear(BLACK); | |
canvas.setTextFont(4); | |
canvas.setTextColor(GREENYELLOW, BLACK); | |
dtostrf(voltage, 3, 3, buff); | |
canvas.drawRightString(buff, 100, 0); | |
canvas.drawString("V", 100, 0); | |
if (freqIsValid) { | |
canvas.setTextColor(ORANGE, BLACK); | |
dtostrf(freq, 2, 3, buff); | |
canvas.drawRightString(buff, 100, 20); | |
canvas.drawString("Hz", 100, 20); | |
} else { | |
canvas.setTextColor(ORANGE, BLACK); | |
canvas.drawRightString("--.---", 100, 20); | |
canvas.drawString("Hz", 100, 20); | |
} | |
// canvas.setTextFont(2); | |
if (deviceConnected) { | |
canvas.setTextColor(RED, BLACK); | |
ltoa(deviceCount, buff, 10); | |
canvas.drawRightString(buff, 100, 40); | |
canvas.drawString("CN", 100, 40); | |
std::string strNotifyDataVoltage = ""; | |
strNotifyDataVoltage.append(reinterpret_cast<const char *>(&voltage), sizeof(double)); | |
pCharacteristicVoltage->setValue(strNotifyDataVoltage); | |
pCharacteristicVoltage->notify(); | |
std::string strNotifyDataFreq = ""; | |
strNotifyDataFreq.append(reinterpret_cast<const char *>(&freq), sizeof(double)); | |
pCharacteristicFreq->setValue(strNotifyDataFreq); | |
pCharacteristicFreq->notify(); | |
} else { | |
canvas.setTextColor(RED, BLACK); | |
ltoa(deviceCount, buff, 10); | |
canvas.drawRightString(buff, 100, 40); | |
canvas.drawString("CN", 100, 40); | |
BLEAdvertisementData advertisementData = BLEAdvertisementData(); | |
advertisementData.setFlags(0x06); // BR_EDR_NOT_SUPPORTED | General Discoverable Mode | |
std::string strServiceData = ""; | |
strServiceData += (char)0x04; // 長さ(4Byte) | |
strServiceData += (char)0x16; // Type 0x16: Service Data | |
strServiceData += (char)0x5b; // Service Data UUID | |
strServiceData += (char)0x85; // Service Data UUID | |
strServiceData += (char)seq++; | |
// strServiceData.append(reinterpret_cast<const char *>(&voltage), sizeof(float)); | |
// strServiceData.append(reinterpret_cast<const char *>(&freq), sizeof(float)); | |
advertisementData.addData(strServiceData); | |
pAdvertising->setAdvertisementData(advertisementData); | |
} | |
canvas.pushSprite(0, 0); | |
} | |
void setup() { | |
auto config = M5.config(); | |
AtomS3.begin(config); | |
Serial.begin(115200); | |
M5.Lcd.init(); | |
BLEDevice::init("ACObservator"); | |
pServer = BLEDevice::createServer(); | |
pServer->setCallbacks(new BLECallbacks()); | |
BLEService *pService = pServer->createService(BLE_SERVICE_UUID); | |
pCharacteristicVoltage = | |
pService->createCharacteristic(BLE_CHARACTERISTIC_VOLTAGE_UUID, BLECharacteristic::PROPERTY_NOTIFY); | |
pCharacteristicVoltage->addDescriptor(new BLE2902()); | |
pCharacteristicFreq = | |
pService->createCharacteristic(BLE_CHARACTERISTIC_FREQ_UUID, BLECharacteristic::PROPERTY_NOTIFY); | |
pCharacteristicFreq->addDescriptor(new BLE2902()); | |
pService->start(); | |
pAdvertising = BLEDevice::getAdvertising(); | |
pAdvertising->addServiceUUID(BLE_SERVICE_UUID); | |
pAdvertising->setScanResponse(true); | |
pAdvertising->setMinPreferred(0x06); | |
pAdvertising->setMinPreferred(0x12); | |
canvas.setColorDepth(8); | |
canvas.createSprite(M5.Lcd.width(), M5.Lcd.height()); | |
pinMode(PIN_IN_FREQ, INPUT); | |
pinMode(PIN_IN_SD, INPUT); | |
pinMode(PIN_IN_PF, INPUT); | |
Serial1.begin(4800, SERIAL_8E1, PIN_IN_SD); | |
freq_count.begin(); | |
canvas.setTextSize(1); | |
} | |
void loop() { | |
AtomS3.update(); | |
while (Serial1.available() > 0) { | |
sensor.processData(Serial1.read()); | |
} | |
updateDisplay(); | |
if (!deviceConnected) { | |
pAdvertising->start(); | |
delay(1000); | |
pAdvertising->stop(); | |
} else { | |
delay(1000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment