-
-
Save castis/0b7a162995d0b465ba9c84728e60ec01 to your computer and use it in GitHub Desktop.
# Based on ReachView code from Egor Fedorov ([email protected])
# Updated for Python 3.6.8 on a Raspberry Pi
import time | |
import pexpect | |
import subprocess | |
import sys | |
import logging | |
logger = logging.getLogger("btctl") | |
class Bluetoothctl:
    """A wrapper for the bluetoothctl utility.

    An interactive ``bluetoothctl`` session is spawned through pexpect;
    every method writes one command to it and inspects the text the
    session prints back.
    """

    # Regex matching the tail of an interactive prompt: a closing bracket,
    # optional ANSI colour codes, then the prompt character. The text inside
    # the brackets is deliberately not matched, because bluetoothctl swaps
    # "[bluetooth]" for the name of the connected device (e.g. "[Speaker1]#");
    # waiting for the literal word "bluetooth" therefore hangs as soon as one
    # device is connected.
    PROMPT_PATTERN = r"\](?:\x1b\[[0-9;]*m)*[#>]"

    # Lines containing any of these markers are never parsed as devices:
    # coloured event tags / prompts, and "removed" notifications.
    _BLOCK_LIST = ("[\x1b[0;", "removed")

    def __init__(self):
        """Unblock the bluetooth radio and start a bluetoothctl session.

        Raises:
            subprocess.CalledProcessError: if the ``rfkill`` command exits
                with a non-zero status.
        """
        subprocess.check_output("rfkill unblock bluetooth", shell=True)
        # spawn(..., encoding=...) is the documented replacement for the
        # deprecated spawnu(); both yield str instead of bytes.
        self.process = pexpect.spawn(
            "bluetoothctl", echo=False, encoding="utf-8"
        )

    def send(self, command, pause=0):
        """Send a command to bluetoothctl and wait for the next prompt.

        Args:
            command: Command line to write, without the trailing newline.
            pause: Seconds to sleep after writing, before reading output.

        Raises:
            Exception: if the session hits end-of-file instead of a prompt.
                Errors raised by pexpect itself while waiting (such as its
                timeout) are not caught here and reach the caller.
        """
        self.process.send(f"{command}\n")
        time.sleep(pause)
        # expect() returns the index of the matched pattern: 0 is the prompt,
        # anything else means the session ended.
        if self.process.expect([self.PROMPT_PATTERN, pexpect.EOF]):
            raise Exception(f"failed after {command}")

    def get_output(self, *args, **kwargs):
        """Run a command in bluetoothctl prompt, return output as a list of lines.

        Arguments are forwarded unchanged to :meth:`send`. The returned lines
        are the text read before the matched prompt, split on CRLF.
        """
        self.send(*args, **kwargs)
        return self.process.before.split("\r\n")

    def _send_logged(self, command):
        """Send a command, logging instead of raising on failure."""
        try:
            self.send(command)
        except Exception as e:
            logger.error(e)

    def _list_devices(self, command):
        """Run a listing command and return the device dicts parsed from it."""
        try:
            lines = self.get_output(command)
        except Exception as e:
            logger.error(e)
            return []
        parsed = (self.parse_device_info(line) for line in lines)
        return [device for device in parsed if device]

    def _run_and_expect(self, command, pause, failures, success):
        """Send a command, then report whether its success message appears.

        Returns True only when ``success`` is matched. A failed send, any of
        the ``failures`` messages, end-of-file, or a timeout while waiting for
        the result all yield False, so callers always get a bool back.
        """
        try:
            self.send(command, pause)
        except Exception as e:
            logger.error(e)
            return False
        # Listing pexpect.TIMEOUT makes expect() return its index instead of
        # raising, keeping this phase consistent with the send phase above.
        patterns = [success, *failures, pexpect.EOF, pexpect.TIMEOUT]
        return self.process.expect(patterns) == 0

    def start_scan(self):
        """Start bluetooth scanning process; failures are only logged."""
        self._send_logged("scan on")

    def make_discoverable(self):
        """Make device discoverable; failures are only logged."""
        self._send_logged("discoverable on")

    def parse_device_info(self, info_string):
        """Parse a string corresponding to a device.

        Args:
            info_string: One line of bluetoothctl output.

        Returns:
            ``{"mac_address": ..., "name": ...}`` taken from the text that
            follows the word "Device", or an empty dict when the line holds a
            blocked marker, has no "Device" entry, or lacks either field.
        """
        if any(keyword in info_string for keyword in self._BLOCK_LIST):
            return {}
        start = info_string.find("Device")
        if start == -1:
            return {}
        # At most three fields: the "Device" tag, the address, and the name
        # (which may itself contain spaces).
        fields = info_string[start:].split(" ", 2)
        if len(fields) < 3:
            # A truncated line carries no usable name; skip it rather than
            # fail on the missing field.
            return {}
        _, mac_address, name = fields
        return {"mac_address": mac_address, "name": name}

    def get_available_devices(self):
        """Return a list of dicts describing the devices bluetoothctl lists.

        An empty list is returned when the command fails.
        """
        return self._list_devices("devices")

    def get_paired_devices(self):
        """Return a list of dicts describing paired devices.

        An empty list is returned when the command fails.
        """
        # NOTE(review): newer bluetoothctl releases reportedly spell this
        # command "devices Paired" - confirm against the target version.
        return self._list_devices("paired-devices")

    def get_discoverable_devices(self):
        """Filter paired devices out of available."""
        available = self.get_available_devices()
        paired = self.get_paired_devices()
        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Get device info by mac address.

        Returns:
            The output lines of ``info <mac_address>``, or False when the
            command fails.
        """
        try:
            return self.get_output(f"info {mac_address}")
        except Exception as e:
            logger.error(e)
            return False

    def pair(self, mac_address):
        """Try to pair with a device by mac address, return success as a bool."""
        return self._run_and_expect(
            f"pair {mac_address}", 4, ["Failed to pair"], "Pairing successful"
        )

    def trust(self, mac_address):
        """Try to mark a device as trusted, return success as a bool."""
        # Waiting for "Pairing successful" here could never succeed:
        # bluetoothctl reports "Changing <mac> trust succeeded" for this
        # command, "Failed to set <mac> trust" when the change is rejected and
        # "Device <mac> not available" for an unknown device.
        return self._run_and_expect(
            f"trust {mac_address}",
            4,
            ["Failed to trust", "Failed to set", "not available"],
            "trust succeeded",
        )

    def remove(self, mac_address):
        """Remove paired device by mac address, return success of the operation."""
        return self._run_and_expect(
            f"remove {mac_address}",
            3,
            ["not available"],
            "Device has been removed",
        )

    def connect(self, mac_address):
        """Try to connect to a device by mac address, return success as a bool."""
        return self._run_and_expect(
            f"connect {mac_address}",
            2,
            ["Failed to connect"],
            "Connection successful",
        )

    def disconnect(self, mac_address):
        """Try to disconnect from a device by mac address, return success as a bool."""
        return self._run_and_expect(
            f"disconnect {mac_address}",
            2,
            ["Failed to disconnect"],
            "Successful disconnected",
        )
@philx17 have you tried doing this manually via bluetoothctl,
and does it work? Just to confirm that your system can even handle multiple simultaneously connected devices.
Yes, I tried it and it works. I run everything in a loop and it works, but if I use connect in a loop to connect multiple devices, it gets stuck.
When I try it via bash it works too, but with the Python script, no chance =/ and I don't know why.
When I use bluetoothctl on the command line, the prompt first looks like [bluetooth], then it takes the name of the last connected device. Is this maybe the problem?
[Speaker1]
[EasyAcc-MC]
pi@Room:~ $ bluetoothctl
Agent registered
[bluetooth]# connect xx:69:C2:xx:93:3F
Attempting to connect to B8:69:C2:42:93:3F
[CHG] Device xx:69:C2:xx:93:3F Connected: yes
Connection successful
[CHG] Device xx:69:C2:xx:93:3F ServicesResolved: yes
[Speaker1]# connect xx:5A:5A:xx:A5:2B
Attempting to connect to 5B:5A:5A:A6:A5:2B
[CHG] Device xx:5A:5A:xx:A5:2B Connected: yes
Connection successful
[CHG] Device xx:5A:5A:xx:A5:2B ServicesResolved: yes
[EasyAcc-MC]#
[CHG] Device 5B:5A:5A:A6:A5:2B ServicesResolved: yes
[EasyAcc-MC]#
@NerdboyQ did you manage to provide a PIN during the pairing procedure? I've tried to send the PIN code, but there are some issues with the 'expect' line.
Can anybody help? If I want to connect to two devices, the script runs into a "Timeout exceeded" error.
The first device gets connected, and the second one triggers the timeout.