Last active
July 19, 2022 15:20
-
-
Save trueroad/9459958c3a876e8e26520ead776b5392 to your computer and use it in GitHub Desktop.
Scan BLE MIDI Peripherals by python bleak
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Scan BLE MIDI Peripherals with python bleak. | |
https://gist.github.com/trueroad/52b7c4c98eec5fdf0ff3f62d64ec17bd | |
Copyright (C) 2022 Masamichi Hosoda. | |
All rights reserved. | |
Redistribution and use in source and binary forms, with or without | |
modification, are permitted provided that the following conditions | |
are met: | |
* Redistributions of source code must retain the above copyright notice, | |
this list of conditions and the following disclaimer. | |
* Redistributions in binary form must reproduce the above copyright notice, | |
this list of conditions and the following disclaimer in the documentation | |
and/or other materials provided with the distribution. | |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
ARE DISCLAIMED. | |
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
SUCH DAMAGE. | |
""" | |
from __future__ import annotations | |
import asyncio | |
import contextlib | |
import sys | |
from typing import (Any, Callable, Dict, Final, List, | |
Optional, Tuple, Union, cast) | |
from bleak import BleakScanner # type: ignore | |
from bleak.backends.device import BLEDevice # type: ignore | |
from bleak.backends.scanner import AdvertisementData # type: ignore | |
# 検索する BLE MIDI の SERVICE UUID | |
SERVICE_UUID: Final[str] = '03b80e5a-ede8-4b33-a751-6ce34ec4c700' | |
class scan_ble_midi(): | |
"""Scan BLE MIDI Peripherals class.""" | |
def __init__(self, | |
min_scan_time: float = 1.0, max_scan_time: float = 60.0, | |
callback: Optional[Callable[[scan_ble_midi, | |
str, | |
Optional[str], | |
int], | |
None]] = None | |
) -> None: | |
""" | |
__init__. | |
Args: | |
min_scan_time (float): 最低スキャン時間(単位:秒) | |
スキャン終了要求されても | |
最低限ここで設定した時間はスキャンを続ける。 | |
max_scan_time (float): 最大スキャン時間(単位:秒) | |
スキャン終了要求がなくても | |
ここで設定した時間が経過したらスキャン終了する。 | |
callback (Callable[[scan_ble_midi, str, Optional[str], str], None): | |
BLE MIDI ペリフェラルが見つかったら呼ばれるコールバック関数。 | |
同じペリフェラルについて複数回呼ばれることもある。 | |
""" | |
self.min_scan_time: Final[float] = min_scan_time | |
self.max_scan_time: Final[float] = max_scan_time | |
self.callback: Final[Optional[Callable[[scan_ble_midi, | |
str, | |
Optional[str], | |
int], | |
None]]] = callback | |
# 検出したデバイス保存用 | |
self.devices: Dict[str, Tuple[Optional[str], int]] = {} | |
# スキャン終了要求イベント | |
self.event: asyncio.Event = asyncio.Event() | |
def stop(self) -> None: | |
""" | |
スキャン終了要求. | |
スキャン終了要求するときに呼び出す。 | |
最低スキャン時間が経過していない場合は経過するまで終了しない。 | |
""" | |
self.event.set() | |
def __detection_callback(self, | |
device: BLEDevice, | |
advertisement_data: AdvertisementData) -> None: | |
"""内部検出コールバック.""" | |
address: str = device.address | |
name: Optional[str] = device.name | |
rssi: int = device.rssi | |
metadata: Dict[str, Union[List[str], | |
Dict[Any, Any]]] = device.metadata | |
uuids: List[str] = cast(List[str], metadata['uuids']) | |
# BLE MIDI の UUID か否か判定 | |
if SERVICE_UUID in uuids: | |
# BLE MIDI なので保存・更新する | |
self.devices[address] = (name, rssi) | |
if self.callback is not None: | |
self.callback(self, address, name, rssi) | |
async def scan(self) -> Dict[str, Tuple[Optional[str], int]]: | |
""" | |
BLE MIDI ペリフェラルを見つける. | |
Returns: | |
Dict: | |
str: BLE MIDI ペリフェラルのアドレス | |
Tuple: | |
str: BLE MIDI ペリフェラルの名前 | |
int: RSSI(単位:dBm ?) | |
""" | |
# 終了要求をクリア | |
self.event.clear() | |
scanner: BleakScanner = BleakScanner() | |
scanner.register_detection_callback(self.__detection_callback) | |
await scanner.start() # スキャン開始 | |
# 最低スキャン時間待つ | |
await asyncio.sleep(self.min_scan_time) | |
if self.max_scan_time > self.min_scan_time: | |
# 最大スキャン時間経過するまで終了要求を待つ | |
# タイムアウトの例外は抑制する | |
with contextlib.suppress(asyncio.TimeoutError): | |
await asyncio.wait_for(self.event.wait(), | |
self.max_scan_time - self.min_scan_time) | |
# TODO: Ctrl+C などで抜けたときにスキャン終了しないとおかしくなる? | |
await scanner.stop() # スキャン終了 | |
return self.devices | |
def found_callback(sbm: scan_ble_midi, | |
address: str, name: Optional[str], rssi: int | |
) -> None: | |
"""BLE MIDI ペリフェラル発見コールバックの例.""" | |
print(f' found: {address}, {name}, {rssi}', file=sys.stderr, flush=True) | |
# 一つでも発見したらスキャン終了要求する | |
sbm.stop() | |
# 動作メモ | |
# YAMAHA MD-BT01 は電源投入後十数秒間はアドバタイズし、 | |
# その間はスキャンにかかるが、以降はアドバタイズしなくなり、 | |
# スキャンで見つからずコネクション要求も受け付けなくなる模様。 | |
# 電源 OFF/ON すれば元に戻る。 | |
# CME WIDI Master はアドバタイズする期間としない期間が | |
# 数秒~十数秒単位ぐらいで交互に入れ替わっている模様。 | |
# アドバタイズする期間だとスキャンですぐにみつかるし、 | |
# コネクション要求もすぐに受け付けられるが、しない期間だと待たされる。 | |
# MD-BT01 とは異なり電源投入後かなり時間が経っても | |
# どこかと接続していない限りは反応する。 | |
# ただし、どこかと接続している | |
# (勝手にセントラルとして接続に行く動作があり、自分からは切断しない) | |
# とアドバタイズしない、コネクション要求を受け付けない、 | |
# セントラルとしてどこかに接続に行くこともなくなる。 | |
# やはり電源 OFF/ON すれば元に戻る(セントラルになっていても切断できる)。 | |
def main() -> None: | |
"""Test main.""" | |
print('Scan BLE MIDI Peripherals with python bleak\n\n' | |
'https://gist.github.com/trueroad/' | |
'52b7c4c98eec5fdf0ff3f62d64ec17bd\n\n' | |
'Copyright (C) 2022 Masamichi Hosoda.\n' | |
'All rights reserved.\n', file=sys.stderr) | |
sbm: scan_ble_midi = scan_ble_midi(callback=found_callback) | |
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() | |
print('Scanning...', file=sys.stderr) | |
devices: Dict[str, Tuple[Optional[str], int]] = \ | |
loop.run_until_complete(sbm.scan()) | |
print('Complete', file=sys.stderr) | |
# 見つけた BLE MIDI ペリフェラルを TSV で標準出力へ書き出す。 | |
# これで得られたアドレスは | |
# Recieve from BLE MIDI Peripherals with python bleak. | |
# https://gist.github.com/trueroad/fdb0a450c1699d67fe02f4a31b44c895 | |
# で利用できる。 | |
print('Address\tName\tRSSI') | |
address: str | |
for address in devices: | |
print(f'{address}\t{devices[address][0]}' | |
f'\t{devices[address][1]}') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment