Last active
September 12, 2022 13:32
-
-
Save trueroad/aa2507bf64c97f34aab324d8278b5686 to your computer and use it in GitHub Desktop.
Emulate BLE MIDI Peripherals with python bless
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Emulate BLE-MIDI Peripherals with python bless. | |
https://gist.github.com/trueroad/aa2507bf64c97f34aab324d8278b5686 | |
Copyright (C) 2022 Masamichi Hosoda. | |
All rights reserved. | |
Redistribution and use in source and binary forms, with or without | |
modification, are permitted provided that the following conditions | |
are met: | |
* Redistributions of source code must retain the above copyright notice, | |
this list of conditions and the following disclaimer. | |
* Redistributions in binary form must reproduce the above copyright notice, | |
this list of conditions and the following disclaimer in the documentation | |
and/or other materials provided with the distribution. | |
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
ARE DISCLAIMED. | |
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | |
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |
SUCH DAMAGE. | |
""" | |
import asyncio | |
import json | |
import sys | |
import time | |
from typing import Any, Dict, Final, List, Optional, TextIO, Union, cast | |
from bless import ( # type: ignore | |
BlessServer, | |
BlessGATTCharacteristic, | |
GATTCharacteristicProperties, | |
GATTAttributePermissions | |
) | |
# サービス名 | |
SERVICE_NAME: Final[str] = 'Emulate BLE-MIDI' | |
# BLE-MIDI の SERVICE UUID | |
SERVICE_UUID: Final[str] = '03b80e5a-ede8-4b33-a751-6ce34ec4c700' | |
# BLE-MIDI の CHARACTERISTIC UUID | |
CHARACTERISTIC_UUID: Final[str] = '7772e5db-3868-4112-a1a9-f2669d106bf3' | |
class ble_midi_server(): | |
"""BLE-MIDI サーバクラス.""" | |
def __init__(self) -> None: | |
"""__init__.""" | |
self.server: BlessServer = BlessServer(name=SERVICE_NAME) | |
self.server.read_request_func = self.__read_request | |
self.server.write_request_func = self.__write_request | |
self.characteristic: Optional[BlessGATTCharacteristic] = None | |
def __read_request(self, characteristic: BlessGATTCharacteristic, | |
**kwargs: Any | |
) -> bytearray: | |
"""Read request.""" | |
print(f'Reading {characteristic.value}', file=sys.stderr) | |
return cast(bytearray, characteristic.value) | |
def __write_request(self, characteristic: BlessGATTCharacteristic, | |
value: Any, | |
**kwargs: Any | |
) -> None: | |
"""Write request.""" | |
ns: int = time.perf_counter_ns() | |
d: Dict[str, Union[int, List[int]]] = {} | |
d['ns'] = ns | |
d['data'] = list(value) | |
print(json.dumps(d), flush=True) | |
async def prepare(self) -> None: | |
"""Prepare server.""" | |
await self.server.add_new_service(SERVICE_UUID) | |
char_flags: GATTCharacteristicProperties = ( | |
GATTCharacteristicProperties.read | | |
GATTCharacteristicProperties.write | | |
GATTCharacteristicProperties.write_without_response | | |
GATTCharacteristicProperties.notify | |
) | |
permissions: GATTAttributePermissions = ( | |
GATTAttributePermissions.readable | | |
GATTAttributePermissions.writeable | |
) | |
await self.server.add_new_characteristic( | |
SERVICE_UUID, | |
CHARACTERISTIC_UUID, | |
char_flags, | |
None, | |
permissions | |
) | |
async def start(self) -> None: | |
"""Start server.""" | |
await self.server.start() | |
async def wait_connect(self) -> None: | |
"""Wait connect.""" | |
while not await self.server.is_connected(): | |
await asyncio.sleep(1) | |
self.characteristic = \ | |
self.server.get_characteristic(CHARACTERISTIC_UUID) | |
async def stop(self) -> None: | |
"""Stop server.""" | |
await self.server.stop() | |
self.characteristic = None | |
async def send(self, data: bytearray) -> bool: | |
"""Send data.""" | |
if self.characteristic is None: | |
print('Cannot send', file=sys.stderr) | |
return False | |
self.characteristic.value = data | |
return cast(bool, self.server.update_value(SERVICE_UUID, | |
CHARACTERISTIC_UUID)) | |
# 動作メモ | |
# CME WIDI Master はどこにも接続していない場合、 | |
# アドバタイズしている BLE-MIDI ペリフェラルを見つけたら | |
# (恐らくアドバタイズしていない期間に見つけようとしていると思われる) | |
# 問答無用でセントラルになって接続をかけてくる。 | |
# 本スクリプトは BLE-MIDI ペリフェラルとして動作するので、 | |
# 起動して server.start() するといきなり接続してくる動作が見られ、 | |
# WIDI Master へ入力された MIDI メッセージは Write request で得られるし、 | |
# JSONL を MIDI メッセージとして WIDI Master から出力させることができる。 | |
# ただし、server.stop() では切断したことにならず、 | |
# WIDI Master 側はずっと接続しっぱなしの状態が残ってしまうようである。 | |
# そのため、そのまま本スクリプトを 2 回目以降起動しても | |
# 新たに接続してくることはなく server.is_connected() は False のままとなる。 | |
# しかし、入力 MIDI メッセージがあるなら Write request を飛ばしてきて | |
# なぜか本スクリプトでも受信することはできる。 | |
# だが、逆方向の server.update_value() はエラーにならないものの | |
# 送信できておらず MIDI メッセージを出力することができないようである。 | |
# WIDI Master の電源を OFF/ON することで元に戻す(切断する)ことができる。 | |
# python bless は中途半端な状態で python を終了させて、 | |
# 再度起動すると python プロセスが刺さる(ハングアップ?)することがある。 | |
# タスクマネージャで python.exe を強制終了すればとりあえず戻ってこれるが、 | |
# 再度実行するとまた刺さってしまったり、さらに変な状態に陥ることがある。 | |
# Bluetooth ドングルなら抜き差しすればとりあえず元に戻る。 | |
# (内蔵 Bluetooth なら Windows 10 の Bluetooth 設定を OFF/ON すればよい?) | |
# また、中途半端な状態で python が終了しないようにしておくとよさそう。 | |
# try ~ finally でなんとかしているがデストラクタも活用すべきか。 | |
def main() -> None: | |
"""Test main.""" | |
print('Emulate BLE-MIDI Peripherals with python bless\n\n' | |
'https://gist.github.com/trueroad/' | |
'aa2507bf64c97f34aab324d8278b5686\n\n' | |
'Copyright (C) 2022 Masamichi Hosoda.\n' | |
'All rights reserved.\n', file=sys.stderr) | |
if len(sys.argv) != 2: | |
# FILENAME.jsonl ファイルには、 | |
# Recieve from BLE-MIDI Peripherals with python bleak. | |
# https://gist.github.com/trueroad/fdb0a450c1699d67fe02f4a31b44c895 | |
# で得られたファイルや、 | |
# BLE-MIDI JSONL time converter (ms to ns). | |
# https://gist.github.com/trueroad/2626654d4ca5d0c5bf44c32837dfc53a | |
# で変換したファイルが使用できる。 | |
print('Usage: python emu_ble_midi.py [FILENAME.jsonl]', | |
file=sys.stderr) | |
return | |
# JSONL をすべて読み込み、最初の行の ns を保存しておく | |
f: TextIO | |
lines: List[str] = [] | |
ns: int = 0 | |
with open(sys.argv[1], "r") as f: | |
lines = f.readlines() | |
ns = json.loads(lines[0])['ns'] | |
bms: ble_midi_server = ble_midi_server() | |
loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() | |
print('Preparing...', file=sys.stderr) | |
loop.run_until_complete(bms.prepare()) | |
print('Starting...', file=sys.stderr) | |
loop.run_until_complete(bms.start()) | |
print('Waiting for connection...', file=sys.stderr) | |
try: | |
loop.run_until_complete(bms.wait_connect()) | |
print('Connected', file=sys.stderr) | |
loop.run_until_complete(asyncio.sleep(1)) | |
# JSONL を 1 行ずつ読み込んで処理 | |
line: str | |
for line in lines: | |
jd: Dict[str, Any] = json.loads(line) | |
# デルタタイム(単位 ns)を計算 | |
new_ns: int = jd['ns'] | |
delta_ns: int = new_ns - ns | |
ns = new_ns | |
# デルタタイム待つ | |
loop.run_until_complete(asyncio.sleep(delta_ns * 1e-9)) | |
# データ送出 | |
loop.run_until_complete(bms.send( | |
bytearray(jd['data']))) | |
loop.run_until_complete(asyncio.sleep(1)) | |
except KeyboardInterrupt: | |
print('Interrupted', file=sys.stderr) | |
finally: | |
print('Stopping...', file=sys.stderr) | |
loop.run_until_complete(bms.server.stop()) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment