Last active
July 19, 2022 15:46
-
-
Save trueroad/fdb0a450c1699d67fe02f4a31b44c895 to your computer and use it in GitHub Desktop.
Recieve from BLE MIDI Peripherals with python bleak
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Recieve from BLE MIDI Peripherals with python bleak. | |
https://gist.github.com/trueroad/fdb0a450c1699d67fe02f4a31b44c895 | |
Copyright (C) 2022 Masamichi Hosoda. | |
All rights reserved. | |
Redistribution and use in source and binary forms, with or without | |
modification, are permitted provided that the following conditions | |
are met: | |
* Redistributions of source code must retain the above copyright notice, | |
this list of conditions and the following disclaimer. | |
* Redistributions in binary form must reproduce the above copyright notice, | |
this list of conditions and the following disclaimer in the documentation | |
and/or other materials provided with the distribution. | |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
ARE DISCLAIMED. | |
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
SUCH DAMAGE. | |
""" | |
import asyncio | |
import json | |
import sys | |
import time | |
from typing import Callable, Dict, Final, List, Optional, Union, cast | |
from bleak import BleakClient # type: ignore | |
# BLE MIDI の CHARACTERISTIC UUID | |
CHARACTERISTIC_UUID: Final[str] = '7772e5db-3868-4112-a1a9-f2669d106bf3' | |
class ble_midi_client(): | |
"""BLE MIDI クライアントクラス.""" | |
def __init__(self, | |
address: str, | |
callback: Optional[Callable[[int, bytearray], None]] = None | |
) -> None: | |
""" | |
__init__. | |
Args: | |
address (str): 接続先となる BLE MIDI ペリフェラルのアドレス | |
callback (Optional[Callable[[int, bytearray], None]]): | |
受信データが来たら呼ばれるコールバック | |
""" | |
self.client: Final[BleakClient] = BleakClient(address) | |
self.callback: Final[Optional[Callable[[int, bytearray], None]]] = \ | |
callback | |
async def connect(self) -> bool: | |
""" | |
接続する. | |
Returns: | |
bool: 接続したか否か | |
True なら接続した、False なら接続していない | |
""" | |
await self.client.connect() | |
return cast(bool, self.client.is_connected) | |
async def start_notify(self) -> None: | |
"""受信データ存在通知コールバックを開始する.""" | |
if self.callback is not None: | |
await self.client.start_notify(CHARACTERISTIC_UUID, | |
self.callback) | |
async def stop_notify(self) -> None: | |
"""受信データ存在通知コールバックを終了する.""" | |
if self.callback is not None: | |
await self.client.stop_notify(CHARACTERISTIC_UUID) | |
async def disconnect(self) -> None: | |
"""切断する.""" | |
await self.client.disconnect() | |
def recieve_callback(sender: int, data: bytearray) -> None: | |
""" | |
受信データ存在通知コールバックの例. | |
ナノ秒単位の時刻と受信データそのものを JSONL として標準出力に書き込む。 | |
リダイレクトしてファイルに保存することにより、 | |
Emulate BLE MIDI Peripherals with python bless. | |
https://gist.github.com/trueroad/aa2507bf64c97f34aab324d8278b5686 | |
の入力ファイルに使用したり、 | |
BLE MIDI JSONL time converter (ns to ms). | |
https://gist.github.com/trueroad/2626654d4ca5d0c5bf44c32837dfc53a | |
の変換元ファイルとして使用することができる。 | |
Args: | |
sender (int): ? | |
data (bytearray): 受信データ | |
""" | |
ns: int = time.perf_counter_ns() | |
d: Dict[str, Union[int, List[int]]] = {} | |
d['ns'] = ns | |
d['data'] = list(data) | |
print(json.dumps(d), flush=True) | |
async def sleep_forever() -> None: | |
"""無限に待つ.""" | |
while True: | |
await asyncio.sleep(1.0) | |
# 動作メモ | |
# YAMAHA MD-BT01 は電源投入後十数秒間はアドバタイズし、 | |
# その間はスキャンにかかるが、以降はアドバタイズしなくなり、 | |
# スキャンで見つからずコネクション要求も受け付けなくなる模様。 | |
# 電源 OFF/ON すれば元に戻る。 | |
# CME WIDI Master はアドバタイズする期間としない期間が | |
# 数秒~十数秒単位ぐらいで交互に入れ替わっている模様。 | |
# アドバタイズする期間だとスキャンですぐにみつかるし、 | |
# コネクション要求もすぐに受け付けられるが、しない期間だと待たされる。 | |
# MD-BT01 とは異なり電源投入後かなり時間が経っても | |
# どこかと接続していない限りは反応する。 | |
# ただし、どこかと接続している | |
# (勝手にセントラルとして接続に行く動作があり、自分からは切断しない) | |
# とアドバタイズしない、コネクション要求を受け付けない、 | |
# セントラルとしてどこかに接続に行くこともなくなる。 | |
# やはり電源 OFF/ON すれば元に戻る(セントラルになっていても切断できる)。 | |
# python bleak は中途半端な状態で python を終了させて、 | |
# 再度起動すると python プロセスが刺さる(ハングアップ?)することがある。 | |
# タスクマネージャで python.exe を強制終了すればとりあえず戻ってこれるが、 | |
# 再度実行するとまた刺さってしまったり、さらに変な状態に陥ることがある。 | |
# Bluetooth ドングルなら抜き差しすればとりあえず元に戻る。 | |
# (内蔵 Bluetooth なら Windows 10 の Bluetooth 設定を OFF/ON すればよい?) | |
# また、中途半端な状態で python が終了しないようにしておくとよさそう。 | |
# try ~ finally でなんとかしているがデストラクタも活用すべきか。 | |
def main() -> None: | |
"""Test main.""" | |
print('Recieve from BLE MIDI Peripherals with python bleak\n\n' | |
'https://gist.github.com/trueroad/' | |
'fdb0a450c1699d67fe02f4a31b44c895\n\n' | |
'Copyright (C) 2022 Masamichi Hosoda.\n' | |
'All rights reserved.\n', file=sys.stderr) | |
if len(sys.argv) != 2: | |
# ADDRESS には | |
# Scan BLE MIDI Peripherals with python bleak. | |
# https://gist.github.com/trueroad/9459958c3a876e8e26520ead776b5392 | |
# で得られた文字列を使う。 | |
print('Usage: python recv_ble_midi.py [ADDRESS]', | |
file=sys.stdout) | |
return | |
bmc: ble_midi_client = ble_midi_client(address=sys.argv[1], | |
callback=recieve_callback) | |
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() | |
print('Connecting...', file=sys.stderr) | |
if not loop.run_until_complete(bmc.connect()): | |
print('Failed', file=sys.stderr) | |
return | |
print('Strting notify...', file=sys.stderr) | |
loop.run_until_complete(bmc.start_notify()) | |
print('Started, Ctrl+C to stop...', file=sys.stderr) | |
try: | |
loop.run_until_complete(sleep_forever()) | |
except KeyboardInterrupt: | |
print('Interrupted, stopping notify...', file=sys.stderr) | |
finally: | |
loop.run_until_complete(bmc.stop_notify()) | |
print('Notify stopped, disconnecting...', file=sys.stderr) | |
loop.run_until_complete(bmc.disconnect()) | |
print('Disconnected', file=sys.stderr) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment