Last active
September 6, 2018 18:00
-
-
Save damouse/1f5ccbb7291184a7dbc5c91b3a93f6ee to your computer and use it in GitHub Desktop.
Bluetoothctl text interface with asyncio in Python3.6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
A hilarious way to avoid writing DBus implementations by hooking into bluetoothd via the text interface
presented by bluetoothctl. You can do anything bluetoothctl can do except see live events.
In my case, I needed the simple ability to pair with an Xbox One controller in python 3.6 with asyncio,
and getting dbus set up made my teeth hurt. This implementation is half serious-- it's written to back
honest-to-god production software, but I can't in good conscience advise you to ever use it.
Tested with bluez 5.41 and 5.55, python3.6, and ubuntu 16.04. The only dependency is pexpect, which you
can install with:
pip install pexpect
Written by damouse in 2018. Licensed under WTFPL. Inspired by egorf at https://gist.github.com/egorf/66d88056a9d703928f93.
""" | |
import asyncio | |
import pexpect | |
class AsyncBluetoothctlTextProxy:
    """Drive an interactive ``bluetoothctl`` process through its text prompt.

    A ``bluetoothctl`` child process is spawned with pexpect; each public
    coroutine writes one command to it and waits (via pexpect's ``async_=True``
    support) for the prompt to come back. All methods share the single child
    process in ``self.tty``, so calls are expected to be awaited one at a time.
    """

    def __init__(self):
        # The one bluetoothctl child process every command is written to.
        self.tty = pexpect.spawn("bluetoothctl", echo=False)

    async def _send(self, command, wait=0):
        """Send a command to bluetoothctl and return its output as a list of lines.

        ``wait`` is a number of seconds to sleep after writing the command and
        before looking for the prompt.

        Raises a pexpect timeout exception if the prompt never shows up again.
        """
        self.tty.send(command + "\n")
        await asyncio.sleep(wait)

        # We're waiting on the prompt to reappear-- specifically the close brace, ANSI code, and then hash
        await self.tty.expect_exact(']\x1b[0m#', async_=True)
        return self.tty.before.decode().split('\r\n')

    async def _send_and_check(self, command, wait, failure_text, success_text):
        """Send ``command``, then report whether ``success_text`` showed up.

        After the prompt has returned, waits for whichever of ``failure_text``,
        ``success_text`` or end-of-file appears first in the child's output.
        Returns True only when ``success_text`` matched.
        """
        await self._send(command, wait)
        # The expect call must be awaited: with async_=True it returns an
        # awaitable, and comparing that awaitable to 1 would always be False.
        matched = await self.tty.expect([failure_text, success_text, pexpect.EOF], async_=True)
        return matched == 1

    def _parse_devices(self, raw_list):
        """Find lines that start with 'Device ', remove that, then return a tuple of the result split on the first space."""
        prefix = 'Device '
        return [tuple(line[len(prefix):].split(' ', 1)) for line in raw_list if line.startswith(prefix)]

    async def _monitor_scan_stream(self):
        """Print each line of child output as it arrives, until end-of-file.

        Intended for watching [NEW] and [DEL] messages while scanning.
        """
        while True:
            # pexpect's readline() is blocking and takes no async_ argument, so
            # wait for the line terminator with expect_exact(), which does.
            matched = await self.tty.expect_exact(['\r\n', pexpect.EOF], async_=True)
            if matched == 0:
                print('Have line: ', self.tty.before + self.tty.after)
                continue

            # End of file: flush any trailing partial line and stop spinning.
            if self.tty.before:
                print('Have line: ', self.tty.before)
            return

    async def teardown(self):
        """Shut down by powering the adapter off; returns the output of ``power_off``."""
        return await self.power_off()

    async def power_on(self):
        """Send 'power on' and return the output lines."""
        return await self._send('power on')

    async def power_off(self):
        """Send 'power off' and return the output lines."""
        return await self._send('power off')

    async def agent_on(self):
        """Send 'agent on' and return the output lines."""
        return await self._send('agent on')

    async def agent_off(self):
        """Send 'agent off' and return the output lines."""
        return await self._send('agent off')

    async def default_agent(self):
        """Send 'default-agent' and return the output lines."""
        return await self._send('default-agent')

    async def scan_on(self):
        """Send 'scan on' and return the output lines."""
        return await self._send('scan on')

    async def scan_off(self):
        """Send 'scan off' and return the output lines."""
        return await self._send('scan off')

    async def discoverable_on(self):
        """Send 'discoverable on' and return the output lines."""
        return await self._send("discoverable on")

    async def discoverable_off(self):
        """Send 'discoverable off' and return the output lines."""
        return await self._send("discoverable off")

    async def devices(self):
        """Return the parsed 'Device ' lines from the 'devices' command."""
        out = await self._send("devices")
        return self._parse_devices(out)

    async def paired_devices(self):
        """Return the parsed 'Device ' lines from the 'paired-devices' command."""
        out = await self._send("paired-devices")
        return self._parse_devices(out)

    async def discoverable_devices(self):
        """Return the entries of ``devices()`` that are not in ``paired_devices()``."""
        available = await self.devices()
        paired = await self.paired_devices()
        return [d for d in available if d not in paired]

    async def info(self, mac_address):
        """Return a dict representation of the bluetoothctl info command.

        Only output lines that begin with a tab are used. Each is split on its
        first space: the part before becomes the key, the rest the value. Keys
        are kept exactly as printed, so they are expected to carry a trailing
        colon ('Name:', 'Alias:', 'Paired:', 'UUID:', ...) -- NOTE(review):
        confirm against the output of your bluez version. When a key is
        printed more than once (e.g. UUID), only the last value is kept.
        """
        lines = await self._send("info " + mac_address)
        pairs = [line[1:].split(' ', 1) for line in lines if line[:1] == '\t']
        return {k: v for k, v in pairs}

    async def pair(self, mac_address):
        """Pair with ``mac_address``; True only if 'Pairing successful' is seen."""
        return await self._send_and_check(
            "pair " + mac_address, 4, "Failed to pair", "Pairing successful")

    async def remove(self, mac_address):
        """Remove ``mac_address``; True only if 'Device has been removed' is seen."""
        return await self._send_and_check(
            "remove " + mac_address, 3, "not available", "Device has been removed")

    async def connect(self, mac_address):
        """Connect to ``mac_address``; True only if 'Connection successful' is seen."""
        return await self._send_and_check(
            "connect " + mac_address, 2, "Failed to connect", "Connection successful")

    async def disconnect(self, mac_address):
        """Disconnect from ``mac_address``; True only if 'Successful disconnected' is seen."""
        return await self._send_and_check(
            "disconnect " + mac_address, 2, "Failed to disconnect", "Successful disconnected")
async def main():
    """Demo: power on, scan for ten seconds, list devices, then connect to an Xbox controller.

    Prints progress to stdout. Returns None in every case; if exactly one
    device named 'Xbox Wireless Controller' is not found, it says so and
    stops without attempting a connection.
    """
    print('Starting async test')
    b = AsyncBluetoothctlTextProxy()

    # Turn everything on
    await b.power_on()
    await b.agent_on()
    await b.scan_on()

    # Wait for a while to let the scan populate
    print('Scanning for 10 seconds')
    await asyncio.sleep(10)

    # List and print devices
    print('Available devices:')
    devices = await b.devices()
    for mac, name in devices:
        print(f' {mac} {name}')

    # Example on how to connect-- in this case I know I'm looking for an Xbox controller
    controller = [mac for mac, name in devices if name == 'Xbox Wireless Controller']
    if len(controller) != 1:
        print('Xbox controller not found')
        return

    # connect() reports success as a bool; only claim a connection when it is True.
    if await b.connect(controller[0]):
        print('Connected to device')
    else:
        print('Failed to connect to device')
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion on the default event loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment